Whamcloud - gitweb
- fixes in fld:
authoryury <yury>
Mon, 12 Jun 2006 17:55:21 +0000 (17:55 +0000)
committeryury <yury>
Mon, 12 Jun 2006 17:55:21 +0000 (17:55 +0000)
  * moving mdc_fld() into fld module and rename it to fld_rpc();
  * setting correct portal in fld_rpc();
  * handling out errors correctly in fld_req_handle();
  * putting missed target_send_repay() to fld_reqa_handle();
  * added liblustre support (FLD caching is not working in this case);

lustre/fld/autoMakefile.am
lustre/fld/fld_handler.c
lustre/fld/fld_iam.c
lustre/include/lustre_mdc.h
lustre/include/obd_support.h
lustre/liblustre/Makefile.am
lustre/liblustre/genlib.sh
lustre/mdc/mdc_request.c

index 5d8bf36..cffe901 100644 (file)
@@ -3,9 +3,16 @@
 # This code is issued under the GNU General Public License.
 # See the file COPYING in this distribution
 
+if LIBLUSTRE
+noinst_LIBRARIES = libfld.a
+libfld_a_SOURCES = fld_handler.c fld_iam.c fld_internal.h
+libfld_a_CPPFLAGS = $(LLCPPFLAGS)
+libfld_a_CFLAGS = $(LLCFLAGS)
+endif
+
 if MODULES
 modulefs_DATA = fld$(KMODEXT)
 endif
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ 
-DIST_SOURCES := $(fld-objs:%.o=%.c) 
+DIST_SOURCES := $(fld-objs:%.o=%.c) fld_internal.h
index fa65b65..3a3dbe8 100644 (file)
@@ -5,6 +5,7 @@
  *
  *  Copyright (C) 2006 Cluster File Systems, Inc.
  *   Author: WangDi <wangdi@clusterfs.com>
+ *           Yury Umanets <umka@clusterfs.com>
  *
  *   This file is part of the Lustre file system, http://www.lustre.org
  *   Lustre is a trademark of Cluster File Systems, Inc.
 #ifndef EXPORT_SYMTAB
 # define EXPORT_SYMTAB
 #endif
-#define DEBUG_SUBSYSTEM S_LLITE
-
-#include <linux/module.h>
-#include <linux/jbd.h>
+#define DEBUG_SUBSYSTEM S_FLD
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+# include <linux/jbd.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+# include <libcfs/list.h>
+#endif
 
 #include <obd.h>
 #include <obd_class.h>
 
 #include <dt_object.h>
 #include <md_object.h>
-#include <lustre_mdc.h>
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
-static int fld_handle(const struct lu_context *ctx,
-                      struct lu_fld *fld, __u32 opts, struct md_fld *mf);
-
-/*XXX maybe these 2 items should go to sbi*/
-struct fld_cache_info *fld_cache = NULL;
-
 static int dht_mdt_hash(__u64 seq)
 {
         return 0;
 }
-struct obd_export* get_fld_exp(struct obd_export *exp, __u64 seq)
+
+struct obd_export *
+get_fld_exp(struct obd_export *exp, __u64 seq)
 {
         int seq_mds;
 
         seq_mds = dht_mdt_hash(seq);
         CDEBUG(D_INFO, "mds number %d\n", seq_mds);
 
-        /*get exp according to lu_seq*/
+        /* get exp according to lu_seq */
         return exp;
 }
 
+#ifdef __KERNEL__
+static int fld_handle(const struct lu_context *ctx,
+                      struct lu_fld *fld, __u32 opts,
+                      struct md_fld *mf);
+
+/* XXX: maybe these 2 items should go to sbi */
+struct fld_cache_info *fld_cache = NULL;
+
 enum {
         FLD_HTABLE_BITS = 8,
         FLD_HTABLE_SIZE = (1 << FLD_HTABLE_BITS),
@@ -76,9 +86,9 @@ static __u32 fld_hash(__u64 lu_seq)
         return lu_seq;
 }
 
