Whamcloud - gitweb
- a little reorganization in fid and fld sources, client part is moved to fid_request...
authoryury <yury>
Sat, 17 Jun 2006 09:23:15 +0000 (09:23 +0000)
committeryury <yury>
Sat, 17 Jun 2006 09:23:15 +0000 (09:23 +0000)
lustre/fid/Makefile.in
lustre/fid/autoMakefile.am
lustre/fid/fid_handler.c
lustre/fid/fid_request.c [new file with mode: 0644]
lustre/fld/Makefile.in
lustre/fld/autoMakefile.am
lustre/fld/fld_handler.c
lustre/fld/fld_internal.h
lustre/fld/fld_request.c [new file with mode: 0644]

index 857660d..cb15fe8 100644 (file)
@@ -1,4 +1,4 @@
 MODULES := fid
-fid-objs := fid_handler.o fid_lib.o
+fid-objs := fid_handler.o fid_request.o fid_lib.o
 
 @INCLUDE_RULES@
index acce2f4..0c8c5a1 100644 (file)
@@ -5,7 +5,7 @@
 
 if LIBLUSTRE
 noinst_LIBRARIES = libfid.a
-libfid_a_SOURCES = fid_handler.c fid_lib.c fid_internal.h
+libfid_a_SOURCES = fid_handler.c fid_request.c fid_lib.c fid_internal.h
 libfid_a_CPPFLAGS = $(LLCPPFLAGS)
 libfid_a_CFLAGS = $(LLCFLAGS)
 endif
index b7b0eac..2f1ceb0 100644 (file)
 #include <lustre_fid.h>
 #include "fid_internal.h"
 
