Whamcloud - gitweb
- changes about FLD cache. By now it is not belong module FLD and rather belong only...
authoryury <yury>
Fri, 30 Jun 2006 15:29:15 +0000 (15:29 +0000)
committeryury <yury>
Fri, 30 Jun 2006 15:29:15 +0000 (15:29 +0000)
- fid_is_local() uses client FLD cache, site contains ls_client_fld which is initialized and used by cmm;
- in fld_client_del_target() use class_export_put() instead of class_export_get(). The same in fini path;

- added struct fld_target which wraps obd_export and target index to not use silly counter while getting correct target by hash. This prevents it from wrong behavios if some target will be removed in alive cluster;

- all __u64 seq are replaced by seqno_t seq and all __u64 mds replaced by mdsno_t mds.

17 files changed:
lustre/cmm/cmm_device.c
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_object.c
lustre/fid/fid_lib.c
lustre/fid/fid_request.c
lustre/fld/fld_cache.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/fld/fld_request.c
lustre/fld/lproc_fld.c
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_export.h
lustre/include/lustre_fid.h
lustre/include/lustre_fld.h
lustre/mdt/mdt_handler.c

index d410861..ba05142 100644 (file)
@@ -91,6 +91,7 @@ static int cmm_add_mdc(const struct lu_context *ctx,
 {
         struct  lu_device_type *ldt = &mdc_device_type;
         struct  lu_device *ld;
+        struct lu_site *ls;
         struct  mdc_device *mc, *tmp;
         char *p, *num = lustre_cfg_string(cfg, 2);
         mdsno_t mdc_num;
@@ -142,7 +143,8 @@ static int cmm_add_mdc(const struct lu_context *ctx,
 
                 lu_device_get(cmm2lu_dev(cm));
 
-                fld_client_add_target(&cm->cmm_fld,
+                ls = cm->cmm_md_dev.md_lu_dev.ld_site;
+                fld_client_add_target(ls->ls_client_fld,
                                       mc->mc_desc.cl_exp);
         }
         RETURN(rc);
@@ -255,8 +257,8 @@ static int cmm_device_init(const struct lu_context *ctx,
                            struct lu_device *d, struct lu_device *next)
 {
         struct cmm_device *m = lu2cmm_dev(d);
+        struct lu_site *ls;
         int err = 0;
-
         ENTRY;
 
         spin_lock_init(&m->cmm_tgt_guard);
@@ -264,7 +266,12 @@ static int cmm_device_init(const struct lu_context *ctx,
         m->cmm_tgt_count = 0;
         m->cmm_child = lu2md_dev(next);
 
-        err = fld_client_init(&m->cmm_fld, "CMM_UUID",
+        ls = m->cmm_md_dev.md_lu_dev.ld_site;
+        OBD_ALLOC_PTR(ls->ls_client_fld);
+        if (!ls->ls_client_fld)
+                RETURN(-ENOMEM);
+        
+        err = fld_client_init(ls->ls_client_fld, "CMM_UUID",
                               LUSTRE_CLI_FLD_HASH_RRB);
         if (err) {
                 CERROR("can't init FLD, err %d\n",  err);
@@ -277,9 +284,14 @@ static struct lu_device *cmm_device_fini(const struct lu_context *ctx,
 {
        struct cmm_device *cm = lu2cmm_dev(ld);
         struct mdc_device *mc, *tmp;
+        struct lu_site *ls;
         ENTRY;
 
-        fld_client_fini(&cm->cmm_fld);
+        ls = cm->cmm_md_dev.md_lu_dev.ld_site;
+        fld_client_fini(ls->ls_client_fld);
+        OBD_FREE_PTR(ls->ls_client_fld);
+        ls->ls_client_fld = NULL;
+        
         /* finish all mdc devices */
         spin_lock(&cm->cmm_tgt_guard);
         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
index 12806c4..2fbfd4e 100644 (file)
@@ -46,9 +46,6 @@ struct cmm_device {
         __u32                 cmm_tgt_count;
         struct list_head      cmm_targets;
         spinlock_t            cmm_tgt_guard;
-
-        /* client FLD interface */
-        struct lu_client_fld  cmm_fld;
 };
 
 enum cmm_flags {
index 1d11696..d1f38cd 100644 (file)
 static int cmm_fld_lookup(struct cmm_device *cm,
                           const struct lu_fid *fid, mdsno_t *mds)
 {
+        struct lu_site *ls;
         int rc = 0;
         ENTRY;
 
         LASSERT(fid_is_sane(fid));
         
-        rc = fld_client_lookup(&cm->cmm_fld, fid_seq(fid), mds);
+        ls = cm->cmm_md_dev.md_lu_dev.ld_site;
+        
+        rc = fld_client_lookup(ls->ls_client_fld,
+                               fid_seq(fid), mds);
         if (rc) {
                 CERROR("can't find mds by seq "LPX64", rc %d\n",
                        fid_seq(fid), rc);
index 14bad9e..8811954 100644 (file)
@@ -38,6 +38,8 @@
 # include <liblustre.h>
 #endif
 
+#include <obd.h>
+#include <lu_object.h>
 #include <lustre_fid.h>
 
 void fid_to_le(struct lu_fid *dst, const struct lu_fid *src)
@@ -51,3 +53,28 @@ void fid_to_le(struct lu_fid *dst, const struct lu_fid *src)
         dst->f_ver = le32_to_cpu(fid_ver(src));
 }
 EXPORT_SYMBOL(fid_to_le);
+
+/*
+ * Returns true, if fid is local to this server node.
+ *
+ * WARNING: this function is *not* guaranteed to return false if fid is
+ * remote: it makes an educated conservative guess only.
+ *
+ * fid_is_local() is supposed to be used in assertion checks only.
+ */
+int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+{
+        int result;
+
+        result = 1; /* conservatively assume fid is local */
+        if (site->ls_client_fld != NULL) {
+                struct fld_cache_entry *entry;
+
+                entry = fld_cache_lookup(site->ls_client_fld->fld_cache,
+                                         fid_seq(fid));
+                if (entry != NULL)
+                        result = (entry->fce_mds == site->ls_node_id);
+        }
+        return result;
+}
+EXPORT_SYMBOL(fid_is_local);
index f41a4c1..4706f93 100644 (file)
@@ -161,7 +161,7 @@ EXPORT_SYMBOL(seq_client_alloc_meta);
 
 /* allocate new sequence for client (llite or MDC are expected to use this) */
 static int
-__seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+__seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
 {
         int rc = 0;
         ENTRY;
@@ -190,7 +190,7 @@ __seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
 }
 
 int
-seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
 {
         int rc = 0;
         ENTRY;
@@ -206,7 +206,7 @@ EXPORT_SYMBOL(seq_client_alloc_seq);
 int
 seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
 {
-        __u64 seqnr = 0;
+        seqno_t seqnr = 0;
         int rc;
         ENTRY;
 
index 6dca0b6..6d5105d 100644 (file)
@@ -83,6 +83,7 @@ struct fld_cache_info *fld_cache_init(int size)
         
         RETURN(cache);
 }
+EXPORT_SYMBOL(fld_cache_init);
 
 void fld_cache_fini(struct fld_cache_info *cache)
 {
@@ -112,10 +113,11 @@ void fld_cache_fini(struct fld_cache_info *cache)
        
         EXIT;
 }
+EXPORT_SYMBOL(fld_cache_fini);
 
 int
 fld_cache_insert(struct fld_cache_info *cache,
-                 __u64 seq, __u64 mds)
+                 seqno_t seq, mdsno_t mds)
 {
         struct fld_cache_entry *flde, *fldt;
         struct hlist_head *bucket;
@@ -149,9 +151,10 @@ exit_unlock:
                 OBD_FREE_PTR(flde);
         return rc;
 }
+EXPORT_SYMBOL(fld_cache_insert);
 
 void
-fld_cache_delete(struct fld_cache_info *cache, __u64 seq)
+fld_cache_delete(struct fld_cache_info *cache, seqno_t seq)
 {
         struct fld_cache_entry *flde;
         struct hlist_head *bucket;
@@ -175,9 +178,10 @@ out_unlock:
         spin_unlock(&cache->fci_lock);
         return;
 }
+EXPORT_SYMBOL(fld_cache_delete);
 
 struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *cache, __u64 seq)
+fld_cache_lookup(struct fld_cache_info *cache, seqno_t seq)
 {
         struct fld_cache_entry *flde;
         struct hlist_head *bucket;
@@ -197,5 +201,6 @@ fld_cache_lookup(struct fld_cache_info *cache, __u64 seq)
         spin_unlock(&cache->fci_lock);
         RETURN(NULL);
 }
+EXPORT_SYMBOL(fld_cache_lookup);
 #endif
 
index 2067a38..3da5263 100644 (file)
 #include "fld_internal.h"
 
 #ifdef __KERNEL__
-struct fld_cache_info *fld_cache = NULL;
-
-static int fld_init(void)
-{
-        int rc = 0;
-        ENTRY;
-
-        fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
-        if (IS_ERR(fld_cache))
-                rc = PTR_ERR(fld_cache);
-
-        if (rc != 0)
-                fld_cache = NULL;
-
-        RETURN(rc);
-}
-
-static void fld_fini(void)
-{
-        ENTRY;
-        if (fld_cache != NULL) {
-                fld_cache_fini(fld_cache);
-                fld_cache = NULL;
-        }
-        EXIT;
-}
-
 static int __init fld_mod_init(void)
 {
-        fld_init();
+        /* nothing to init seems */
         return 0;
 }
 
 static void __exit fld_mod_exit(void)
 {
-        fld_fini();
+        /* nothing to fini seems */
         return;
 }
 
@@ -97,54 +70,32 @@ static void __exit fld_mod_exit(void)
 int
 fld_server_create(struct lu_server_fld *fld,
                   const struct lu_context *ctx,
-                  __u64 seq, mdsno_t mds)
+                  seqno_t seq, mdsno_t mds)
 {
-        int rc;
         ENTRY;
-
-        rc = fld_index_create(fld, ctx, seq, mds);
-        if (rc == 0) {
-                /* do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXISTS. Another
-                 * reason is that, we do not want to stop proceeding because of
-                 * cache errors. --umka */
-                fld_cache_insert(fld_cache, seq, mds);
-        }
-        RETURN(rc);
+        RETURN(fld_index_create(fld, ctx, seq, mds));
 }
 EXPORT_SYMBOL(fld_server_create);
 
-/* delete index entry and update cache */
+/* delete index entry */
 int
 fld_server_delete(struct lu_server_fld *fld,
                   const struct lu_context *ctx,
-                  __u64 seq)
+                  seqno_t seq)
 {
         ENTRY;
-        fld_cache_delete(fld_cache, seq);
         RETURN(fld_index_delete(fld, ctx, seq));
 }
 EXPORT_SYMBOL(fld_server_delete);
 
-/* lookup in cache first and then issue index lookup */
+/* issue on-disk index lookup */
 int
 fld_server_lookup(struct lu_server_fld *fld,
                   const struct lu_context *ctx,
-                  __u64 seq, mdsno_t *mds)
+                  seqno_t seq, mdsno_t *mds)
 {
-        struct fld_cache_entry *flde;
-        int rc;
         ENTRY;
-
-        /* lookup it in the cache first */
-        flde = fld_cache_lookup(fld_cache, seq);
-        if (flde != NULL) {
-                *mds = flde->fce_mds;
-                RETURN(0);
-        }
-
-        rc = fld_index_lookup(fld, ctx, seq, mds);
-        RETURN(rc);
+        RETURN(fld_index_lookup(fld, ctx, seq, mds));
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
@@ -236,7 +187,7 @@ static int fld_req_handle(struct ptlrpc_request *req)
                 if (req->rq_export != NULL) {
                         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
                         LASSERT(site != NULL);
-                        rc = fld_req_handle0(ctx, site->ls_fld, req);
+                        rc = fld_req_handle0(ctx, site->ls_server_fld, req);
                 } else {
                         CERROR("Unconnected request\n");
                         req->rq_status = -ENOTCONN;
@@ -255,30 +206,6 @@ out:
         return 0;
 }
 
-/*
- * Returns true, if fid is local to this server node.
- *
- * WARNING: this function is *not* guaranteed to return false if fid is
- * remote: it makes an educated conservative guess only.
- *
- * fid_is_local() is supposed to be used in assertion checks only.
- */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
-{
-        int result;
-
-        result = 1; /* conservatively assume fid is local */
-        if (site->ls_fld != NULL) {
-                struct fld_cache_entry *entry;
-
-                entry = fld_cache_lookup(fld_cache, fid_seq(fid));
-                if (entry != NULL)
-                        result = (entry->fce_mds == site->ls_node_id);
-        }
-        return result;
-}
-EXPORT_SYMBOL(fid_is_local);
-
 #ifdef LPROCFS
 static int
 fld_server_proc_init(struct lu_server_fld *fld)
index 88a84f1..c4580b1 100644 (file)
@@ -53,8 +53,8 @@
 
 static const struct dt_index_features fld_index_features = {
         .dif_flags       = DT_IND_UPDATE,
-        .dif_keysize_min = sizeof(fidseq_t),
-        .dif_keysize_max = sizeof(fidseq_t),
+        .dif_keysize_min = sizeof(seqno_t),
+        .dif_keysize_max = sizeof(seqno_t),
         .dif_recsize_min = sizeof(mdsno_t),
         .dif_recsize_max = sizeof(mdsno_t)
 };
@@ -102,7 +102,7 @@ static struct lu_context_key fld_thread_key = {
 };
 
 static struct dt_key *fld_key(const struct lu_context *ctx,
-                              const fidseq_t seq)
+                              const seqno_t seq)
 {
         struct fld_thread_info *info;
         ENTRY;
@@ -129,7 +129,7 @@ static struct dt_rec *fld_rec(const struct lu_context *ctx,
 
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
-                     fidseq_t seq, mdsno_t mds)
+                     seqno_t seq, mdsno_t mds)
 {
         struct dt_device *dt = fld->fld_dt;
         struct dt_object *dt_obj = fld->fld_obj;
@@ -156,7 +156,7 @@ int fld_index_create(struct lu_server_fld *fld,
 
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
-                     fidseq_t seq)
+                     seqno_t seq)
 {
         struct dt_device *dt = fld->fld_dt;
         struct dt_object *dt_obj = fld->fld_obj;
@@ -180,7 +180,7 @@ int fld_index_delete(struct lu_server_fld *fld,
 
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
-                     fidseq_t seq, mdsno_t *mds)
+                     seqno_t seq, mdsno_t *mds)
 {
         struct dt_object *dt_obj = fld->fld_obj;
         struct dt_rec    *rec = fld_rec(ctx, 0);
index 812378f..37dd080 100644 (file)
 
 #include <linux/types.h>
 
-typedef __u64 fidseq_t;
-
-struct fld_cache_entry {
-        struct hlist_node  fce_list;
-        mdsno_t            fce_mds;
-        fidseq_t           fce_seq;
-};
-
-struct fld_cache_info {
-        struct hlist_head *fci_hash;
-        spinlock_t         fci_lock;
-        int                fci_size;
-        int                fci_hash_mask;
+struct fld_target {
+        struct list_head   fldt_chain;
+        struct obd_export *fldt_exp;
+        __u64              fldt_idx;
 };
 
 enum fld_op {
@@ -52,7 +43,6 @@ enum fld_op {
 #define FLD_HTABLE_SIZE 256
 
 extern struct lu_fld_hash fld_hash[3];
-extern struct fld_cache_info *fld_cache;
 
 #ifdef __KERNEL__
 #define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
@@ -65,35 +55,20 @@ void fld_index_fini(struct lu_server_fld *fld,
 
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
-                     fidseq_t seq, mdsno_t mds);
+                     seqno_t seq, mdsno_t mds);
 
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
-                     fidseq_t seq);
+                     seqno_t seq);
 
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
-                     fidseq_t seq, mdsno_t *mds);
-
-struct fld_cache_info *fld_cache_init(int size);
-
-void fld_cache_fini(struct fld_cache_info *cache);
+                     seqno_t seq, mdsno_t *mds);
 
-int fld_cache_insert(struct fld_cache_info *cache,
-                     __u64 seq, __u64 mds);
-
-void fld_cache_delete(struct fld_cache_info *cache,
-                      __u64 seq);
-
-struct fld_cache_entry *
-fld_cache_lookup(struct fld_cache_info *cache,
-                 __u64 seq);
-
-static inline __u32 fld_cache_hash(__u64 seq)
+static inline __u32 fld_cache_hash(seqno_t seq)
 {
         return (__u32)seq;
 }
-
 #endif
 
 #ifdef LPROCFS
index 28c3c0b..852b2a8 100644 (file)
@@ -53,7 +53,7 @@
 #include "fld_internal.h"
 
 static int
-fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
+fld_rrb_hash(struct lu_client_fld *fld, seqno_t seq)
 {
         if (fld->fld_count == 0)
                 return 0;
@@ -62,7 +62,7 @@ fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
 }
 
 static int
-fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
+fld_dht_hash(struct lu_client_fld *fld, seqno_t seq)
 {
         /* XXX: here should DHT hash */
         return fld_rrb_hash(fld, seq);
@@ -82,11 +82,11 @@ struct lu_fld_hash fld_hash[3] = {
         }
 };
 
-static struct obd_export *
-fld_client_get_target(struct lu_client_fld *fld, __u64 seq)
+static struct fld_target *
+fld_client_get_target(struct lu_client_fld *fld, seqno_t seq)
 {
-        struct obd_export *fld_exp;
-        int count = 0, hash;
+        struct fld_target *target;
+        int hash;
         ENTRY;
 
         LASSERT(fld->fld_hash != NULL);
@@ -96,13 +96,12 @@ fld_client_get_target(struct lu_client_fld *fld, __u64 seq)
         spin_unlock(&fld->fld_lock);
 
         spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp,
-                            &fld->fld_exports, exp_fld_chain) {
-                if (count == hash) {
+        list_for_each_entry(target,
+                            &fld->fld_targets, fldt_chain) {
+                if (target->fldt_idx == hash) {
                         spin_unlock(&fld->fld_lock);
-                        RETURN(fld_exp);
+                        RETURN(target);
                 }
-                count++;
         }
         spin_unlock(&fld->fld_lock);
         RETURN(NULL);
@@ -115,7 +114,7 @@ fld_client_add_target(struct lu_client_fld *fld,
                       struct obd_export *exp)
 {
         struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct obd_export *fld_exp;
+        struct fld_target *target, *tmp;
         ENTRY;
 
         LASSERT(exp != NULL);
@@ -123,21 +122,27 @@ fld_client_add_target(struct lu_client_fld *fld,
         CDEBUG(D_INFO|D_WARNING, "FLD(cli): adding export %s\n",
               cli->cl_target_uuid.uuid);
         
+        OBD_ALLOC_PTR(target);
+        if (target == NULL)
+                RETURN(-ENOMEM);
+        
         spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
-                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+        list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
+                if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
                                     &exp->exp_client_uuid))
                 {
                         spin_unlock(&fld->fld_lock);
+                        OBD_FREE_PTR(target);
                         RETURN(-EEXIST);
                 }
         }
+
+        target->fldt_exp = class_export_get(exp);
+        target->fldt_idx = fld->fld_count;
         
-        fld_exp = class_export_get(exp);
-        list_add_tail(&fld_exp->exp_fld_chain,
-                      &fld->fld_exports);
+        list_add_tail(&target->fldt_chain,
+                      &fld->fld_targets);
         fld->fld_count++;
-        
         spin_unlock(&fld->fld_lock);
         
         RETURN(0);
@@ -147,22 +152,22 @@ EXPORT_SYMBOL(fld_client_add_target);
 /* remove export from FLD */
 int
 fld_client_del_target(struct lu_client_fld *fld,
-                          struct obd_export *exp)
+                      struct obd_export *exp)
 {
-        struct obd_export *fld_exp;
-        struct obd_export *tmp;
+        struct fld_target *target, *tmp;
         ENTRY;
 
         spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
-                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+        list_for_each_entry_safe(target, tmp,
+                                 &fld->fld_targets, fldt_chain) {
+                if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
                                     &exp->exp_client_uuid))
                 {
                         fld->fld_count--;
-                        list_del(&fld_exp->exp_fld_chain);
-                        class_export_get(fld_exp);
-
+                        list_del(&target->fldt_chain);
+                        class_export_put(target->fldt_exp);
                         spin_unlock(&fld->fld_lock);
+                        OBD_FREE_PTR(target);
                         RETURN(0);
                 }
         }
@@ -229,7 +234,7 @@ fld_client_init(struct lu_client_fld *fld,
                 RETURN(-EINVAL);
         }
         
-        INIT_LIST_HEAD(&fld->fld_exports);
+        INIT_LIST_HEAD(&fld->fld_targets);
         spin_lock_init(&fld->fld_lock);
         fld->fld_hash = &fld_hash[hash];
         fld->fld_count = 0;
@@ -237,6 +242,13 @@ fld_client_init(struct lu_client_fld *fld,
         snprintf(fld->fld_name, sizeof(fld->fld_name),
                  "%s-%s", LUSTRE_FLD_NAME, uuid);
         
+        fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
+        if (IS_ERR(fld->fld_cache)) {
+                rc = PTR_ERR(fld->fld_cache);
+                fld->fld_cache = NULL;
+                GOTO(out, rc);
+        }
+
 #ifdef LPROCFS
         rc = fld_client_proc_init(fld);
         if (rc)
@@ -257,8 +269,7 @@ EXPORT_SYMBOL(fld_client_init);
 void
 fld_client_fini(struct lu_client_fld *fld)
 {
-        struct obd_export *fld_exp;
-        struct obd_export *tmp;
+        struct fld_target *target, *tmp;
         ENTRY;
 
 #ifdef LPROCFS
@@ -266,13 +277,20 @@ fld_client_fini(struct lu_client_fld *fld)
 #endif
         
         spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp,
-                                 &fld->fld_exports, exp_fld_chain) {
+        list_for_each_entry_safe(target, tmp,
+                                 &fld->fld_targets, fldt_chain) {
                 fld->fld_count--;
-                list_del(&fld_exp->exp_fld_chain);
-                class_export_get(fld_exp);
+                list_del(&target->fldt_chain);
+                class_export_put(target->fldt_exp);
+                OBD_FREE_PTR(target);
         }
         spin_unlock(&fld->fld_lock);
+
+        if (fld->fld_cache != NULL) {
+                fld_cache_fini(fld->fld_cache);
+                fld->fld_cache = NULL;
+        }
+        
         CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
         EXIT;
 }
@@ -320,20 +338,20 @@ out_req:
 
 int
 fld_client_create(struct lu_client_fld *fld,
-                  __u64 seq, mdsno_t mds)
+                  seqno_t seq, mdsno_t mds)
 {
-        struct obd_export *fld_exp;
+        struct fld_target *target;
         struct md_fld      md_fld;
         __u32 rc;
         ENTRY;
 
-        fld_exp = fld_client_get_target(fld, seq);
-        if (!fld_exp)
+        target = fld_client_get_target(fld, seq);
+        if (!target)
                 RETURN(-EINVAL);
         md_fld.mf_seq = seq;
         md_fld.mf_mds = mds;
         
-        rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
+        rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
         
 #ifdef __KERNEL__
         if (rc  == 0) {
@@ -341,7 +359,7 @@ fld_client_create(struct lu_client_fld *fld,
                  * here. First of all because it may return -EEXISTS. Another
                  * reason is that, we do not want to stop proceeding because of
                  * cache errors. --umka */
-                fld_cache_insert(fld_cache, seq, mds);
+                fld_cache_insert(fld->fld_cache, seq, mds);
         }
 #endif
         
@@ -351,43 +369,44 @@ EXPORT_SYMBOL(fld_client_create);
 
 int
 fld_client_delete(struct lu_client_fld *fld,
-                  __u64 seq)
+                  seqno_t seq)
 {
-        struct obd_export *fld_exp;
+        struct fld_target *target;
         struct md_fld      md_fld;
         __u32 rc;
 
 #ifdef __KERNEL__
-        fld_cache_delete(fld_cache, seq);
+        fld_cache_delete(fld->fld_cache, seq);
 #endif
         
-        fld_exp = fld_client_get_target(fld, seq);
-        if (!fld_exp)
+        target = fld_client_get_target(fld, seq);
+        if (!target)
                 RETURN(-EINVAL);
 
         md_fld.mf_seq = seq;
         md_fld.mf_mds = 0;
 
-        rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
+        rc = fld_client_rpc(target->fldt_exp,
+                            &md_fld, FLD_DELETE);
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_client_delete);
 
 static int
 fld_client_get(struct lu_client_fld *fld,
-               __u64 seq, mdsno_t *mds)
+               seqno_t seq, mdsno_t *mds)
 {
-        struct obd_export *fld_exp;
+        struct fld_target *target;
         struct md_fld md_fld;
         int rc;
         ENTRY;
 
-        fld_exp = fld_client_get_target(fld, seq);
-        if (!fld_exp)
+        target = fld_client_get_target(fld, seq);
+        if (!target)
                 RETURN(-EINVAL);
                 
         md_fld.mf_seq = seq;
-        rc = fld_client_rpc(fld_exp,
+        rc = fld_client_rpc(target->fldt_exp,
                             &md_fld, FLD_LOOKUP);
         if (rc == 0)
                 *mds = md_fld.mf_mds;
@@ -398,7 +417,7 @@ fld_client_get(struct lu_client_fld *fld,
 /* lookup fid in the namespace of pfid according to the name */
 int
 fld_client_lookup(struct lu_client_fld *fld,
-                  __u64 seq, mdsno_t *mds)
+                  seqno_t seq, mdsno_t *mds)
 {
 #ifdef __KERNEL__
         struct fld_cache_entry *flde;
@@ -408,7 +427,7 @@ fld_client_lookup(struct lu_client_fld *fld,
 
 #ifdef __KERNEL__
         /* lookup it in the cache */
-        flde = fld_cache_lookup(fld_cache, seq);
+        flde = fld_cache_lookup(fld->fld_cache, seq);
         if (flde != NULL) {
                 *mds = flde->fce_mds;
                 RETURN(0);
@@ -423,7 +442,7 @@ fld_client_lookup(struct lu_client_fld *fld,
 #ifdef __KERNEL__
         /* do not return error here as well. See previous comment in same
          * situation in function fld_client_create(). --umka */
-        fld_cache_insert(fld_cache, seq, *mds);
+        fld_cache_insert(fld->fld_cache, seq, *mds);
 #endif
         
         RETURN(rc);
index d0988d1..10a3cc3 100644 (file)
@@ -53,17 +53,17 @@ fld_proc_read_targets(char *page, char **start, off_t off,
                       int count, int *eof, void *data)
 {
         struct lu_client_fld *fld = (struct lu_client_fld *)data;
-        struct obd_export *fld_exp;
+        struct fld_target *target;
        int total = 0, rc;
        ENTRY;
 
         LASSERT(fld != NULL);
 
         spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp,
-                            &fld->fld_exports, exp_fld_chain)
+        list_for_each_entry(target,
+                            &fld->fld_targets, fldt_chain)
         {
-                struct client_obd *cli = &fld_exp->exp_obd->u.cli;
+                struct client_obd *cli = &target->fldt_exp->exp_obd->u.cli;
                 
                 rc = snprintf(page, count, "%s\n",
                               cli->cl_target_uuid.uuid);
index 14caa28..c3993ec 100644 (file)
@@ -487,7 +487,8 @@ struct lu_site {
         /*
          * Fid location database
          */
-        struct lu_server_fld *ls_fld;
+        struct lu_server_fld *ls_server_fld;
+        struct lu_client_fld *ls_client_fld;
 
         /*
          * Server Seq Manager
index 6d8496c..8a9d1af 100644 (file)
 #define LUSTRE_MGS_VERSION  0x00060000
 
 typedef __u64 mdsno_t;
+typedef __u64 seqno_t;
 
 struct lu_range {
         __u64 lr_start;
@@ -1163,7 +1164,7 @@ extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
 /* end adding MDT by huanghua@clusterfs.com */
 
 struct md_fld {
-        __u64   mf_seq;
+        seqno_t mf_seq;
         mdsno_t mf_mds;
 };
 
index 95bfa74..4c09e9e 100644 (file)
@@ -65,7 +65,6 @@ struct obd_export {
         struct portals_handle     exp_handle;
         atomic_t                  exp_refcount;
         struct obd_uuid           exp_client_uuid;
-        struct list_head          exp_fld_chain;
         struct list_head          exp_obd_chain;
         /* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
         struct list_head          exp_obd_chain_timed;
index c42492d..b2d7ef8 100644 (file)
@@ -157,7 +157,7 @@ int seq_client_alloc_super(struct lu_client_seq *seq);
 int seq_client_alloc_meta(struct lu_client_seq *seq);
 
 int seq_client_alloc_seq(struct lu_client_seq *seq,
-                         __u64 *seqnr);
+                         seqno_t *seqnr);
 int seq_client_alloc_fid(struct lu_client_seq *seq,
                          struct lu_fid *fid);
 
index 2b1c298..e505dc8 100644 (file)
@@ -65,12 +65,25 @@ struct lu_server_fld {
         char                     fld_name[80];
 };
 
+struct fld_cache_entry {
+        struct hlist_node        fce_list;
+        mdsno_t                  fce_mds;
+        seqno_t                  fce_seq;
+};
+
+struct fld_cache_info {
+        struct hlist_head       *fci_hash;
+        spinlock_t               fci_lock;
+        int                      fci_size;
+        int                      fci_hash_mask;
+};
+
 struct lu_client_fld {
         /* client side proc entry */
         cfs_proc_dir_entry_t    *fld_proc_dir;
 
         /* list of exports client FLD knows about */
-        struct list_head         fld_exports;
+        struct list_head         fld_targets;
 
         /* current hash to be used to chose an export */
         struct lu_fld_hash      *fld_hash;
@@ -81,6 +94,9 @@ struct lu_client_fld {
         /* lock protecting exports list and fld_hash */
         spinlock_t               fld_lock;
 
+        /* client FLD cache */
+        struct fld_cache_info   *fld_cache;
+
         /* client fld proc entry name */
         char                     fld_name[80];
 };
@@ -96,15 +112,15 @@ void fld_server_fini(struct lu_server_fld *fld,
 
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      __u64 seq, mdsno_t *mds);
+                      seqno_t seq, mdsno_t *mds);
         
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      __u64 seq, mdsno_t mds);
+                      seqno_t seq, mdsno_t mds);
 
 int fld_server_delete(struct lu_server_fld *fld,
                       const struct lu_context *ctx,
-                      __u64 seq);
+                      seqno_t seq);
 
 /* client methods */
 int fld_client_init(struct lu_client_fld *fld,
@@ -114,13 +130,13 @@ int fld_client_init(struct lu_client_fld *fld,
 void fld_client_fini(struct lu_client_fld *fld);
 
 int fld_client_lookup(struct lu_client_fld *fld,
-                      __u64 seq, mdsno_t *mds);
+                      seqno_t seq, mdsno_t *mds);
 
 int fld_client_create(struct lu_client_fld *fld,
-                      __u64 seq, mdsno_t mds);
+                      seqno_t seq, mdsno_t mds);
 
 int fld_client_delete(struct lu_client_fld *fld,
-                      __u64 seq);
+                      seqno_t seq);
 
 int fld_client_add_target(struct lu_client_fld *fld,
                           struct obd_export *exp);
@@ -128,4 +144,19 @@ int fld_client_add_target(struct lu_client_fld *fld,
 int fld_client_del_target(struct lu_client_fld *fld,
                           struct obd_export *exp);
 
+/* cache methods */
+struct fld_cache_info *fld_cache_init(int size);
+
+void fld_cache_fini(struct fld_cache_info *cache);
+
+int fld_cache_insert(struct fld_cache_info *cache,
+                     seqno_t seq, mdsno_t mds);
+
+void fld_cache_delete(struct fld_cache_info *cache,
+                      seqno_t seq);
+
+struct fld_cache_entry *
+fld_cache_lookup(struct fld_cache_info *cache,
+                 seqno_t seq);
+
 #endif
index 0a718da..6a6afca 100644 (file)
@@ -1853,14 +1853,14 @@ static int mdt_fld_init(const struct lu_context *ctx,
 
         ls = m->mdt_md_dev.md_lu_dev.ld_site;
 
-        OBD_ALLOC_PTR(ls->ls_fld);
+        OBD_ALLOC_PTR(ls->ls_server_fld);
 
-        if (ls->ls_fld != NULL) {
-                rc = fld_server_init(ls->ls_fld, ctx,
+        if (ls->ls_server_fld != NULL) {
+                rc = fld_server_init(ls->ls_server_fld, ctx,
                                      uuid, m->mdt_bottom);
                 if (rc) {
-                        OBD_FREE_PTR(ls->ls_fld);
-                        ls->ls_fld = NULL;
+                        OBD_FREE_PTR(ls->ls_server_fld);
+                        ls->ls_server_fld = NULL;
                 }
         } else
                 rc = -ENOMEM;
@@ -1874,10 +1874,10 @@ static int mdt_fld_fini(const struct lu_context *ctx,
         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
         ENTRY;
 
-        if (ls && ls->ls_fld) {
-                fld_server_fini(ls->ls_fld, ctx);
-                OBD_FREE_PTR(ls->ls_fld);
-                ls->ls_fld = NULL;
+        if (ls && ls->ls_server_fld) {
+                fld_server_fini(ls->ls_server_fld, ctx);
+                OBD_FREE_PTR(ls->ls_server_fld);
+                ls->ls_server_fld = NULL;
         }
         RETURN(0);
 }