-
-static int fld_cache_insert(struct fld_cache_info *fld_cache, __u64 lu_seq,
-                            __u64 mds_num)
+static int
+fld_cache_insert(struct fld_cache_info *fld_cache,
+                 __u64 lu_seq, __u64 mds_num)
 {
         struct fld_cache *fld;
         struct hlist_head *bucket;
@@ -112,7 +122,7 @@ exit:
         RETURN(rc);
 }
 
-static struct fld_cache*
+static struct fld_cache *
 fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 lu_seq)
 {
         struct hlist_head *bucket;
@@ -135,7 +145,8 @@ fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 lu_seq)
         RETURN(NULL);
 }
 
-static void fld_cache_delete(struct fld_cache_info *fld_cache, __u64 lu_seq)
+static void
+fld_cache_delete(struct fld_cache_info *fld_cache, __u64 lu_seq)
 {
         struct hlist_head *bucket;
         struct hlist_node *scan;
@@ -157,8 +168,43 @@ static void fld_cache_delete(struct fld_cache_info *fld_cache, __u64 lu_seq)
         spin_unlock(&fld_cache->fld_lock);
         return;
 }
+#endif
 
-int fld_create(struct obd_export *exp, __u64 seq, __u64 mds_num)
+static int fld_rpc(struct obd_export *exp, struct md_fld *mf, __u32 fld_op)
+{
+        struct ptlrpc_request *req;
+        struct md_fld *pmf;
+        int mf_size = sizeof(*mf);
+        __u32 *op;
+        int size[2] = {sizeof(*op), mf_size}, rc;
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
+                              FLD_QUERY, 2, size, NULL);
+        if (req == NULL)
+                RETURN(-ENOMEM);
+
+        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
+        *op = fld_op;
+
+        pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
+        memcpy(pmf, mf, sizeof(*mf));
+
+        req->rq_replen = lustre_msg_size(1, &mf_size);
+        req->rq_request_portal = MDS_FLD_PORTAL;
+        rc = ptlrpc_queue_wait(req);
+        if (rc)
+                GOTO(out_req, rc);
+
+        pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf), lustre_swab_md_fld);
+        *mf = *pmf; 
+out_req:
+        ptlrpc_req_finished(req);
+        RETURN(rc);
+}
+
+int
+fld_create(struct obd_export *exp, __u64 seq, __u64 mds_num)
 {
         struct obd_export *fld_exp;
         struct md_fld      md_fld;
@@ -168,24 +214,30 @@ int fld_create(struct obd_export *exp, __u64 seq, __u64 mds_num)
         fld_exp = get_fld_exp(exp, seq);
         if (!fld_exp)
                 RETURN(-EINVAL);
-
+        
         md_fld.mf_seq = seq;
         md_fld.mf_mds = mds_num;
 
-        rc = mdc_fld(fld_exp, &md_fld, FLD_CREATE);
+        rc = fld_rpc(fld_exp, &md_fld, FLD_CREATE);
+        
+#ifdef __KERNEL__
         fld_cache_insert(fld_cache, seq, mds_num);
-
+#endif
+        
         RETURN(rc);
 }
 
-int fld_delete(struct obd_export *exp, __u64 seq, __u64 mds_num)
+int
+fld_delete(struct obd_export *exp, __u64 seq, __u64 mds_num)
 {
         struct obd_export *fld_exp;
         struct md_fld      md_fld;
         __u32 rc;
 
+#ifdef __KERNEL__
         fld_cache_delete(fld_cache, seq);
-
+#endif
+        
         fld_exp = get_fld_exp(exp, seq);
         if (!fld_exp)
                 RETURN(-EINVAL);
@@ -193,12 +245,12 @@ int fld_delete(struct obd_export *exp, __u64 seq, __u64 mds_num)
         md_fld.mf_seq = seq;
         md_fld.mf_mds = mds_num;
 
-        rc = mdc_fld(fld_exp, &md_fld, FLD_DELETE);
-
+        rc = fld_rpc(fld_exp, &md_fld, FLD_DELETE);
         RETURN(rc);
 }
 