-/* client seq mgr interface */
-static int 
-seq_client_rpc(struct lu_client_seq *seq, 
-               struct lu_range *range,
-               __u32 opc)
-{
-        int repsize = sizeof(struct lu_range);
-        int rc, reqsize = sizeof(__u32);
-        struct ptlrpc_request *req;
-        struct lu_range *ran;
-        __u32 *op;
-        ENTRY;
-
-        req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), 
-                             LUSTRE_MDS_VERSION, SEQ_QUERY,
-                             1, &reqsize, NULL);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
-        *op = opc;
-
-        req->rq_replen = lustre_msg_size(1, &repsize);
-        req->rq_request_portal = MDS_SEQ_PORTAL;
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out_req, rc);
-
-        ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
-                                 lustre_swab_lu_range);
-
-        if (ran == NULL) {
-                CERROR("invalid range is returned\n");
-                GOTO(out_req, rc = -EPROTO);
-        }
-        *range = *ran;
-        EXIT;
-out_req:
-        ptlrpc_req_finished(req); 
-        return rc;
-}
-
-/* request sequence-controller node to allocate new super-sequence. */
-int
-seq_client_alloc_super(struct lu_client_seq *seq)
-{
-        int rc;
-        ENTRY;
-
-        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
-        rc = seq_client_rpc(seq, &seq->seq_cl_range,
-                            SEQ_ALLOC_SUPER);
-        if (rc == 0) {
-                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence "
-                       "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
-                       seq->seq_cl_range.lr_end);
-        }
-        RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_super);
-
-/* request sequence-controller node to allocate new meta-sequence. */
-int
-seq_client_alloc_meta(struct lu_client_seq *seq)
-{
-        int rc;
-        ENTRY;
-
-        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
-        rc = seq_client_rpc(seq, &seq->seq_cl_range,
-                            SEQ_ALLOC_META);
-        if (rc == 0) {
-                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence "
-                       "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
-                       seq->seq_cl_range.lr_end);
-        }
-        RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_meta);
-
-/* allocate new sequence for client (llite or MDC are expected to use this) */
-int
-seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
-{
-        int rc;
-        ENTRY;
-
-        down(&seq->seq_sem);
-
-        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
-        LASSERT(range_is_sane(&seq->seq_cl_range));
-
-        /* if we still have free sequences in meta-sequence we allocate new seq
-         * from given range. */
-        if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
-                *seqnr = seq->seq_cl_range.lr_start;
-                seq->seq_cl_range.lr_start += 1;
-                rc = 0;
-        } else {
-                /* meta-sequence is exhausted, request MDT to allocate new
-                 * meta-sequence for us. */
-                rc = seq_client_alloc_meta(seq);
-                if (rc) {
-                        CERROR("can't allocate new meta-sequence, "
-                               "rc %d\n", rc);
-                }
-                
-                *seqnr = seq->seq_cl_range.lr_start;
-                seq->seq_cl_range.lr_start += 1;
-        }
-        up(&seq->seq_sem);
-
-        if (rc == 0) {
-                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence "
-                       "["LPX64"]\n", *seqnr);
-        }
-        RETURN(rc);
-}
-EXPORT_SYMBOL(seq_client_alloc_seq);
-
-int
-seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
-{
-        int rc;
-        ENTRY;
-
-        LASSERT(fid != NULL);
-        LASSERT(fid_is_sane(&seq->seq_fid));
-        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
-
-        down(&seq->seq_sem);
-        if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
-                *fid = seq->seq_fid;
-                seq->seq_fid.f_oid += 1;
-                rc = 0;
-        } else {
-                __u64 seqnr = 0;
-                
-                rc = seq_client_alloc_seq(seq, &seqnr);
-                if (rc) {
-                        CERROR("can't allocate new sequence, "
-                               "rc %d\n", rc);
-                        GOTO(out, rc);
-                } else {
-                        seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
-                        seq->seq_fid.f_seq = seqnr;
-                        seq->seq_fid.f_ver = 0;
-                        
-                        *fid = seq->seq_fid;
-                        seq->seq_fid.f_oid += 1;
-                        rc = -ERESTART;
-                }
-        }
-        LASSERT(fid_is_sane(fid));
-        
-        CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
-               PFID3(fid));
-
-        EXIT;
-out:
-        up(&seq->seq_sem);
-        return rc;
-}
-EXPORT_SYMBOL(seq_client_alloc_fid);
-
-int 
-seq_client_init(struct lu_client_seq *seq, 
-                struct obd_export *exp,
-                int flags)
-{
-        int rc;
-        ENTRY;
-
-        LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
-                         LUSTRE_CLI_SEQ_SERVER));
-
-        seq->seq_flags = flags;
-        fid_zero(&seq->seq_fid);
-        sema_init(&seq->seq_sem, 1);
-        
-        seq->seq_cl_range.lr_end = 0;
-        seq->seq_cl_range.lr_start = 0;
-       
-        if (exp != NULL)
-                seq->seq_exp = class_export_get(exp);
-
-        if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
-                __u64 seqnr = 0;
-                
-                /* client (llite or MDC) init case, we need new sequence from
-                 * MDT. This will allocate new meta-sequemce first, because seq
-                 * range in init state and looks the same as exhausted. */
-                rc = seq_client_alloc_seq(seq, &seqnr);
-                if (rc) {
-                        CERROR("can't allocate new sequence, rc %d\n", rc);
-                        GOTO(out, rc);
-                } else {
-                        seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
-                        seq->seq_fid.f_seq = seqnr;
-                        seq->seq_fid.f_ver = 0;
-                }
-
-                LASSERT(fid_is_sane(&seq->seq_fid));
-        } else {
-                /* check if this is controller node is trying to init client. */
-                if (seq->seq_exp) {
-                        /* MDT uses client seq manager to talk to sequence
-                         * controller, and thus, we need super-sequence. */
-                        rc = seq_client_alloc_super(seq);
-                } else {
-                        rc = 0;
-                }
-        }
-
-        EXIT;
-out:
-        if (rc)
-                seq_client_fini(seq);
-        else
-                CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager initialized\n");
-        return rc;
-}
-EXPORT_SYMBOL(seq_client_init);
-
-void seq_client_fini(struct lu_client_seq *seq)
-{
-        ENTRY;
-        if (seq->seq_exp != NULL) {
-                class_export_put(seq->seq_exp);
-                seq->seq_exp = NULL;
-        }
-        CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager finalized\n");
-        EXIT;
-}
-EXPORT_SYMBOL(seq_client_fini);
-
 #ifdef __KERNEL__
 /* server side seq mgr stuff */
 static const struct lu_range LUSTRE_SEQ_SUPER_INIT = {
diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c
new file mode 100644 (file)
index 0000000..964c279
--- /dev/null
@@ -0,0 +1,284 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/fid/fid_request.c
+ *  Lustre Sequence Manager
+ *
+ *  Copyright (c) 2006 Cluster File Systems, Inc.
+ *   Author: Yury Umanets <umka@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FID
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <dt_object.h>
+#include <md_object.h>
+#include <obd_support.h>
+#include <lustre_req_layout.h>
+#include <lustre_fid.h>
+#include "fid_internal.h"
+
+/* client seq mgr interface */
+static int 
+seq_client_rpc(struct lu_client_seq *seq, 
+               struct lu_range *range,
+               __u32 opc)
+{
+        int repsize = sizeof(struct lu_range);
+        int rc, reqsize = sizeof(__u32);
+        struct ptlrpc_request *req;
+        struct lu_range *ran;
+        __u32 *op;
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), 
+                             LUSTRE_MDS_VERSION, SEQ_QUERY,
+                             1, &reqsize, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
+        *op = opc;
+
+        req->rq_replen = lustre_msg_size(1, &repsize);
+        req->rq_request_portal = MDS_SEQ_PORTAL;
+        rc = ptlrpc_queue_wait(req);
+        if (rc)
+                GOTO(out_req, rc);
+
+        ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
+                                 lustre_swab_lu_range);
+
+        if (ran == NULL) {
+                CERROR("invalid range is returned\n");
+                GOTO(out_req, rc = -EPROTO);
+        }
+        *range = *ran;
+        EXIT;
+out_req:
+        ptlrpc_req_finished(req); 
+        return rc;
+}
+
+/* request sequence-controller node to allocate new super-sequence. */
+int
+seq_client_alloc_super(struct lu_client_seq *seq)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_SERVER);
+        rc = seq_client_rpc(seq, &seq->seq_cl_range,
+                            SEQ_ALLOC_SUPER);
+        if (rc == 0) {
+                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence "
+                       "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+                       seq->seq_cl_range.lr_end);
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_super);
+
+/* request sequence-controller node to allocate new meta-sequence. */
+int
+seq_client_alloc_meta(struct lu_client_seq *seq)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+        rc = seq_client_rpc(seq, &seq->seq_cl_range,
+                            SEQ_ALLOC_META);
+        if (rc == 0) {
+                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence "
+                       "["LPX64"-"LPX64"]\n", seq->seq_cl_range.lr_start,
+                       seq->seq_cl_range.lr_end);
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_meta);
+
+/* allocate new sequence for client (llite or MDC are expected to use this) */
+int
+seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+        int rc;
+        ENTRY;
+
+        down(&seq->seq_sem);
+
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+        LASSERT(range_is_sane(&seq->seq_cl_range));
+
+        /* if we still have free sequences in meta-sequence we allocate new seq
+         * from given range. */
+        if (seq->seq_cl_range.lr_end > seq->seq_cl_range.lr_start) {
+                *seqnr = seq->seq_cl_range.lr_start;
+                seq->seq_cl_range.lr_start += 1;
+                rc = 0;
+        } else {
+                /* meta-sequence is exhausted, request MDT to allocate new
+                 * meta-sequence for us. */
+                rc = seq_client_alloc_meta(seq);
+                if (rc) {
+                        CERROR("can't allocate new meta-sequence, "
+                               "rc %d\n", rc);
+                }
+                
+                *seqnr = seq->seq_cl_range.lr_start;
+                seq->seq_cl_range.lr_start += 1;
+        }
+        up(&seq->seq_sem);
+
+        if (rc == 0) {
+                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence "
+                       "["LPX64"]\n", *seqnr);
+        }
+        RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_seq);
+
+int
+seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(fid != NULL);
+        LASSERT(fid_is_sane(&seq->seq_fid));
+        LASSERT(seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT);
+
+        down(&seq->seq_sem);
+        if (fid_oid(&seq->seq_fid) < LUSTRE_SEQ_WIDTH) {
+                *fid = seq->seq_fid;
+                seq->seq_fid.f_oid += 1;
+                rc = 0;
+        } else {
+                __u64 seqnr = 0;
+                
+                rc = seq_client_alloc_seq(seq, &seqnr);
+                if (rc) {
+                        CERROR("can't allocate new sequence, "
+                               "rc %d\n", rc);
+                        GOTO(out, rc);
+                } else {
+                        seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+                        seq->seq_fid.f_seq = seqnr;
+                        seq->seq_fid.f_ver = 0;
+                        
+                        *fid = seq->seq_fid;
+                        seq->seq_fid.f_oid += 1;
+                        rc = -ERESTART;
+                }
+        }
+        LASSERT(fid_is_sane(fid));
+        
+        CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
+               PFID3(fid));
+
+        EXIT;
+out:
+        up(&seq->seq_sem);
+        return rc;
+}
+EXPORT_SYMBOL(seq_client_alloc_fid);
+
+int 
+seq_client_init(struct lu_client_seq *seq, 
+                struct obd_export *exp,
+                int flags)
+{
+        int rc;
+        ENTRY;
+
+        LASSERT(flags & (LUSTRE_CLI_SEQ_CLIENT |
+                         LUSTRE_CLI_SEQ_SERVER));
+
+        seq->seq_flags = flags;
+        fid_zero(&seq->seq_fid);
+        sema_init(&seq->seq_sem, 1);
+        
+        seq->seq_cl_range.lr_end = 0;
+        seq->seq_cl_range.lr_start = 0;
+       
+        if (exp != NULL)
+                seq->seq_exp = class_export_get(exp);
+
+        if (seq->seq_flags & LUSTRE_CLI_SEQ_CLIENT) {
+                __u64 seqnr = 0;
+                
+                /* client (llite or MDC) init case, we need new sequence from
+                 * MDT. This will allocate new meta-sequemce first, because seq
+                 * range in init state and looks the same as exhausted. */
+                rc = seq_client_alloc_seq(seq, &seqnr);
+                if (rc) {
+                        CERROR("can't allocate new sequence, rc %d\n", rc);
+                        GOTO(out, rc);
+                } else {
+                        seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
+                        seq->seq_fid.f_seq = seqnr;
+                        seq->seq_fid.f_ver = 0;
+                }
+
+                LASSERT(fid_is_sane(&seq->seq_fid));
+        } else {
+                /* check if this is controller node is trying to init client. */
+                if (seq->seq_exp) {
+                        /* MDT uses client seq manager to talk to sequence
+                         * controller, and thus, we need super-sequence. */
+                        rc = seq_client_alloc_super(seq);
+                } else {
+                        rc = 0;
+                }
+        }
+
+        EXIT;
+out:
+        if (rc)
+                seq_client_fini(seq);
+        else
+                CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager initialized\n");
+        return rc;
+}
+EXPORT_SYMBOL(seq_client_init);
+
+void seq_client_fini(struct lu_client_seq *seq)
+{
+        ENTRY;
+        if (seq->seq_exp != NULL) {
+                class_export_put(seq->seq_exp);
+                seq->seq_exp = NULL;
+        }
+        CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager finalized\n");
+        EXIT;
+}
+EXPORT_SYMBOL(seq_client_fini);
index a43b009..3cb2017 100644 (file)
@@ -1,5 +1,5 @@
 MODULES := fld 