-int fld_get(struct obd_export *exp, __u64 lu_seq, __u64 *mds_num)
+int
+fld_get(struct obd_export *exp, __u64 lu_seq, __u64 *mds_num)
 {
         struct obd_export *fld_exp;
         struct md_fld      md_fld;
@@ -209,39 +261,45 @@ int fld_get(struct obd_export *exp, __u64 lu_seq, __u64 *mds_num)
                 RETURN(-EINVAL);
 
         md_fld.mf_seq = lu_seq;
-
         vallen = sizeof(struct md_fld);
 
-        rc = mdc_fld(fld_exp, &md_fld, FLD_GET);
-
-        *mds_num = md_fld.mf_mds;
+        rc = fld_rpc(fld_exp, &md_fld, FLD_GET);
+        if (rc == 0)
+                *mds_num = md_fld.mf_mds;
 
         RETURN(rc);
 }
 
-/*lookup fid in the namespace of pfid according to the name*/
-int fld_lookup(struct obd_export *exp, __u64 lu_seq, __u64 *mds_num)
+/* lookup fid in the namespace of pfid according to the name */
+int
+fld_lookup(struct obd_export *exp, __u64 lu_seq, __u64 *mds_num)
 {
         struct fld_cache *fld;
         int rc;
         ENTRY;
 
-        /*lookup it in the cache*/
+#ifdef __KERNEL__
+        /* lookup it in the cache */
         fld = fld_cache_lookup(fld_cache, lu_seq);
         if (fld != NULL) {
                 *mds_num = fld->fld_mds;
                 RETURN(0);
         }
-        /*can not find it in the cache*/
+#endif
+        
+        /* can not find it in the cache */
         rc = fld_get(exp, lu_seq, mds_num);
         if (rc)
                 RETURN(rc);
 
+#ifdef __KERNEL__
         rc = fld_cache_insert(fld_cache, lu_seq, *mds_num);
-
+#endif
+        
         RETURN(rc);
 }
 
+#ifdef __KERNEL__
 static int fld_init(void)
 {
         ENTRY;
@@ -250,10 +308,10 @@ static int fld_init(void)
         if (fld_cache == NULL)
                 RETURN(-ENOMEM);
 
-        /*init fld cache info*/
+        /* init fld cache info */
         fld_cache->fld_hash_mask = FLD_HTABLE_MASK;
         OBD_ALLOC(fld_cache->fld_hash, FLD_HTABLE_SIZE *
-                                       sizeof fld_cache->fld_hash[0]);
+                  sizeof fld_cache->fld_hash[0]);
         spin_lock_init(&fld_cache->fld_lock);
 
         RETURN(0);
@@ -263,7 +321,7 @@ static int fld_fini(void)
 {
         if (fld_cache != NULL) {
                 OBD_FREE(fld_cache->fld_hash, FLD_HTABLE_SIZE *
-                                              sizeof fld_cache->fld_hash[0]);
+                         sizeof fld_cache->fld_hash[0]);
                 OBD_FREE_PTR(fld_cache);
         }
         return 0;
@@ -282,10 +340,11 @@ static void __exit fld_mod_exit(void)
 }
 
 
-struct fld_list fld_list_head;
+static struct fld_list fld_list_head;
 
-static int fld_req_handle0(const struct lu_context *ctx,
-                           struct lu_fld *fld, struct ptlrpc_request *req)
+static int
+fld_req_handle0(const struct lu_context *ctx,
+                struct lu_fld *fld, struct ptlrpc_request *req)
 {
         struct md_fld *in;
         struct md_fld *out;
@@ -309,22 +368,26 @@ static int fld_req_handle0(const struct lu_context *ctx,
                         *out = *in;
 
                         rc = fld_handle(ctx, fld, *opt, out);
-                } else
+                } else {
                         CERROR("Cannot unpack mf\n");
-        } else
+                }
+        } else {
                 CERROR("Cannot unpack option\n");
+        }
         RETURN(rc);
 }
 
 
 static int fld_req_handle(struct ptlrpc_request *req)
 {
-        int result;
+        int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
         const struct lu_context *ctx;
         struct lu_site    *site;
-
+        int result;
         ENTRY;
 
+        OBD_FAIL_RETURN(OBD_FAIL_FLD_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
+
         ctx = req->rq_svc_thread->t_ctx;
         LASSERT(ctx != NULL);
         LASSERT(ctx->lc_thread == req->rq_svc_thread);
@@ -334,16 +397,28 @@ static int fld_req_handle(struct ptlrpc_request *req)
                         site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
                         LASSERT(site != NULL);
                         result = fld_req_handle0(ctx, site->ls_fld, req);
-                } else
+                } else {
                         CERROR("Unconnected request\n");
-        } else
+                        req->rq_status = -ENOTCONN;
+                        GOTO(out, result = -ENOTCONN);
+                }
+        } else {
                 CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
+                req->rq_status = -ENOTSUPP;
+                result = ptlrpc_error(req);
+                RETURN(result);
+        }
 
-        RETURN(result);
+        EXIT;
+out:
+        target_send_reply(req, result, fail);
+        return 0;
 }
 