-fld-objs := fld_handler.o fld_iam.o
+fld-objs := fld_handler.o fld_request.o fld_iam.o
 
 EXTRA_PRE_CFLAGS := -I@LUSTRE@ -I@LUSTRE@/ldiskfs
 
index cffe901..3c885be 100644 (file)
@@ -5,7 +5,7 @@
 
 if LIBLUSTRE
 noinst_LIBRARIES = libfld.a
-libfld_a_SOURCES = fld_handler.c fld_iam.c fld_internal.h
+libfld_a_SOURCES = fld_handler.c fld_request.c fld_iam.c fld_internal.h
 libfld_a_CPPFLAGS = $(LLCPPFLAGS)
 libfld_a_CFLAGS = $(LLCFLAGS)
 endif
index a896b54..90438e7 100644 (file)
@@ -2,6 +2,7 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  *  lustre/fld/fld_handler.c
+ *  FLD (Fids Location Database)
  *
  *  Copyright (C) 2006 Cluster File Systems, Inc.
  *   Author: WangDi <wangdi@clusterfs.com>
 #ifdef __KERNEL__
 struct fld_cache_info *fld_cache = NULL;
 
-enum {
-        FLD_HTABLE_BITS = 8,
-        FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
-        FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
-};
-
-static __u32 fld_cache_hash(__u64 seq)
-{
-        return seq;
-}
-
-static int
-fld_cache_insert(struct fld_cache_info *fld_cache,
-                 __u64 seq, __u64 mds)
-{
-        struct fld_cache *fld;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        int rc = 0;
-        ENTRY;
-
-        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fld_hash_mask);
-
-        OBD_ALLOC_PTR(fld);
-        if (!fld)
-                RETURN(-ENOMEM);
-
-        INIT_HLIST_NODE(&fld->fld_list);
-        fld->fld_mds = mds;
-        fld->fld_seq = seq;
-
-        spin_lock(&fld_cache->fld_lock);
-        hlist_for_each_entry(fld, scan, bucket, fld_list) {
-                if (fld->fld_seq == seq)
-                        GOTO(exit_unlock, rc = -EEXIST);
-        }
-        hlist_add_head(&fld->fld_list, bucket);
-        EXIT;
-exit_unlock:
-        spin_unlock(&fld_cache->fld_lock);
-        if (rc != 0)
-                OBD_FREE(fld, sizeof(*fld));
-        return rc;
-}
-
-static struct fld_cache *
-fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
-{
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        struct fld_cache *fld;
-        ENTRY;
-
-        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fld_hash_mask);
-
-        spin_lock(&fld_cache->fld_lock);
-        hlist_for_each_entry(fld, scan, bucket, fld_list) {
-                if (fld->fld_seq == seq) {
-                        spin_unlock(&fld_cache->fld_lock);
-                        RETURN(fld);
-                }
-        }
-        spin_unlock(&fld_cache->fld_lock);
-
-        RETURN(NULL);
-}
-
-static void
-fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
-{
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        struct fld_cache *fld;
-        ENTRY;
-
-        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
-                                        fld_cache->fld_hash_mask);
-
-        spin_lock(&fld_cache->fld_lock);
-        hlist_for_each_entry(fld, scan, bucket, fld_list) {
-                if (fld->fld_seq == seq) {
-                        hlist_del_init(&fld->fld_list);
-                        GOTO(out_unlock, 0);
-                }
-        }
-
-        EXIT;
-out_unlock:
-        spin_unlock(&fld_cache->fld_lock);
-        return;
-}
-#endif
-
-static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
-{
-        if (fld->fld_count == 0)
-                return 0;
-        
-        return do_div(seq, fld->fld_count);
-}
-
-static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
-{
-        /* XXX: here should DHT hash */
-        return fld_rrb_hash(fld, seq);
-}
-
-static struct lu_fld_hash fld_hash[3] = {
-        {
-                .fh_name = "DHT",
-                .fh_func = fld_dht_hash
-        },
-        {
-                .fh_name = "Round Robin",
-                .fh_func = fld_rrb_hash
-        },
-        {
-                0,
-        }
-};
-
-static struct obd_export *
-fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
-{
-        struct obd_export *fld_exp;
-        int count = 0, hash;
-        ENTRY;
-
-        LASSERT(fld->fld_hash != NULL);
-        hash = fld->fld_hash->fh_func(fld, seq);
-
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp,
-                            &fld->fld_exports, exp_fld_chain) {
-                if (count == hash) {
-                        spin_unlock(&fld->fld_lock);
-                        RETURN(fld_exp);
-                }
-                count++;
-        }
-        spin_unlock(&fld->fld_lock);
-        RETURN(NULL);
-}
-
-/* add export to FLD. This is usually done by CMM and LMV as they are main users
- * of FLD module. */
-int fld_client_add_export(struct lu_client_fld *fld,
-                          struct obd_export *exp)
-{
-        struct obd_export *fld_exp;
-        ENTRY;
-
-        LASSERT(exp != NULL);
-
-        CWARN("adding export %s\n", exp->exp_client_uuid.uuid);
-        
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
-                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
-                {
-                        spin_unlock(&fld->fld_lock);
-                        RETURN(-EEXIST);
-                }
-        }
-        
-        fld_exp = class_export_get(exp);
-        list_add_tail(&fld_exp->exp_fld_chain,
-                      &fld->fld_exports);
-        fld->fld_count++;
-        
-        spin_unlock(&fld->fld_lock);
-        
-        RETURN(0);
-}
-EXPORT_SYMBOL(fld_client_add_export);
-
-/* remove export from FLD */
-int fld_client_del_export(struct lu_client_fld *fld,
-                          struct obd_export *exp)
-{
-        struct obd_export *fld_exp;
-        struct obd_export *tmp;
-        ENTRY;
-
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
-                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
-                {
-                        fld->fld_count--;
-                        list_del(&fld_exp->exp_fld_chain);
-                        class_export_get(fld_exp);
-
-                        spin_unlock(&fld->fld_lock);
-                        RETURN(0);
-                }
-        }
-        spin_unlock(&fld->fld_lock);
-        RETURN(-ENOENT);
-}
-EXPORT_SYMBOL(fld_client_del_export);
-
-int fld_client_init(struct lu_client_fld *fld, int hash)
-{
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(fld != NULL);
-
-        if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
-                CERROR("wrong hash function 0x%x\n", hash);
-                RETURN(-EINVAL);
-        }
-        
-        INIT_LIST_HEAD(&fld->fld_exports);
-        spin_lock_init(&fld->fld_lock);
-        fld->fld_hash = &fld_hash[hash];
-        fld->fld_count = 0;
-        
-        CDEBUG(D_INFO, "Client FLD initialized, using %s\n",
-               fld->fld_hash->fh_name);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_init);
-
-void fld_client_fini(struct lu_client_fld *fld)
-{
-        struct obd_export *fld_exp;
-        struct obd_export *tmp;
-        ENTRY;
-
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry_safe(fld_exp, tmp,
-                                 &fld->fld_exports, exp_fld_chain) {
-                fld->fld_count--;
-                list_del(&fld_exp->exp_fld_chain);
-                class_export_get(fld_exp);
-        }
-        spin_unlock(&fld->fld_lock);
-        CDEBUG(D_INFO, "Client FLD finalized\n");
-        EXIT;
-}
-EXPORT_SYMBOL(fld_client_fini);
-
-static int
-fld_client_rpc(struct obd_export *exp,
-               struct md_fld *mf, __u32 fld_op)
-{
-        int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
-        int mf_size = sizeof(struct md_fld);
-        struct ptlrpc_request *req;
-        struct md_fld *pmf;
-        __u32 *op;
-        ENTRY;
-
-        LASSERT(exp != NULL);
-
-        req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                              LUSTRE_MDS_VERSION, FLD_QUERY,
-                              2, size, NULL);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
-        *op = fld_op;
-
-        pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
-        memcpy(pmf, mf, sizeof(*mf));
-
-        req->rq_replen = lustre_msg_size(1, &mf_size);
-        req->rq_request_portal = MDS_FLD_PORTAL;
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out_req, rc);
-
-        pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
-                                 lustre_swab_md_fld);
-        *mf = *pmf; 
-out_req:
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
-
-int
-fld_client_create(struct lu_client_fld *fld,
-                  __u64 seq, __u64 mds)
-{
-        struct obd_export *fld_exp;
-        struct md_fld      md_fld;
-        __u32 rc;
-        ENTRY;
-
-        fld_exp = fld_client_get_export(fld, seq);
-        if (!fld_exp)
-                RETURN(-EINVAL);
-        md_fld.mf_seq = seq;
-        md_fld.mf_mds = mds;
-        
-        rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
-#ifdef __KERNEL__
-        fld_cache_insert(fld_cache, seq, mds);
-#endif
-        
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_create);
-
-int
-fld_client_delete(struct lu_client_fld *fld,
-                  __u64 seq, __u64 mds)
-{
-        struct obd_export *fld_exp;
-        struct md_fld      md_fld;
-        __u32 rc;
-
-#ifdef __KERNEL__
-        fld_cache_delete(fld_cache, seq);
-#endif
-        
-        fld_exp = fld_client_get_export(fld, seq);
-        if (!fld_exp)
-                RETURN(-EINVAL);
-
-        md_fld.mf_seq = seq;
-        md_fld.mf_mds = mds;
-
-        rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_delete);
-
-static int
-fld_client_get(struct lu_client_fld *fld,
-               __u64 seq, __u64 *mds)
-{
-        struct obd_export *fld_exp;
-        struct md_fld md_fld;
-        int rc;
-        ENTRY;
-
-        fld_exp = fld_client_get_export(fld, seq);
-        if (!fld_exp)
-                RETURN(-EINVAL);
-                
-        md_fld.mf_seq = seq;
-        rc = fld_client_rpc(fld_exp,
-                            &md_fld, FLD_LOOKUP);
-        if (rc == 0)
-                *mds = md_fld.mf_mds;
-
-        RETURN(rc);
-}
-
-/* lookup fid in the namespace of pfid according to the name */
-int
-fld_client_lookup(struct lu_client_fld *fld,
-                  __u64 seq, __u64 *mds)
-{
-#ifdef __KERNEL__
-        struct fld_cache *fld_entry;
-#endif
-        int rc;
-        ENTRY;
-
-#ifdef __KERNEL__
-        /* lookup it in the cache */
-        fld_entry = fld_cache_lookup(fld_cache, seq);
-        if (fld_entry != NULL) {
-                *mds = fld_entry->fld_mds;
-                RETURN(0);
-        }
-#endif
-        
-        /* can not find it in the cache */
-        rc = fld_client_get(fld, seq, mds);
-        if (rc)
-                RETURN(rc);
-
-#ifdef __KERNEL__
-        rc = fld_cache_insert(fld_cache, seq, *mds);
-#endif
-        
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_lookup);
-
-#ifdef __KERNEL__
 static int fld_init(void)
 {
         int i;
@@ -468,7 +79,7 @@ static int fld_init(void)
         for (i = 0; i < FLD_HTABLE_SIZE; i++)
                 INIT_HLIST_HEAD(&fld_cache->fld_hash[i]);
 
-        CDEBUG(D_INFO, "Client FLD, cache size %d\n",
+        CDEBUG(D_INFO|D_WARNING, "Client FLD, cache size %d\n",
                FLD_HTABLE_SIZE);
         
         RETURN(0);
@@ -569,7 +180,6 @@ out_pill:
         return rc;
 }
 
-
 static int fld_req_handle(struct ptlrpc_request *req)
 {
         int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
index 90d0255..56cd638 100644 (file)
@@ -65,6 +65,12 @@ enum fld_op {
         FLD_LOOKUP = 2
 };
 
+enum {
+        FLD_HTABLE_BITS = 8,
+        FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
+        FLD_HTABLE_MASK = FLD_HTABLE_SIZE - 1
+};
+
 #define FLD_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 
 int fld_handle_insert(struct lu_server_fld *fld,
diff --git a/lustre/fld/fld_request.c b/lustre/fld/fld_request.c
new file mode 100644 (file)
index 0000000..b890e99
--- /dev/null
@@ -0,0 +1,439 @@
+/* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/fld/fld_handler.c
+ *  FLD (Fids Location Database)
+ *
+ *  Copyright (C) 2006 Cluster File Systems, Inc.
+ *   Author: Yury Umanets <umka@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_FLD
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+# include <linux/jbd.h>
+# include <asm/div64.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+# include <libcfs/list.h>
+#endif
+
+#include <obd.h>
+#include <obd_class.h>
+#include <lustre_ver.h>
+#include <obd_support.h>
+#include <lprocfs_status.h>
+
+#include <dt_object.h>
+#include <md_object.h>
+#include <lustre_req_layout.h>
+#include <lustre_fld.h>
+#include "fld_internal.h"
+
+#ifdef __KERNEL__
+extern struct fld_cache_info *fld_cache;
+
+static __u32 fld_cache_hash(__u64 seq)
+{
+        return seq;
+}
+
+static int
+fld_cache_insert(struct fld_cache_info *fld_cache,
+                 __u64 seq, __u64 mds)
+{
+        struct fld_cache *fld;
+        struct hlist_head *bucket;
+        struct hlist_node *scan;
+        int rc = 0;
+        ENTRY;
+
+        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
+                                        fld_cache->fld_hash_mask);
+
+        OBD_ALLOC_PTR(fld);
+        if (!fld)
+                RETURN(-ENOMEM);
+
+        INIT_HLIST_NODE(&fld->fld_list);
+        fld->fld_mds = mds;
+        fld->fld_seq = seq;
+
+        spin_lock(&fld_cache->fld_lock);
+        hlist_for_each_entry(fld, scan, bucket, fld_list) {
+                if (fld->fld_seq == seq)
+                        GOTO(exit_unlock, rc = -EEXIST);
+        }
+        hlist_add_head(&fld->fld_list, bucket);
+        EXIT;
+exit_unlock:
+        spin_unlock(&fld_cache->fld_lock);
+        if (rc != 0)
+                OBD_FREE(fld, sizeof(*fld));
+        return rc;
+}
+
+static struct fld_cache *
+fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
+{
+        struct hlist_head *bucket;
+        struct hlist_node *scan;
+        struct fld_cache *fld;
+        ENTRY;
+
+        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
+                                        fld_cache->fld_hash_mask);
+
+        spin_lock(&fld_cache->fld_lock);
+        hlist_for_each_entry(fld, scan, bucket, fld_list) {
+                if (fld->fld_seq == seq) {
+                        spin_unlock(&fld_cache->fld_lock);
+                        RETURN(fld);
+                }
+        }
+        spin_unlock(&fld_cache->fld_lock);
+
+        RETURN(NULL);
+}
+
+static void
+fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
+{
+        struct hlist_head *bucket;
+        struct hlist_node *scan;
+        struct fld_cache *fld;
+        ENTRY;
+
+        bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
+                                        fld_cache->fld_hash_mask);
+
+        spin_lock(&fld_cache->fld_lock);
+        hlist_for_each_entry(fld, scan, bucket, fld_list) {
+                if (fld->fld_seq == seq) {
+                        hlist_del_init(&fld->fld_list);
+                        GOTO(out_unlock, 0);
+                }
+        }
+
+        EXIT;
+out_unlock:
+        spin_unlock(&fld_cache->fld_lock);
+        return;
+}
+#endif
+
+static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
+{
+        if (fld->fld_count == 0)
+                return 0;
+        
+        return do_div(seq, fld->fld_count);
+}
+
+static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
+{
+        /* XXX: here should DHT hash */
+        return fld_rrb_hash(fld, seq);
+}
+
+static struct lu_fld_hash fld_hash[3] = {
+        {
+                .fh_name = "DHT",
+                .fh_func = fld_dht_hash
+        },
+        {
+                .fh_name = "Round Robin",
+                .fh_func = fld_rrb_hash
+        },
+        {
+                0,
+        }
+};
+
+static struct obd_export *
+fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
+{
+        struct obd_export *fld_exp;
+        int count = 0, hash;
+        ENTRY;
+
+        LASSERT(fld->fld_hash != NULL);
+        hash = fld->fld_hash->fh_func(fld, seq);
+
+        spin_lock(&fld->fld_lock);
+        list_for_each_entry(fld_exp,
+                            &fld->fld_exports, exp_fld_chain) {
+                if (count == hash) {
+                        spin_unlock(&fld->fld_lock);
+                        RETURN(fld_exp);
+                }
+                count++;
+        }
+        spin_unlock(&fld->fld_lock);
+        RETURN(NULL);
+}
+
+/* add export to FLD. This is usually done by CMM and LMV as they are main users
+ * of FLD module. */
+int fld_client_add_export(struct lu_client_fld *fld,
+                          struct obd_export *exp)
+{
+        struct obd_export *fld_exp;
+        ENTRY;
+
+        LASSERT(exp != NULL);
+
+        CWARN("adding export %s\n", exp->exp_client_uuid.uuid);
+        
+        spin_lock(&fld->fld_lock);
+        list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
+                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+                                    &exp->exp_client_uuid))
+                {
+                        spin_unlock(&fld->fld_lock);
+                        RETURN(-EEXIST);
+                }
+        }
+        
+        fld_exp = class_export_get(exp);
+        list_add_tail(&fld_exp->exp_fld_chain,
+                      &fld->fld_exports);
+        fld->fld_count++;
+        
+        spin_unlock(&fld->fld_lock);
+        
+        RETURN(0);
+}
+EXPORT_SYMBOL(fld_client_add_export);
+
+/* remove export from FLD */
+int fld_client_del_export(struct lu_client_fld *fld,
+                          struct obd_export *exp)
+{
+        struct obd_export *fld_exp;
+        struct obd_export *tmp;
+        ENTRY;
+
+        spin_lock(&fld->fld_lock);
+        list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
+                if (obd_uuid_equals(&fld_exp->exp_client_uuid,
+                                    &exp->exp_client_uuid))
+                {
+                        fld->fld_count--;
+                        list_del(&fld_exp->exp_fld_chain);
+                        class_export_get(fld_exp);
+
+                        spin_unlock(&fld->fld_lock);
+                        RETURN(0);
+                }
+        }
+        spin_unlock(&fld->fld_lock);
+        RETURN(-ENOENT);
+}
+EXPORT_SYMBOL(fld_client_del_export);
+
+int fld_client_init(struct lu_client_fld *fld, int hash)
+{
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(fld != NULL);
+
+        if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
+                CERROR("wrong hash function 0x%x\n", hash);
+                RETURN(-EINVAL);
+        }
+        
+        INIT_LIST_HEAD(&fld->fld_exports);
+        spin_lock_init(&fld->fld_lock);
+        fld->fld_hash = &fld_hash[hash];
+        fld->fld_count = 0;
+        
+        CDEBUG(D_INFO, "Client FLD initialized, using %s\n",
+               fld->fld_hash->fh_name);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_init);
+
+void fld_client_fini(struct lu_client_fld *fld)
+{
+        struct obd_export *fld_exp;
+        struct obd_export *tmp;
+        ENTRY;
+
+        spin_lock(&fld->fld_lock);
+        list_for_each_entry_safe(fld_exp, tmp,
+                                 &fld->fld_exports, exp_fld_chain) {
+                fld->fld_count--;
+                list_del(&fld_exp->exp_fld_chain);
+                class_export_get(fld_exp);
+        }
+        spin_unlock(&fld->fld_lock);
+        CDEBUG(D_INFO, "Client FLD finalized\n");
+        EXIT;
+}
+EXPORT_SYMBOL(fld_client_fini);
+
+static int
+fld_client_rpc(struct obd_export *exp,
+               struct md_fld *mf, __u32 fld_op)
+{
+        int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
+        int mf_size = sizeof(struct md_fld);
+        struct ptlrpc_request *req;
+        struct md_fld *pmf;
+        __u32 *op;
+        ENTRY;
+
+        LASSERT(exp != NULL);
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp),
+                              LUSTRE_MDS_VERSION, FLD_QUERY,
+                              2, size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+        *op = fld_op;
+
+        pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
+        memcpy(pmf, mf, sizeof(*mf));
+
+        req->rq_replen = lustre_msg_size(1, &mf_size);
+        req->rq_request_portal = MDS_FLD_PORTAL;
+        rc = ptlrpc_queue_wait(req);
+        if (rc)
+                GOTO(out_req, rc);
+
+        pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
+                                 lustre_swab_md_fld);
+        *mf = *pmf; 
+out_req:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
+int
+fld_client_create(struct lu_client_fld *fld,
+                  __u64 seq, __u64 mds)
+{
+        struct obd_export *fld_exp;
+        struct md_fld      md_fld;
+        __u32 rc;
+        ENTRY;
+
+        fld_exp = fld_client_get_export(fld, seq);
+        if (!fld_exp)
+                RETURN(-EINVAL);
+        md_fld.mf_seq = seq;
+        md_fld.mf_mds = mds;
+        
+        rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+        fld_cache_insert(fld_cache, seq, mds);
+#endif
+        
+        RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_create);
+
+int
+fld_client_delete(struct lu_client_fld *fld,
+                  __u64 seq, __u64 mds)
+{
+        struct obd_export *fld_exp;
+        struct md_fld      md_fld;
+        __u32 rc;
+
+#ifdef __KERNEL__
+        fld_cache_delete(fld_cache, seq);
+#endif
+        
+        fld_exp = fld_client_get_export(fld, seq);
+        if (!fld_exp)
+                RETURN(-EINVAL);
+
+        md_fld.mf_seq = seq;
+        md_fld.mf_mds = mds;
+
+        rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
+        RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_delete);
+
+static int
+fld_client_get(struct lu_client_fld *fld,
+               __u64 seq, __u64 *mds)
+{
+        struct obd_export *fld_exp;
+        struct md_fld md_fld;
+        int rc;
+        ENTRY;
+
+        fld_exp = fld_client_get_export(fld, seq);
+        if (!fld_exp)
+                RETURN(-EINVAL);
+                
+        md_fld.mf_seq = seq;
+        rc = fld_client_rpc(fld_exp,
+                            &md_fld, FLD_LOOKUP);
+        if (rc == 0)
+                *mds = md_fld.mf_mds;
+
+        RETURN(rc);
+}
+
+/* lookup fid in the namespace of pfid according to the name */
+int
+fld_client_lookup(struct lu_client_fld *fld,
+                  __u64 seq, __u64 *mds)
+{
+#ifdef __KERNEL__
+        struct fld_cache *fld_entry;
+#endif
+        int rc;
+        ENTRY;
+
+#ifdef __KERNEL__
+        /* lookup it in the cache */
+        fld_entry = fld_cache_lookup(fld_cache, seq);
+        if (fld_entry != NULL) {
+                *mds = fld_entry->fld_mds;
+                RETURN(0);
+        }
+#endif
+        
+        /* can not find it in the cache */
+        rc = fld_client_get(fld, seq, mds);
+        if (rc)
+                RETURN(rc);
+
+#ifdef __KERNEL__
+        rc = fld_cache_insert(fld_cache, seq, *mds);
+#endif
+        
+        RETURN(rc);
+}
+EXPORT_SYMBOL(fld_client_lookup);