-int fld_server_init(const struct lu_context *ctx, struct lu_fld *fld,
-                    struct dt_device *dt)
+int
+fld_server_init(const struct lu_context *ctx,
+                struct lu_fld *fld,
+                struct dt_device *dt)
 {
         int result;
         struct ptlrpc_service_conf fld_conf = {
@@ -382,7 +457,9 @@ int fld_server_init(const struct lu_context *ctx, struct lu_fld *fld,
 }
 EXPORT_SYMBOL(fld_server_init);
 
-void fld_server_fini(const struct lu_context *ctx, struct lu_fld *fld)
+void
+fld_server_fini(const struct lu_context *ctx,
+                struct lu_fld *fld)
 {
         struct list_head *pos, *n;
 
@@ -407,8 +484,9 @@ void fld_server_fini(const struct lu_context *ctx, struct lu_fld *fld)
 }
 EXPORT_SYMBOL(fld_server_fini);
 
-static int fld_handle(const struct lu_context *ctx,
-                      struct lu_fld *fld, __u32 opts, struct md_fld *mf)
+static int
+fld_handle(const struct lu_context *ctx,
+           struct lu_fld *fld, __u32 opts, struct md_fld *mf)
 {
         int rc;
         ENTRY;
@@ -435,4 +513,5 @@ MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre fld Prototype");
 MODULE_LICENSE("GPL");
 
-cfs_module(mdd, "0.0.2", fld_mod_init, fld_mod_exit);
+cfs_module(mdd, "0.0.3", fld_mod_init, fld_mod_exit);
+#endif
index 554c75b..3f9186d 100644 (file)
 #ifndef EXPORT_SYMTAB
 # define EXPORT_SYMTAB
 #endif
-#define DEBUG_SUBSYSTEM S_LLITE
-
-#include <linux/module.h>
-#include <linux/jbd.h>
+#define DEBUG_SUBSYSTEM S_FLD
+
+#ifdef __KERNEL__
+# include <libcfs/libcfs.h>
+# include <linux/module.h>
+# include <linux/jbd.h>
+#else /* __KERNEL__ */
+# include <liblustre.h>
+#endif
 
 #include <obd.h>
 #include <obd_class.h>
index c1dfef3..3173cf4 100644 (file)
@@ -34,9 +34,4 @@ struct obd_device;
 int it_disposition(struct lookup_intent *it, int flag);
 void it_set_disposition(struct lookup_intent *it, int flag);
 int it_open_error(int phase, struct lookup_intent *it);
-
-/* mdc/mdc_request.c */
-int mdc_fld(struct obd_export *exp, struct md_fld *mf, 
-            __u32 fld_op);
-
 #endif
index 5c45e85..87c27e4 100644 (file)
@@ -169,10 +169,15 @@ extern cfs_waitq_t obd_race_waitq;
 #define OBD_FAIL_MGS                     0x900
 #define OBD_FAIL_MGS_ALL_REQUEST_NET     0x901
 #define OBD_FAIL_MGS_ALL_REPLY_NET       0x902
+
 #define OBD_FAIL_SEQ                     0x1000
 #define OBD_FAIL_SEQ_ALL_REQUEST_NET     0x1001
 #define OBD_FAIL_SEQ_ALL_REPLY_NET       0x1002
 
+#define OBD_FAIL_FLD                     0x1100
+#define OBD_FAIL_FLD_ALL_REQUEST_NET     0x1101
+#define OBD_FAIL_FLD_ALL_REPLY_NET       0x1102
+
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
 #define OBD_FAIL_MASK_LOC    (0x000000FF | OBD_FAIL_MASK_SYS)
index a905c18..5d91ff8 100644 (file)
@@ -14,6 +14,7 @@ LUSTRE_LIBS = libllite.a \
               $(top_builddir)/lustre/obdecho/libobdecho.a \
               $(top_builddir)/lustre/osc/libosc.a \
               $(top_builddir)/lustre/fid/libfid.a \
+              $(top_builddir)/lustre/fld/libfld.a \
               $(top_builddir)/lustre/mdc/libmdc.a \
               $(top_builddir)/lustre/ptlrpc/libptlrpc.a \
               $(top_builddir)/lustre/obdclass/liblustreclass.a \
index 2f8f9b4..9d6f695 100755 (executable)
@@ -64,6 +64,7 @@ build_obj_list ../obdecho libobdecho.a
 build_obj_list ../osc libosc.a
 build_obj_list ../mdc libmdc.a
 build_obj_list ../fid libfid.a
+build_obj_list ../fld libfld.a
 build_obj_list ../ptlrpc libptlrpc.a
 build_obj_list ../obdclass liblustreclass.a
 build_obj_list ../lvfs liblvfs.a
index e43081d..4904ab4 100644 (file)
@@ -872,38 +872,6 @@ int mdc_set_info_async(struct obd_export *exp, obd_count keylen,
         RETURN(rc);
 }
 
-int mdc_fld(struct obd_export *exp, struct md_fld *mf, __u32 fld_op)
-{
-        struct ptlrpc_request *req;
-        struct md_fld *pmf;
-        int mf_size = sizeof(*mf);
-        __u32 *op;
-        int size[2] = {sizeof(*op), mf_size}, rc;
-        ENTRY;
-
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              FLD_QUERY, 2, size, NULL);
-        if (req == NULL)
-                RETURN(-ENOMEM);
-
-        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
-        *op = fld_op;
-
-        pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
-        memcpy(pmf, mf, sizeof(*mf));
-
-        req->rq_replen = lustre_msg_size(1, &mf_size);
-        rc = ptlrpc_queue_wait(req);
-        if (rc)
-                GOTO(out_req, rc);
-
-        pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf), lustre_swab_md_fld);
-        *mf = *pmf; 
-out_req:
-        ptlrpc_req_finished(req);
-        RETURN(rc);
-}
-
 static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                       cfs_time_t max_age)
 {
@@ -1346,8 +1314,6 @@ MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Metadata Client");
 MODULE_LICENSE("GPL");
 
-EXPORT_SYMBOL(mdc_fld);
-
 module_init(mdc_init);
 module_exit(mdc_exit);
 #endif