Whamcloud - gitweb
- move the peter branch changes to the head
authorbraam <braam>
Wed, 31 Jul 2002 20:49:39 +0000 (20:49 +0000)
committerbraam <braam>
Wed, 31 Jul 2002 20:49:39 +0000 (20:49 +0000)
49 files changed:
lustre/archdep.m4
lustre/configure.in
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/ldlm/Makefile.am
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_test.c
lustre/lib/l_net.c
lustre/lib/lov_pack.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/recover.c
lustre/llite/rw.c
lustre/llite/super.c
lustre/lov/lov_obd.c
lustre/mdc/Makefile.am
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_extN.c
lustre/mds/mds_reint.c
lustre/obdclass/class_obd.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c
lustre/osc/Makefile.am
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c
lustre/tests/Makefile.am
lustre/tests/common.sh
lustre/tests/intent-test.sh
lustre/tests/llsetup.sh
lustre/tests/lov.xml
lustre/utils/device.c
lustre/utils/lconf
lustre/utils/lustre.dtd
lustre/utils/obdctl.c
lustre/utils/parser.c

index eae4bf0..f298ede 100644 (file)
@@ -15,7 +15,7 @@ AC_MSG_CHECKING(setting make flags system architecture: )
 case ${host_cpu} in
        um )
        AC_MSG_RESULT($host_cpu)
-       KCFLAGS='-g  -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common '
+       KCFLAGS='-g -Wall -pipe -Wno-trigraphs -Wstrict-prototypes -fno-strict-aliasing -fno-common '
        KCPPFLAGS='-D__KERNEL__ -U__i386__ -Ui386 -DUM_FASTCALL -D__arch_um__ -DSUBARCH="i386" -DNESTING=0 -D_LARGEFILE64_SOURCE  -Derrno=kernel_errno -DPATCHLEVEL=4 -DMODULE -I$(LINUX)/arch/um/include '
         MOD_LINK=elf_i386
 ;;
index 453465a..eea6d94 100644 (file)
@@ -100,7 +100,8 @@ AC_SUBST(docdir)
 demodir='$(docdir)/demo'
 AC_SUBST(demodir)
 
-AM_CONFIG_HEADER(include/config.h)
+# not needed until the AC_CHECK_LIB(readline) above works
+# AM_CONFIG_HEADER(include/config.h)
 
 AC_OUTPUT(Makefile lib/Makefile ldlm/Makefile \
          obdecho/Makefile ptlrpc/Makefile \
index f4d7e3b..cf1e79b 100644 (file)
@@ -33,6 +33,7 @@ typedef enum {
 #define LDLM_FL_CBPENDING      (1 << 4)
 #define LDLM_FL_AST_SENT       (1 << 5)
 #define LDLM_FL_DESTROYED      (1 << 6)
+#define LDLM_FL_WAIT_NOREPROC  (1 << 7)
 
 #define L2B(c) (1 << c)
 
@@ -104,6 +105,8 @@ typedef int (*ldlm_lock_callback)(struct lustre_handle *lockh,
                                   struct ldlm_lock_desc *new, void *data,
                                   __u32 data_len);
 
+typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags); 
+
 struct ldlm_lock {
         __u64                  l_random;
         int                   l_refc;
@@ -116,11 +119,12 @@ struct ldlm_lock {
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
 
-        ldlm_lock_callback    l_completion_ast;
+        ldlm_completion_callback    l_completion_ast;
         ldlm_lock_callback    l_blocking_ast;
 
         struct ptlrpc_connection *l_connection;
         struct ptlrpc_client *l_client;
+        struct lustre_handle *l_connh;
         __u32                 l_flags;
         struct lustre_handle    l_remote_handle;
         void                 *l_data;
@@ -178,6 +182,7 @@ struct ldlm_ast_work {
         int               w_blocking;
         struct ldlm_lock_desc w_desc;
         struct list_head   w_list;
+        int w_flags;
         void *w_data;
         int w_datalen;
 };
@@ -192,6 +197,7 @@ extern struct obd_ops ldlm_obd_ops;
 
 extern char *ldlm_lockname[];
 extern char *ldlm_typename[];
+extern char *ldlm_it2str(int it);
 
 #define LDLM_DEBUG(lock, format, a...)                                  \
 do {                                                                    \
@@ -225,7 +231,15 @@ do {                                                                    \
 int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
 int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, void *);
 
+/* ldlm_lockd.c */
+int ldlm_handle_enqueue(struct ptlrpc_request *req);
+int ldlm_handle_convert(struct ptlrpc_request *req);
+int ldlm_handle_cancel(struct ptlrpc_request *req);
+
 /* ldlm_lock.c */
+void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie,
+                                     ldlm_mode_t mode, void *data));
+void ldlm_unregister_intent(void);
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
 struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle);
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
@@ -260,7 +274,7 @@ ldlm_lock_create(struct ldlm_namespace *ns,
                  __u32 data_len);
 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie,
                                int cookie_len, int *flags,
-                               ldlm_lock_callback completion,
+                               ldlm_completion_callback completion,
                                ldlm_lock_callback blocking);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags);
@@ -290,9 +304,8 @@ void ldlm_resource_dump(struct ldlm_resource *res);
 int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]);
 
 /* ldlm_request.c */
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, 
-                     struct ptlrpc_connection *peer,
-                     struct lustre_handle *connh,
+int ldlm_completion_ast(struct ldlm_lock *lock, int flags);
+int ldlm_cli_enqueue(struct lustre_handle *conn,
                      struct ptlrpc_request *req,
                      struct ldlm_namespace *ns,
                      struct lustre_handle *parent_lock_handle,
@@ -301,13 +314,12 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl,
                      void *cookie, int cookielen,
                      ldlm_mode_t mode,
                      int *flags,
+                     ldlm_completion_callback completion,
                      ldlm_lock_callback callback,
                      void *data,
                      __u32 data_len,
                      struct lustre_handle *lockh);
-int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
-                          struct ptlrpc_connection *conn,
-                          struct lustre_handle *connh, 
+int ldlm_match_or_enqueue(struct lustre_handle *connh, 
                           struct ptlrpc_request *req,
                           struct ldlm_namespace *ns,
                           struct lustre_handle *parent_lock_handle,
@@ -316,14 +328,14 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
                           void *cookie, int cookielen,
                           ldlm_mode_t mode,
                           int *flags,
+                          ldlm_completion_callback completion,
                           ldlm_lock_callback callback,
                           void *data,
                           __u32 data_len,
                           struct lustre_handle *lockh);
 int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
                     void *data, __u32 data_len);
-int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
-                     struct lustre_handle *connh, int new_mode, int *flags);
+int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags);
 int ldlm_cli_cancel(struct lustre_handle *lockh);
 
 #endif /* __KERNEL__ */
index 1814325..5fa091e 100644 (file)
@@ -196,12 +196,15 @@ struct lov_object_id { /* per-child structure */
         __u64 l_object_id;
 };
 
+#define LOV_MAGIC  0x0BD00BD0
+
 struct lov_stripe_md {
-        __u64 lmd_magic;
-        __u64 lmd_object_id;     /* lov object id */
-        __u64 lmd_stripe_count;
-        __u32 lmd_size;
-        __u32 lmd_stripe_size;
+        __u32 lmd_magic;
+        __u32 lmd_easize;          /* packed size of extended */
+        __u64 lmd_object_id;       /* lov object id */
+        __u64 lmd_stripe_offset;   /* offset of the stripe */ 
+        __u64 lmd_stripe_size;     /* size of the stripe */
+        __u32 lmd_stripe_count;    /* how many objects are being striped */
         __u32 lmd_stripe_pattern;  /* per-lov object stripe pattern */
         struct lov_object_id lmd_objects[0];
 };
@@ -400,25 +403,26 @@ struct mds_rec_rename {
  *  LOV data structures
  */
 
-#define LOV_RAID0  0
+#define LOV_RAID0   0
+#define LOV_RAIDRR  1
+
 struct lov_desc { 
-        __u32 ld_tgt_count; /* how many OBD's */
-        __u32 ld_default_stripecount;
-        __u32 ld_default_stripesize;   /* in bytes */
-        __u32 ld_pattern; /* RAID 0,1 etc */
+        __u32 ld_tgt_count;                /* how many OBD's */
+        __u32 ld_default_stripe_count;     /* how many objects are used */
+        __u64 ld_default_stripe_size;      /* in bytes */
+        __u64 ld_default_stripe_offset;    /* in bytes */
+        __u32 ld_pattern;                  /* RAID 0,1 etc */
          uuid_t ld_uuid;
 }; 
 
 /*
  *   LDLM requests:
  */
-
-/* opcodes */
-#define LDLM_REPLY         0
-#define LDLM_ENQUEUE       1
-#define LDLM_CONVERT       2
-#define LDLM_CANCEL        3
-#define LDLM_CALLBACK      4
+/* opcodes -- MUST be distinct from OST/MDS opcodes */
+#define LDLM_ENQUEUE       101
+#define LDLM_CONVERT       102
+#define LDLM_CANCEL        103
+#define LDLM_CALLBACK      104
 
 #define RES_NAME_SIZE 3
 #define RES_VERSION_SIZE 4
index 3bb73a0..c58c1e9 100644 (file)
 #else
 # include <asm/semaphore.h>
 #endif
-
 #include <linux/portals_lib.h>
 #include <linux/lustre_idl.h>
 
 #ifdef __KERNEL__
 /* l_net.c */
 struct ptlrpc_request;
+struct obd_device;
 int target_handle_connect(struct ptlrpc_request *req);
 int target_handle_disconnect(struct ptlrpc_request *req);
+int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd);
+int client_obd_disconnect(struct lustre_handle *conn);
+int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf);
+int client_obd_cleanup(struct obd_device * obddev);
+struct client_obd *client_conn2cli(struct lustre_handle *conn); 
 
 /* l_lock.c */
 struct lustre_lock { 
index 41c9093..d74f319 100644 (file)
@@ -82,12 +82,12 @@ static inline struct lustre_handle *ll_s2obdconn(struct super_block *sb)
         return &(ll_s2sbi(sb))->ll_osc_conn;
 }
 
-static inline struct mdc_obd *sbi2mdc(struct ll_sb_info *sbi)
+static inline struct client_obd *sbi2mdc(struct ll_sb_info *sbi)
 {
         struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn);
         if (obd == NULL)
                 LBUG();
-        return &obd->u.mdc;
+        return &obd->u.cli;
 }
 
 static inline struct ll_sb_info *ll_i2sbi(struct inode *inode)
index 3ede83d..a834dd3 100644 (file)
@@ -126,12 +126,12 @@ void mds_pack_inode2fid(struct ll_fid *fid, struct inode *inode);
 void mds_pack_inode2body(struct mds_body *body, struct inode *inode);
 
 /* mds/handler.c */
-struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir,
                                       struct vfsmount **mnt, char *name,
                                       int namelen, int lock_mode,
                                       struct lustre_handle *lockh,
                                       int dir_lock_mode);
-struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh);
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
index db810a1..fffe5a4 100644 (file)
@@ -144,10 +144,6 @@ struct ptlrpc_bulk_page {
         __u32 b_flags;
         struct dentry *b_dentry;
         int (*b_cb)(struct ptlrpc_bulk_page *);
-
-        ptl_md_t b_md;
-        ptl_handle_md_t b_md_h;
-        ptl_handle_me_t b_me_h;
 };
 
 struct ptlrpc_bulk_desc {
@@ -162,10 +158,15 @@ struct ptlrpc_bulk_desc {
         wait_queue_head_t b_waitq;
         struct list_head b_page_list;
         __u32 b_page_count;
-        atomic_t b_pages_remaining;
         atomic_t b_refcount;
         void *b_desc_private;
         struct tq_struct b_queue;
+
+        ptl_md_t b_md;
+        ptl_handle_md_t b_md_h;
+        ptl_handle_me_t b_me_h;
+
+        struct iovec b_iov[16];                 /* self-sized pre-allocated iov */
 };
 
 struct ptlrpc_thread {
@@ -209,9 +210,7 @@ static inline void ptlrpc_hdl2req(struct ptlrpc_request *req, struct lustre_hand
         req->rq_reqmsg->addr = h->addr;
         req->rq_reqmsg->cookie = h->cookie;
 }
-struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl,
-                                        struct ptlrpc_connection *conn,
-                                        struct lustre_handle *handle,
+struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn, 
                                         int opcode, int count, int *lengths,
                                         char **bufs);
 
index ac4a75e..9eebcd3 100644 (file)
@@ -58,27 +58,28 @@ struct filter_obd {
 struct mds_client_info;
 struct mds_server_data;
 
-struct mdc_obd {
-        struct ptlrpc_client *mdc_client;
-        struct ptlrpc_client *mdc_ldlm_client;
-        struct ptlrpc_connection *mdc_conn;
-        int mdc_max_mdsize;
-        __u8 mdc_target_uuid[37];
+struct client_obd {
+        struct ptlrpc_client *cl_client;
+        struct ptlrpc_client *cl_ldlm_client;
+        struct ptlrpc_connection *cl_conn;
+        struct lustre_handle cl_exporth;
+        struct semaphore cl_sem;
+        int cl_conn_count;
+        __u8 cl_target_uuid[37];
+        int cl_max_mdsize;
 };
 
+#if 0
 struct osc_obd {
         struct ptlrpc_client *osc_client;
         struct ptlrpc_client *osc_ldlm_client;
         struct ptlrpc_connection *osc_conn;
         __u8 osc_target_uuid[37];
 };
+#endif
 
 struct mds_obd {
-        struct ldlm_namespace *mds_local_namespace;
         struct ptlrpc_service *mds_service;
-        struct ptlrpc_client *mds_ldlm_client; /* to be an LDLM client */
-        struct ptlrpc_connection *mds_ldlm_conn; /* to be an LDLM client */
-        struct lustre_handle mds_connh; /* to be one's on DLM client */
 
         char *mds_fstype;
         struct super_block *mds_sb;
@@ -195,9 +196,9 @@ struct obd_device {
                 struct ext2_obd ext2;
                 struct filter_obd filter;
                 struct mds_obd mds;
-                struct mdc_obd mdc;
+                struct client_obd cli;
                 struct ost_obd ost;
-                struct osc_obd osc;
+                //                struct osc_obd osc;
                 struct ldlm_obd ldlm;
                 struct echo_obd echo;
                 struct recovd_obd recovd;
@@ -233,8 +234,10 @@ struct obd_ops {
                         struct lov_stripe_md **ea);
         int (*o_destroy)(struct lustre_handle *conn, struct obdo *oa,
                          struct lov_stripe_md *ea);
-        int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa);
-        int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa);
+        int (*o_setattr)(struct lustre_handle *conn, struct obdo *oa,
+                         struct lov_stripe_md *ea);
+        int (*o_getattr)(struct lustre_handle *conn, struct obdo *oa,
+                         struct lov_stripe_md *ea);
         int (*o_open)(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md *);
         int (*o_close)(struct lustre_handle *conn, struct obdo *oa,
index 906e42c..927385c 100644 (file)
@@ -238,14 +238,16 @@ static inline int obd_destroy(struct lustre_handle *conn, struct obdo *obdo, str
         RETURN(rc);
 }
 
-static inline int obd_getattr(struct lustre_handle *conn, struct obdo *obdo)
+static inline int obd_getattr(struct lustre_handle *conn, 
+                              struct obdo *obdo,
+                              struct lov_stripe_md *ea)
 {
         int rc;
         struct obd_export *export;
         OBD_CHECK_SETUP(conn, export);
         OBD_CHECK_OP(export->exp_obd,getattr);
 
-        rc = OBP(export->exp_obd, getattr)(conn, obdo);
+        rc = OBP(export->exp_obd, getattr)(conn, obdo, ea);
         RETURN(rc);
 }
 
@@ -271,14 +273,16 @@ static inline int obd_open(struct lustre_handle *conn, struct obdo *obdo,
         RETURN(rc);
 }
 
-static inline int obd_setattr(struct lustre_handle *conn, struct obdo *obdo)
+static inline int obd_setattr(struct lustre_handle *conn, 
+                              struct obdo *obdo,
+                              struct lov_stripe_md *ea)
 {
         int rc;
         struct obd_export *export;
         OBD_CHECK_SETUP(conn, export);
         OBD_CHECK_OP(export->exp_obd,setattr);
 
-        rc = OBP(export->exp_obd, setattr)(conn, obdo);
+        rc = OBP(export->exp_obd, setattr)(conn, obdo, ea);
         RETURN(rc);
 }
 
index 606c1dd..d3ef194 100644 (file)
@@ -10,11 +10,13 @@ EXTRA_PROGRAMS = ldlm
 
 l_lock.c: 
        test -e l_lock.c || ln -sf $(top_srcdir)/lib/l_lock.c
+l_net.c: 
+       test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c
 
-LINX=l_lock.c
+LINX=l_lock.c l_net.c
 
 ldlm_SOURCES = l_lock.c ldlm_lock.c ldlm_resource.c ldlm_test.c ldlm_lockd.c \
-ldlm_extent.c ldlm_request.c
+ldlm_extent.c ldlm_request.c l_net.c
 
 include $(top_srcdir)/Rules
 
index 75c3cf3..c10d560 100644 (file)
@@ -35,13 +35,50 @@ char *ldlm_typename[] = {
         [LDLM_MDSINTENT] "INT"
 };
 
+char *ldlm_it2str(int it)
+{
+        switch (it) { 
+        case IT_OPEN:
+                return "open";
+        case IT_CREAT:
+                return "creat";
+        case (IT_OPEN|IT_CREAT):
+                return "open|creat";
+        case IT_MKDIR:
+                return "mkdir";
+        case IT_LINK:
+               return "link";
+        case IT_SYMLINK:
+                return "symlink";
+        case IT_UNLINK:
+                return "unlink";
+        case IT_RMDIR:
+               return "rmdir";
+        case IT_RENAME:
+                return "rename";
+        case IT_RENAME2:
+                return "rename2";
+        case IT_READDIR:
+                return "readdir";
+        case IT_GETATTR:
+                return "getattr";
+        case IT_SETATTR:
+                return "setattr";
+        case IT_READLINK:
+                return "readlink";
+        case IT_MKNOD:
+                return "mknod";
+        case IT_LOOKUP:
+                return "lookup";
+        default:
+                CERROR("Unknown intent %d\n", it); 
+                return "UNKNOWN";
+        }
+}
+
 extern kmem_cache_t *ldlm_lock_slab;
-int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL;
-int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL;
 
 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b);
-static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
-                              ldlm_mode_t mode, void *data);
 
 ldlm_res_compat ldlm_res_compat_table [] = {
         [LDLM_PLAIN] ldlm_plain_compat,
@@ -52,9 +89,19 @@ ldlm_res_compat ldlm_res_compat_table [] = {
 ldlm_res_policy ldlm_res_policy_table [] = {
         [LDLM_PLAIN] NULL,
         [LDLM_EXTENT] ldlm_extent_policy,
-        [LDLM_MDSINTENT] ldlm_intent_policy
+        [LDLM_MDSINTENT] NULL
 };
 
+void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie,
+                              ldlm_mode_t mode, void *data))
+{
+        ldlm_res_policy_table[LDLM_MDSINTENT] = arg;
+}
+
+void ldlm_unregister_intent()
+{
+        ldlm_res_policy_table[LDLM_MDSINTENT] = NULL;
+}
 
 /*
  * REFCOUNTED LOCK OBJECTS
@@ -251,143 +298,6 @@ struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
 
 
 
-static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
-                              ldlm_mode_t mode, void *data)
-{
-        struct ptlrpc_request *req = req_cookie;
-        int rc = 0;
-        ENTRY;
-
-        if (!req_cookie)
-                RETURN(0);
-
-        if (req->rq_reqmsg->bufcount > 1) {
-                /* an intent needs to be considered */
-                struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
-                struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
-                struct mds_body *mds_rep;
-                struct ldlm_reply *rep;
-                __u64 new_resid[3] = {0, 0, 0}, old_res;
-                int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
-                                                  sizeof(struct mds_body),
-                                                  mds->mds_max_mdsize};
-
-                it->opc = NTOH__u64(it->opc);
-
-                LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
-
-                /* prepare reply */
-                switch(it->opc) {
-                case IT_GETATTR:
-                        /* Note that in the negative case you may be returning
-                         * a file and its obdo */
-                case IT_CREAT:
-                case IT_CREAT|IT_OPEN:
-                case IT_LINK:
-                case IT_LOOKUP:
-                case IT_MKDIR:
-                case IT_MKNOD:
-                case IT_OPEN:
-                case IT_READLINK:
-                case IT_RENAME:
-                case IT_RMDIR:
-                case IT_SETATTR:
-                case IT_SYMLINK:
-                case IT_UNLINK:
-                        bufcount = 3;
-                        break;
-                case IT_RENAME2:
-                        bufcount = 1;
-                        break;
-                default:
-                        LBUG();
-                }
-
-                rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
-                                     &req->rq_repmsg);
-                if (rc) {
-                        rc = req->rq_status = -ENOMEM;
-                        RETURN(rc);
-                }
-
-                rep = lustre_msg_buf(req->rq_repmsg, 0);
-                rep->lock_policy_res1 = 1;
-
-                /* execute policy */
-                switch (it->opc) {
-                case IT_CREAT:
-                case IT_CREAT|IT_OPEN:
-                case IT_LINK:
-                case IT_MKDIR:
-                case IT_MKNOD:
-                case IT_RENAME2:
-                case IT_RMDIR:
-                case IT_SYMLINK:
-                case IT_UNLINK:
-                        rc = mds_reint_p(2, req);
-                        if (rc || req->rq_status != 0) {
-                                rep->lock_policy_res2 = req->rq_status;
-                                RETURN(ELDLM_LOCK_ABORTED);
-                        }
-                        break;
-                case IT_GETATTR:
-                case IT_LOOKUP:
-                case IT_OPEN:
-                case IT_READDIR:
-                case IT_READLINK:
-                case IT_RENAME:
-                case IT_SETATTR:
-                        rc = mds_getattr_name_p(2, req);
-                        /* FIXME: we need to sit down and decide on who should
-                         * set req->rq_status, who should return negative and
-                         * positive return values, and what they all mean. */
-                        if (rc || req->rq_status != 0) {
-                                rep->lock_policy_res2 = req->rq_status;
-                                RETURN(ELDLM_LOCK_ABORTED);
-                        }
-                        break;
-                case IT_READDIR|IT_OPEN:
-                        LBUG();
-                        break;
-                default:
-                        CERROR("Unhandled intent\n");
-                        LBUG();
-                }
-
-                if (it->opc == IT_UNLINK || it->opc == IT_RMDIR ||
-                    it->opc == IT_RENAME || it->opc == IT_RENAME2)
-                        RETURN(ELDLM_LOCK_ABORTED);
-
-                rep->lock_policy_res2 = req->rq_status;
-                mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
-                new_resid[0] = NTOH__u32(mds_rep->ino);
-                if (new_resid[0] == 0)
-                        LBUG();
-                old_res = lock->l_resource->lr_name[0];
-
-                CDEBUG(D_INFO, "remote intent: locking %d instead of"
-                       "%ld\n", mds_rep->ino, (long)old_res);
-
-                ldlm_lock_change_resource(lock, new_resid);
-                if (lock->l_resource == NULL) {
-                        LBUG();
-                        RETURN(-ENOMEM);
-                }
-                LDLM_DEBUG(lock, "intent policy, old res %ld",
-                           (long)old_res);
-                RETURN(ELDLM_LOCK_CHANGED);
-        } else {
-                int size = sizeof(struct ldlm_reply);
-                rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
-                                     &req->rq_repmsg);
-                if (rc) {
-                        CERROR("out of memory\n");
-                        LBUG();
-                        RETURN(-ENOMEM);
-                }
-        }
-        RETURN(rc);
-}
 
 static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b)
 {
@@ -572,6 +482,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock)
         EXIT;
 }
 
+/* returns a referenced lock or NULL */ 
 static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                                       struct ldlm_extent *extent)
 {
@@ -604,7 +515,8 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
 /* Must be called with no resource or lock locks held.
  *
  * Returns 1 if it finds an already-existing lock that is compatible; in this
- * case, lockh is filled in with a addref()ed lock */
+ * case, lockh is filled in with a addref()ed lock 
+*/
 int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
                     void *cookie, int cookielen, ldlm_mode_t mode,
                     struct lustre_handle *lockh)
@@ -635,8 +547,8 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type,
 
         if (lock) {
                 ldlm_lock2handle(lock, lockh);
-                wait_event(lock->l_waitq,
-                           lock->l_req_mode == lock->l_granted_mode);
+                if (lock->l_completion_ast)
+                        lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); 
         }
         if (rc)
                 LDLM_DEBUG(lock, "matched");
@@ -681,7 +593,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
 ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock,
                                void *cookie, int cookie_len,
                                int *flags,
-                               ldlm_lock_callback completion,
+                               ldlm_completion_callback completion,
                                ldlm_lock_callback blocking)
 {
         struct ldlm_resource *res;
@@ -799,8 +711,7 @@ void ldlm_run_ast_work(struct list_head *rpc_list)
                         rc = w->w_lock->l_blocking_ast
                                 (&lockh, &w->w_desc, w->w_data, w->w_datalen);
                 else
-                        rc = w->w_lock->l_completion_ast
-                                (&lockh, NULL, w->w_data, w->w_datalen);
+                        rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
                 if (rc)
                         CERROR("Failed AST - should clean & disconnect "
                                "client\n");
@@ -885,7 +796,8 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                         res->lr_tmp = NULL;
                         granted = 1;
                         /* FIXME: completion handling not with ns_lock held ! */
-                        wake_up(&lock->l_waitq);
+                        if (lock->l_completion_ast)
+                                lock->l_completion_ast(lock, 0); 
                 }
         } else
                 list_add(&lock->l_res_link, res->lr_converting.prev);
index 36eb21b..bc659f9 100644 (file)
@@ -21,7 +21,40 @@ extern kmem_cache_t *ldlm_lock_slab;
 extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
 extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
 
-static int ldlm_handle_enqueue(struct ptlrpc_request *req)
+int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
+{
+        struct ldlm_request *body;
+        struct ptlrpc_request *req;
+        struct ptlrpc_client *cl;
+        int rc = 0, size = sizeof(*body);
+        ENTRY;
+
+        if (lock == NULL) {
+                LBUG();
+                RETURN(-EINVAL);
+        }
+
+        cl = &lock->l_resource->lr_namespace->ns_rpc_client;
+        req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+                              &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+        body->lock_flags = flags;
+
+        LDLM_DEBUG(lock, "server preparing completion AST");
+        req->rq_replen = lustre_msg_size(0, NULL);
+
+        rc = ptlrpc_queue_wait(req);
+        rc = ptlrpc_check_status(req, rc);
+        ptlrpc_free_req(req);
+        RETURN(rc);
+}
+
+int ldlm_handle_enqueue(struct ptlrpc_request *req)
 {
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
@@ -68,7 +101,7 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req)
 
         flags = dlm_req->lock_flags;
         err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
-                                ldlm_server_ast, ldlm_server_ast);
+                                ldlm_server_completion_ast, ldlm_server_ast);
         if (err != ELDLM_OK)
                 GOTO(out, err);
 
@@ -104,7 +137,7 @@ static int ldlm_handle_enqueue(struct ptlrpc_request *req)
         return 0;
 }
 
-static int ldlm_handle_convert(struct ptlrpc_request *req)
+int ldlm_handle_convert(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_reply *dlm_rep;
@@ -143,7 +176,7 @@ static int ldlm_handle_convert(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle_cancel(struct ptlrpc_request *req)
+int ldlm_handle_cancel(struct ptlrpc_request *req)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
@@ -159,6 +192,8 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
 
         lock = ldlm_handle2lock(&dlm_req->lock_handle1);
         if (!lock) {
+                LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
+                                  (void *)(unsigned long) dlm_req->lock_handle1.addr);
                 req->rq_status = ESTALE;
         } else {
                 LDLM_DEBUG(lock, "server-side cancel handler START");
@@ -173,9 +208,7 @@ static int ldlm_handle_cancel(struct ptlrpc_request *req)
                 ldlm_reprocess_all(lock->l_resource);
                 LDLM_DEBUG(lock, "server-side cancel handler END");
                 LDLM_LOCK_PUT(lock);
-        } else
-                LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
-                                  lock);
+        } 
 
         RETURN(0);
 }
@@ -242,29 +275,27 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
                 }
                 LDLM_LOCK_PUT(lock);
         } else {
-                struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+                struct list_head ast_list = LIST_HEAD_INIT(ast_list);
 
                 l_lock(&lock->l_resource->lr_namespace->ns_lock);
                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
 
                 /* If we receive the completion AST before the actual enqueue
                  * returned, then we might need to switch resources. */
+                ldlm_resource_unlink_lock(lock);
                 if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
                            lock->l_resource->lr_name,
                            sizeof(__u64) * RES_NAME_SIZE) != 0) {
                         ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
                         LDLM_DEBUG(lock, "completion AST, new resource");
                 }
-                lock->l_resource->lr_tmp = &rpc_list;
-                ldlm_resource_unlink_lock(lock);
+                lock->l_resource->lr_tmp = &ast_list;
                 ldlm_grant_lock(lock);
-                /*  FIXME: we want any completion function, not just wake_up */
-                wake_up(&lock->l_waitq);
                 lock->l_resource->lr_tmp = NULL;
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 LDLM_LOCK_PUT(lock);
 
-                ldlm_run_ast_work(&rpc_list);
+                ldlm_run_ast_work(&ast_list);
         }
 
         LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
@@ -272,7 +303,8 @@ static int ldlm_handle_callback(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle(struct ptlrpc_request *req)
+
+static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
         int rc;
         ENTRY;
@@ -288,31 +320,7 @@ static int ldlm_handle(struct ptlrpc_request *req)
                        req->rq_reqmsg->type);
                 GOTO(out, rc = -EINVAL);
         }
-
-        if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
-                CERROR("No export handle for enqueue request.\n");
-                GOTO(out, rc = -ENOTCONN);
-        }
-
         switch (req->rq_reqmsg->opc) {
-        case LDLM_ENQUEUE:
-                CDEBUG(D_INODE, "enqueue\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
-                rc = ldlm_handle_enqueue(req);
-                break;
-
-        case LDLM_CONVERT:
-                CDEBUG(D_INODE, "convert\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
-                rc = ldlm_handle_convert(req);
-                break;
-
-        case LDLM_CANCEL:
-                CDEBUG(D_INODE, "cancel\n");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
-                rc = ldlm_handle_cancel(req);
-                break;
-
         case LDLM_CALLBACK:
                 CDEBUG(D_INODE, "callback\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
@@ -331,6 +339,7 @@ out:
         return 0;
 }
 
+
 static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
                           void *karg, void *uarg)
 {
@@ -385,29 +394,16 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         MOD_INC_USE_COUNT;
         ldlm->ldlm_service =
                 ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
-                                LDLM_REPLY_PORTAL, "self", ldlm_handle);
+                                LDLM_REPLY_PORTAL, "self", ldlm_callback_handler);
         if (!ldlm->ldlm_service) {
                 LBUG();
                 GOTO(out_dec, rc = -ENOMEM);
         }
 
-        if (mds_reint_p == NULL)
-                mds_reint_p = inter_module_get_request("mds_reint", "mds");
-        if (IS_ERR(mds_reint_p)) {
-                CERROR("MDSINTENT locks require the MDS module.\n");
-                GOTO(out_dec, rc = -EINVAL);
-        }
-        if (mds_getattr_name_p == NULL)
-                mds_getattr_name_p = inter_module_get_request
-                        ("mds_getattr_name", "mds");
-        if (IS_ERR(mds_getattr_name_p)) {
-                CERROR("MDSINTENT locks require the MDS module.\n");
-                GOTO(out_dec, rc = -EINVAL);
-        }
-
         for (i = 0; i < LDLM_NUM_THREADS; i++) {
-                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
-                                         "lustre_dlm");
+                char name[32];
+                sprintf(name, "lustre_dlm_%2d", i); 
+                rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
                 if (rc) {
                         CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                         LBUG();
@@ -421,10 +417,6 @@ out_thread:
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         ptlrpc_unregister_service(ldlm->ldlm_service);
 out_dec:
-        if (mds_reint_p != NULL)
-                inter_module_put("mds_reint");
-        if (mds_getattr_name_p != NULL)
-                inter_module_put("mds_getattr_name");
         MOD_DEC_USE_COUNT;
         return rc;
 }
@@ -437,11 +429,6 @@ static int ldlm_cleanup(struct obd_device *obddev)
         ptlrpc_stop_all_threads(ldlm->ldlm_service);
         ptlrpc_unregister_service(ldlm->ldlm_service);
 
-        if (mds_reint_p != NULL)
-                inter_module_put("mds_reint");
-        if (mds_getattr_name_p != NULL)
-                inter_module_put("mds_getattr_name");
-
         MOD_DEC_USE_COUNT;
         RETURN(0);
 }
@@ -487,16 +474,24 @@ static void __exit ldlm_exit(void)
                 CERROR("couldn't free ldlm lock slab\n");
 }
 
+EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_cancel);
+EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_register_intent);
+EXPORT_SYMBOL(ldlm_unregister_intent); 
 EXPORT_SYMBOL(ldlm_lockname);
 EXPORT_SYMBOL(ldlm_typename);
 EXPORT_SYMBOL(ldlm_handle2lock);
 EXPORT_SYMBOL(ldlm_lock_match);
 EXPORT_SYMBOL(ldlm_lock_addref);
 EXPORT_SYMBOL(ldlm_lock_decref);
+EXPORT_SYMBOL(ldlm_lock_change_resource);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 EXPORT_SYMBOL(ldlm_cli_cancel);
 EXPORT_SYMBOL(ldlm_match_or_enqueue);
+EXPORT_SYMBOL(ldlm_it2str);
 EXPORT_SYMBOL(ldlm_test);
 EXPORT_SYMBOL(ldlm_lock_dump);
 EXPORT_SYMBOL(ldlm_namespace_new);
index 6ca04c0..5f21751 100644 (file)
 
 #include <linux/lustre_dlm.h>
 
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
-                     struct lustre_handle *connh, 
+int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
+{
+
+        ENTRY;
+
+        if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+                      LDLM_FL_BLOCK_CONV)) {
+                /* Go to sleep until the lock is granted. */
+                /* FIXME: or cancelled. */
+                LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+                           " sleeping");
+                ldlm_lock_dump(lock);
+                ldlm_reprocess_all(lock->l_resource);
+                wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
+                LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
+        } else if (flags == LDLM_FL_WAIT_NOREPROC) { 
+                wait_event(lock->l_waitq, lock->l_req_mode == lock->l_granted_mode);
+        } else if (flags == 0) { 
+                wake_up(&lock->l_waitq);
+
+        }
+
+        RETURN(0);
+}
+
+
+static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
+                                  struct lustre_handle *parent_lockh,
+                                  __u64 *res_id,
+                                  __u32 type,
+                                  void *cookie, int cookielen,
+                                  ldlm_mode_t mode,
+                                  int *flags,
+                                  ldlm_completion_callback completion,
+                                  ldlm_lock_callback callback,
+                                  void *data,
+                                  __u32 data_len,
+                                  struct lustre_handle *lockh)
+{
+        struct ldlm_lock *lock; 
+        int err; 
+
+        if (ns->ns_client) { 
+                CERROR("Trying to cancel local lock\n"); 
+                LBUG();
+        }
+
+        lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0);
+        if (!lock)
+                GOTO(out_nolock, err = -ENOMEM);
+        LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
+
+        ldlm_lock_addref_internal(lock, mode);
+        ldlm_lock2handle(lock, lockh);
+        lock->l_connh = NULL; 
+
+        err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, callback);
+        if (err != ELDLM_OK)
+                GOTO(out, err);
+
+        if (type == LDLM_EXTENT)
+                memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
+        if ((*flags) & LDLM_FL_LOCK_CHANGED)
+                memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
+
+        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock);
+
+        if (lock->l_completion_ast)
+                lock->l_completion_ast(lock, *flags); 
+
+        LDLM_DEBUG(lock, "client-side local enqueue END");
+        EXIT;
+ out:
+        LDLM_LOCK_PUT(lock);
+ out_nolock:
+        return err;
+}
+
+
+int ldlm_cli_enqueue(struct lustre_handle *connh, 
                      struct ptlrpc_request *req,
                      struct ldlm_namespace *ns,
                      struct lustre_handle *parent_lock_handle,
@@ -23,17 +101,25 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                      void *cookie, int cookielen,
                      ldlm_mode_t mode,
                      int *flags,
+                     ldlm_completion_callback completion,
                      ldlm_lock_callback callback,
                      void *data,
                      __u32 data_len,
                      struct lustre_handle *lockh)
 {
+        struct ptlrpc_connection *connection;
         struct ldlm_lock *lock;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
         int rc, size = sizeof(*body), req_passed_in = 1;
         ENTRY;
 
+        if (connh == NULL) 
+                return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, 
+                                              type, cookie, cookielen, mode, 
+                                              flags, completion, callback, data, data_len, lockh);
+        connection = client_conn2cli(connh)->cl_conn;
+
         *flags = 0;
         lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode,
                                 data, data_len);
@@ -45,8 +131,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         ldlm_lock2handle(lock, lockh);
 
         if (req == NULL) {
-                req = ptlrpc_prep_req2(cl, conn, connh, 
-                                       LDLM_ENQUEUE, 1, &size, NULL);
+                req = ptlrpc_prep_req2(connh, LDLM_ENQUEUE, 1, &size, NULL);
                 if (!req)
                         GOTO(out, rc = -ENOMEM);
                 req_passed_in = 0;
@@ -72,21 +157,9 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
                 size = sizeof(*reply);
                 req->rq_replen = lustre_msg_size(1, &size);
         }
-
-        lock->l_connection = ptlrpc_connection_addref(conn);
-        lock->l_client = cl;
-
-        /* I apologize in advance for what I am about to do.
-         *
-         * The MDS needs to send lock requests to itself, but it doesn't want to
-         * go through the whole hassle of setting up proper connections.  Soon,
-         * these will just be function calls, but until I have the time, just
-         * set this connection to level FULL so that they go through.
-         *
-         * Since the MDS is the only user of PLAIN locks right now, we can use
-         * that to distinguish.  Sorry. */
-        if (type == LDLM_PLAIN)
-                conn->c_level = LUSTRE_CONN_FULL;
+        lock->l_connh = connh; 
+        lock->l_connection = ptlrpc_connection_addref(connection);
+        lock->l_client = client_conn2cli(connh)->cl_client;
 
         rc = ptlrpc_queue_wait(req);
         /* FIXME: status check here? */
@@ -135,21 +208,11 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         if (!req_passed_in)
                 ptlrpc_free_req(req);
 
-        rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
+        rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
                                callback);
+        if (lock->l_completion_ast)
+                lock->l_completion_ast(lock, *flags); 
 
-        if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
-                      LDLM_FL_BLOCK_CONV)) {
-                /* Go to sleep until the lock is granted. */
-                /* FIXME: or cancelled. */
-                LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
-                           " sleeping");
-                ldlm_lock_dump(lock);
-#warning ldlm needs to time out
-                wait_event(lock->l_waitq,
-                           lock->l_req_mode == lock->l_granted_mode);
-                LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
-        }
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
  out:
@@ -158,9 +221,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
         return rc;
 }
 
-int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
-                          struct ptlrpc_connection *conn,
-                          struct lustre_handle *connh, 
+int ldlm_match_or_enqueue(struct lustre_handle *connh, 
                           struct ptlrpc_request *req,
                           struct ldlm_namespace *ns,
                           struct lustre_handle *parent_lock_handle,
@@ -169,6 +230,7 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
                           void *cookie, int cookielen,
                           ldlm_mode_t mode,
                           int *flags,
+                          ldlm_completion_callback completion,
                           ldlm_lock_callback callback,
                           void *data,
                           __u32 data_len,
@@ -178,9 +240,9 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
         ENTRY;
         rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
         if (rc == 0) {
-                rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
+                rc = ldlm_cli_enqueue(connh, req, ns,
                                       parent_lock_handle, res_id, type, cookie,
-                                      cookielen, mode, flags, callback, data,
+                                      cookielen, mode, flags, completion, callback, data,
                                       data_len, lockh);
                 if (rc != ELDLM_OK)
                         CERROR("ldlm_cli_enqueue: err: %d\n", rc);
@@ -237,11 +299,29 @@ int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
         return rc;
 }
 
-int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
-                     struct lustre_handle *connh, 
-                     int new_mode, int *flags)
+
+
+static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags)
+{
+
+        if (lock->l_resource->lr_namespace->ns_client) { 
+                CERROR("Trying to cancel local lock\n"); 
+                LBUG();
+        }
+        LDLM_DEBUG(lock, "client-side local convert");
+
+        ldlm_lock_convert(lock, new_mode, flags); 
+        ldlm_reprocess_all(lock->l_resource);
+
+        LDLM_DEBUG(lock, "client-side local convert handler END");
+        LDLM_LOCK_PUT(lock);
+        RETURN(0);
+}
+
+int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 {
         struct ldlm_request *body;
+        struct lustre_handle *connh;
         struct ldlm_reply *reply;
         struct ldlm_lock *lock;
         struct ldlm_resource *res;
@@ -255,11 +335,14 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
                 RETURN(-EINVAL);
         }
         *flags = 0;
+        connh = lock->l_connh; 
+
+        if (!connh)
+                return ldlm_cli_convert_local(lock, new_mode, flags);
 
         LDLM_DEBUG(lock, "client-side convert");
 
-        req = ptlrpc_prep_req(cl, lock->l_connection,
-                               LDLM_CONVERT, 1, &size, NULL);
+        req = ptlrpc_prep_req2(connh, LDLM_CONVERT, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -282,15 +365,10 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh,
         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
         if (res != NULL)
                 ldlm_reprocess_all(res);
-        if (lock->l_req_mode != lock->l_granted_mode) {
-                /* Go to sleep until the lock is granted. */
-                /* FIXME: or cancelled. */
-                CDEBUG(D_NET, "convert returned a blocked lock, "
-                       "going to sleep.\n");
-                wait_event(lock->l_waitq,
-                           lock->l_req_mode == lock->l_granted_mode);
-                CDEBUG(D_NET, "waking up, the lock must be granted.\n");
-        }
+        /* Go to sleep until the lock is granted. */
+        /* FIXME: or cancelled. */
+        if (lock->l_completion_ast)
+                lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); 
         EXIT;
  out:
         LDLM_LOCK_PUT(lock);
@@ -303,7 +381,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         struct ptlrpc_request *req;
         struct ldlm_lock *lock;
         struct ldlm_request *body;
-        int rc, size = sizeof(*body);
+        int rc = 0, size = sizeof(*body);
         ENTRY;
 
         lock = ldlm_handle2lock(lockh); 
@@ -317,25 +395,36 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
                 RETURN(-EINVAL);
         }
 
-        LDLM_DEBUG(lock, "client-side cancel");
-        req = ptlrpc_prep_req(lock->l_client, lock->l_connection,
-                              LDLM_CANCEL, 1, &size, NULL);
-        if (!req)
-                GOTO(out, rc = -ENOMEM);
-
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        memcpy(&body->lock_handle1, &lock->l_remote_handle,
-               sizeof(body->lock_handle1));
-
-        req->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(req);
-        rc = ptlrpc_check_status(req, rc);
-        ptlrpc_free_req(req);
-        if (rc != ELDLM_OK)
-                GOTO(out, rc);
+        if (lock->l_connh) { 
+                LDLM_DEBUG(lock, "client-side cancel");
+                req = ptlrpc_prep_req2(lock->l_connh, LDLM_CANCEL, 1, &size, NULL);
+                if (!req)
+                        GOTO(out, rc = -ENOMEM);
+                
+                body = lustre_msg_buf(req->rq_reqmsg, 0);
+                memcpy(&body->lock_handle1, &lock->l_remote_handle,
+                       sizeof(body->lock_handle1));
+                
+                req->rq_replen = lustre_msg_size(0, NULL);
+                
+                rc = ptlrpc_queue_wait(req);
+                rc = ptlrpc_check_status(req, rc);
+                ptlrpc_free_req(req);
+                if (rc != ELDLM_OK)
+                        GOTO(out, rc);
+                
+                ldlm_lock_cancel(lock);
+        } else { 
+                LDLM_DEBUG(lock, "client-side local cancel");
+                if (lock->l_resource->lr_namespace->ns_client) { 
+                        CERROR("Trying to cancel local lock\n"); 
+                        LBUG();
+                }
+                ldlm_lock_cancel(lock); 
+                ldlm_reprocess_all(lock->l_resource); 
+                LDLM_DEBUG(lock, "client-side local cancel handler END");
+        }
 
-        ldlm_lock_cancel(lock);
         EXIT;
  out:
         LDLM_LOCK_PUT(lock);
index 8cd970a..6b5daba 100644 (file)
@@ -26,8 +26,6 @@ struct ldlm_test_thread {
 static spinlock_t ctl_lock = SPIN_LOCK_UNLOCKED;
 static struct list_head ctl_threads;
 static int regression_running = 0;
-static struct ptlrpc_client ctl_client;
-static struct ptlrpc_connection *ctl_conn;
 
 static int ldlm_test_callback(struct lustre_handle *lockh,
                               struct ldlm_lock_desc *new,
@@ -63,7 +61,7 @@ int ldlm_test_basics(struct obd_device *obddev)
         if (lock1 == NULL)
                 LBUG();
         err = ldlm_lock_enqueue(lock1, NULL, 0, &flags,
-                                ldlm_test_callback, ldlm_test_callback);
+                                ldlm_completion_ast, ldlm_test_callback);
         if (err != ELDLM_OK)
                 LBUG();
 
@@ -71,7 +69,7 @@ int ldlm_test_basics(struct obd_device *obddev)
         if (lock == NULL)
                 LBUG();
         err = ldlm_lock_enqueue(lock, NULL, 0, &flags,
-                                ldlm_test_callback, ldlm_test_callback);
+                                ldlm_completion_ast, ldlm_test_callback);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_BLOCK_GRANTED))
@@ -162,7 +160,6 @@ int ldlm_test_extents(struct obd_device *obddev)
 static int ldlm_test_network(struct obd_device *obddev,
                              struct ptlrpc_connection *conn)
 {
-        struct ldlm_obd *ldlm = &obddev->u.ldlm;
 
         __u64 res_id[RES_NAME_SIZE] = {1, 2, 3};
         struct ldlm_extent ext = {4, 6};
@@ -172,9 +169,8 @@ static int ldlm_test_network(struct obd_device *obddev,
 
         /* FIXME: this needs a connh as 3rd paramter, before it will work */
 
-        err = ldlm_cli_enqueue(ldlm->ldlm_client, conn, NULL, NULL,
-                               obddev->obd_namespace, NULL, res_id, LDLM_EXTENT,
-                               &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, 0,
+        err = ldlm_cli_enqueue(NULL, NULL, obddev->obd_namespace, NULL, res_id, LDLM_EXTENT,
+                               &ext, sizeof(ext), LCK_PR, &flags, NULL, NULL, NULL, 0,
                                &lockh1);
         CERROR("ldlm_cli_enqueue: %d\n", err);
         if (err == ELDLM_OK)
@@ -223,9 +219,9 @@ static int ldlm_test_main(void *data)
                 get_random_bytes(&random, sizeof(random));
                 lock_mode = random % LCK_NL + 1;
 
-                rc = ldlm_cli_enqueue(&ctl_client, ctl_conn, NULL, NULL, ns, NULL,
+                rc = ldlm_cli_enqueue(NULL, NULL, ns, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, ldlm_test_callback, NULL, 0,
+                                      &flags, ldlm_completion_ast, ldlm_test_callback, NULL, 0,
                                       &lockh);
                 if (rc < 0) {
                         CERROR("ldlm_cli_enqueue: %d\n", rc);
index f08a626..b7ed75c 100644 (file)
 #include <linux/lustre_net.h>
 #include <linux/lustre_dlm.h>
 
+struct client_obd *client_conn2cli(struct lustre_handle *conn) 
+{
+        struct obd_export *export = class_conn2export(conn);
+        if (!export)
+                LBUG();
+        return &export->exp_obd->u.cli;
+}
+extern struct recovd_obd *ptlrpc_connmgr;
+
+int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+        struct obd_ioctl_data* data = buf;
+        int rq_portal = (obddev->obd_type->typ_ops->o_getattr) ? OST_REQUEST_PORTAL : MDS_REQUEST_PORTAL;
+        int rp_portal = (obddev->obd_type->typ_ops->o_getattr) ? OSC_REPLY_PORTAL : MDC_REPLY_PORTAL;
+        struct client_obd *mdc = &obddev->u.cli;
+        char server_uuid[37];
+        int rc;
+        ENTRY;
+
+        if (data->ioc_inllen1 < 1) {
+                CERROR("requires a TARGET UUID\n");
+                RETURN(-EINVAL);
+        }
+
+        if (data->ioc_inllen1 > 37) {
+                CERROR("client UUID must be less than 38 characters\n");
+                RETURN(-EINVAL);
+        }
+
+        if (data->ioc_inllen2 < 1) {
+                CERROR("setup requires a SERVER UUID\n");
+               RETURN(-EINVAL);
+        }
+
+        if (data->ioc_inllen2 > 37) {
+                CERROR("target UUID must be less than 38 characters\n");
+                RETURN(-EINVAL);
+        }
+
+        sema_init(&mdc->cl_sem, 1);
+        mdc->cl_conn_count = 0;
+        memcpy(mdc->cl_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
+        memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
+                                                   sizeof(server_uuid)));
+
+        mdc->cl_conn = ptlrpc_uuid_to_connection(server_uuid);
+        if (!mdc->cl_conn)
+                RETURN(-ENOENT); 
+
+        OBD_ALLOC(mdc->cl_client, sizeof(*mdc->cl_client));
+        if (mdc->cl_client == NULL)
+                GOTO(out_conn, rc = -ENOMEM);
+
+        OBD_ALLOC(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client));
+        if (mdc->cl_ldlm_client == NULL)
+                GOTO(out_client, rc = -ENOMEM);
+
+        /* XXX get recovery hooked in here again */
+        //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
+
+        ptlrpc_init_client(NULL, NULL, rq_portal, rp_portal, mdc->cl_client);
+        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+                           mdc->cl_ldlm_client);
+        mdc->cl_client->cli_name = "mdc";
+        mdc->cl_ldlm_client->cli_name = "ldlm";
+        mdc->cl_max_mdsize = sizeof(struct lov_stripe_md);
+
+        MOD_INC_USE_COUNT;
+        RETURN(0);
+
+ out_client:
+        OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client));
+ out_conn:
+        ptlrpc_put_connection(mdc->cl_conn);
+        return rc;
+}
+
+int client_obd_cleanup(struct obd_device * obddev)
+{
+        struct client_obd *mdc = &obddev->u.cli;
+
+        ptlrpc_cleanup_client(mdc->cl_client);
+        OBD_FREE(mdc->cl_client, sizeof(*mdc->cl_client));
+        ptlrpc_cleanup_client(mdc->cl_ldlm_client);
+        OBD_FREE(mdc->cl_ldlm_client, sizeof(*mdc->cl_ldlm_client));
+        ptlrpc_put_connection(mdc->cl_conn);
+
+        MOD_DEC_USE_COUNT;
+        return 0;
+}
+
+int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd)
+{
+        struct client_obd *cli = &obd->u.cli;
+        struct ptlrpc_request *request;
+        int rc, size = sizeof(cli->cl_target_uuid);
+        char *tmp = cli->cl_target_uuid;
+        int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_CONNECT : MDS_CONNECT;
+
+        ENTRY;
+        down(&cli->cl_sem); 
+        MOD_INC_USE_COUNT;
+        rc = class_connect(conn, obd);
+        if (!rc) 
+                cli->cl_conn_count++;
+        else { 
+                MOD_DEC_USE_COUNT;
+                up(&cli->cl_sem);
+                RETURN(rc);
+        }
+                
+        if (cli->cl_conn_count > 1) { 
+                up(&cli->cl_sem);
+                RETURN(0);
+        }
+
+        obd->obd_namespace =
+                ldlm_namespace_new("cli", LDLM_NAMESPACE_CLIENT);
+        if (obd->obd_namespace == NULL) { 
+                up(&cli->cl_sem);
+                RETURN(-ENOMEM);
+        }
+
+        request = ptlrpc_prep_req(cli->cl_client, cli->cl_conn, rq_opc, 1, &size, &tmp);
+        if (!request)
+                GOTO(out_disco, -ENOMEM);
+
+        request->rq_level = LUSTRE_CONN_NEW;
+        request->rq_replen = lustre_msg_size(0, NULL);
+        /* Sending our local connection info breaks for local connections
+        request->rq_reqmsg->addr = conn->addr;
+        request->rq_reqmsg->cookie = conn->cookie;
+         */
+
+        rc = ptlrpc_queue_wait(request);
+        rc = ptlrpc_check_status(request, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        request->rq_connection->c_level = LUSTRE_CONN_FULL;
+        cli->cl_exporth = *(struct lustre_handle *)request->rq_repmsg;
+
+        EXIT;
+ out:
+        ptlrpc_free_req(request);
+ out_disco:
+        if (rc) {
+                class_disconnect(conn);
+                MOD_DEC_USE_COUNT;
+        }
+        up(&cli->cl_sem);
+        return rc;
+}
+
+int client_obd_disconnect(struct lustre_handle *conn)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+        int rq_opc = (obd->obd_type->typ_ops->o_getattr) ? OST_DISCONNECT : MDS_DISCONNECT;
+        struct ptlrpc_request *request = NULL;
+        int rc;
+        ENTRY;
+
+        down(&obd->u.cli.cl_sem);
+        if (!obd->u.cli.cl_conn_count) { 
+                CERROR("disconnecting disconnected device (%s)\n", 
+                       obd->obd_name); 
+                RETURN(0);
+        }
+
+        obd->u.cli.cl_conn_count--; 
+        if (obd->u.cli.cl_conn_count)
+                GOTO(class_only, 0);
+
+        ldlm_namespace_free(obd->obd_namespace);
+        obd->obd_namespace = NULL; 
+        request = ptlrpc_prep_req2(conn, rq_opc, 0, NULL, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        request->rq_replen = lustre_msg_size(0, NULL);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc) 
+                GOTO(out, rc);
+ class_only:
+        rc = class_disconnect(conn);
+        if (!rc)
+                MOD_DEC_USE_COUNT;
+ out:
+        if (request) 
+                ptlrpc_free_req(request);
+        up(&obd->u.cli.cl_sem);
+        RETURN(rc);
+}
+
 int target_handle_connect(struct ptlrpc_request *req)
 {
         struct obd_device *target;
index ad51941..d8b9b3e 100644 (file)
 void lov_packdesc(struct lov_desc *ld)
 {
         ld->ld_tgt_count = HTON__u32(ld->ld_tgt_count); 
-        ld->ld_default_stripecount = HTON__u32(ld->ld_default_stripecount); 
-        ld->ld_default_stripesize = HTON__u32(ld->ld_default_stripesize); 
+        ld->ld_default_stripe_count = HTON__u32(ld->ld_default_stripe_count); 
+        ld->ld_default_stripe_size = HTON__u32(ld->ld_default_stripe_size); 
         ld->ld_pattern = HTON__u32(ld->ld_pattern); 
 }
 
 void lov_unpackdesc(struct lov_desc *ld)
 {
         ld->ld_tgt_count = NTOH__u32(ld->ld_tgt_count); 
-        ld->ld_default_stripecount = HTON__u32(ld->ld_default_stripecount); 
-        ld->ld_default_stripesize = HTON__u32(ld->ld_default_stripesize); 
+        ld->ld_default_stripe_count = HTON__u32(ld->ld_default_stripe_count); 
+        ld->ld_default_stripe_size = HTON__u32(ld->ld_default_stripe_size); 
         ld->ld_pattern = HTON__u32(ld->ld_pattern); 
 }
index cca46cc..41ff0b8 100644 (file)
@@ -49,7 +49,7 @@ static int ll_file_open(struct inode *inode, struct file *file)
         /*  XXX object needs to be cleaned up if mdc_open fails */
         /*  XXX error handling appropriate here? */
         if (lli->lli_smd == NULL || lli->lli_smd->lmd_object_id == 0) {
-                struct mdc_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb));
+                struct client_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb));
                 struct inode * inode = file->f_dentry->d_inode;
 
                 lli->lli_smd = NULL; 
@@ -59,7 +59,7 @@ static int ll_file_open(struct inode *inode, struct file *file)
                 }
                 oa->o_valid = OBD_MD_FLMODE;
                 oa->o_mode = S_IFREG | 0600;
-                oa->o_easize = mdc->mdc_max_mdsize;
+                oa->o_easize = mdc->cl_max_mdsize;
                 rc = obd_create(ll_i2obdconn(inode), oa, &lli->lli_smd);
                 if (rc)
                         RETURN(rc);
index 1ed4375..e8a8ee9 100644 (file)
@@ -356,7 +356,7 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode)
         struct obdo oa;
         struct inode *inode;
         struct lov_stripe_md *smd;
-        struct ll_inode_info *ii;
+        struct ll_inode_info *ii = NULL;
 
         if (dentry->d_it->it_disposition == 0) {
                 memset(&oa, 0, sizeof(oa));
@@ -395,7 +395,7 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode)
         RETURN(rc);
 
 out_destroy:
-        oa.o_easize = ii->lli_smd->lmd_size;
+        oa.o_easize = ii->lli_smd->lmd_easize;
         err = obd_destroy(ll_i2obdconn(dir), &oa, ii->lli_smd);
         if (err)
                 CERROR("error destroying object %Ld in error path: err = %d\n",
index d642e3d..122ff36 100644 (file)
@@ -31,15 +31,15 @@ static int ll_reconnect(struct ll_sb_info *sbi)
         int err;
         struct ptlrpc_request *request; 
 
-        ptlrpc_readdress_connection(sbi2mdc(sbi)->mdc_conn, "mds");
+        ptlrpc_readdress_connection(sbi2mdc(sbi)->cl_conn, "mds");
 
-        err = connmgr_connect(ptlrpc_connmgr, sbi2mdc(sbi)->mdc_conn);
+        err = connmgr_connect(ptlrpc_connmgr, sbi2mdc(sbi)->cl_conn);
         if (err) {
                 CERROR("cannot connect to MDS: rc = %d\n", err);
-                ptlrpc_put_connection(sbi2mdc(sbi)->mdc_conn);
+                ptlrpc_put_connection(sbi2mdc(sbi)->cl_conn);
                 GOTO(out_disc, err = -ENOTCONN);
         }
-        sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_CON;
+        sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_CON;
 
         /* XXX: need to store the last_* values somewhere */
         err = mdc_getstatus(&sbi->ll_mdc_conn,
@@ -51,8 +51,8 @@ static int ll_reconnect(struct ll_sb_info *sbi)
                 CERROR("cannot mds_connect: rc = %d\n", err);
                 GOTO(out_disc, err = -ENOTCONN);
         }
-        sbi2mdc(sbi)->mdc_client->cli_last_rcvd = last_xid;
-        sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_RECOVD;
+        sbi2mdc(sbi)->cl_client->cli_last_rcvd = last_xid;
+        sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_RECOVD;
 
  out_disc:
         return err;
@@ -126,7 +126,7 @@ int ll_recover(struct ptlrpc_client *cli)
 
         }
 
-        sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_FULL;
+        sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL;
         recovd_cli_fixed(cli);
 
         /* Finally, continue what we delayed since recovery started */
index 32b45de..5ce6087 100644 (file)
@@ -200,11 +200,14 @@ void ll_truncate(struct inode *inode)
 
         CDEBUG(D_INFO, "calling punch for %ld (all bytes after %Ld)\n",
                (long)oa.o_id, (unsigned long long)oa.o_size);
+
         oa.o_size = inode->i_size;
         oa.o_id = md->lmd_object_id;
-        oa.o_valid = OBD_MD_FLSIZE | OBD_MD_FLID;
-        /* truncate == punch from i_size onwards */
-        err = obd_punch(ll_i2obdconn(inode), &oa, md, -1 - oa.o_size, oa.o_size);
+        oa.o_valid = OBD_MD_FLID;
+        /* truncate == punch to/from start from/to end:
+           set end to -1 for that. */
+        err = obd_punch(ll_i2obdconn(inode), &oa, md, oa.o_size, 
+                        0xffffffffffffffff);
         if (err)
                 CERROR("obd_truncate fails (%d)\n", err);
         else
index bb32159..4e22cdc 100644 (file)
@@ -64,8 +64,8 @@ static void ll_options(char *options, char **ost, char **mds)
              this_char != NULL;
              this_char = strtok (NULL, ",")) {
                 CDEBUG(D_SUPER, "this_char %s\n", this_char);
-                if ( (!*ost && (*ost = ll_read_opt("ost", this_char)))||
-                     (!*mds && (*mds = ll_read_opt("mds", this_char))) )
+                if ( (!*ost && (*ost = ll_read_opt("osc", this_char)))||
+                     (!*mds && (*mds = ll_read_opt("mdc", this_char))) )
                         continue;
         }
         EXIT;
@@ -81,8 +81,8 @@ static struct super_block * ll_read_super(struct super_block *sb,
         struct inode *root = 0;
         struct obd_device *obd;
         struct ll_sb_info *sbi;
-        char *ost = NULL;
-        char *mds = NULL;
+        char *osc = NULL;
+        char *mdc = NULL;
         int err;
         struct ll_fid rootfid;
         struct statfs sfs;
@@ -102,47 +102,47 @@ static struct super_block * ll_read_super(struct super_block *sb,
 
         sb->u.generic_sbp = sbi;
 
-        ll_options(data, &ost, &mds);
+        ll_options(data, &osc, &mdc);
 
-        if (!ost) {
-                CERROR("no ost\n");
+        if (!osc) {
+                CERROR("no osc\n");
                 GOTO(out_free, sb = NULL);
         }
 
-        if (!mds) {
-                CERROR("no mds\n");
+        if (!mdc) {
+                CERROR("no mdc\n");
                 GOTO(out_free, sb = NULL);
         }
 
-        obd = class_uuid2obd(mds); 
+        obd = class_uuid2obd(mdc); 
         if (!obd) {
-                CERROR("MDS %s: not setup or attached\n", mds);
+                CERROR("MDC %s: not setup or attached\n", mdc);
                 GOTO(out_free, sb = NULL);
         }
 
 #if 0
-        err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mds_conn);
+        err = connmgr_connect(ptlrpc_connmgr, sbi->ll_mdc_conn);
         if (err) {
-                CERROR("cannot connect to MDS: rc = %d\n", err);
+                CERROR("cannot connect to MDC: rc = %d\n", err);
                 GOTO(out_rpc, sb = NULL);
         }
 #endif 
 
         err = obd_connect(&sbi->ll_mdc_conn, obd);
         if (err) {
-                CERROR("cannot connect to %s: rc = %d\n", mds, err);
+                CERROR("cannot connect to %s: rc = %d\n", mdc, err);
                 GOTO(out_free, sb = NULL);
         }
-        sbi2mdc(sbi)->mdc_conn->c_level = LUSTRE_CONN_FULL;
+        sbi2mdc(sbi)->cl_conn->c_level = LUSTRE_CONN_FULL;
 
-        obd = class_uuid2obd(ost);
+        obd = class_uuid2obd(osc);
         if (!obd) {
-                CERROR("OST %s: not setup or attached\n", ost);
+                CERROR("OSC %s: not setup or attached\n", osc);
                 GOTO(out_mdc, sb = NULL);
         }
         err = obd_connect(&sbi->ll_osc_conn, obd);
         if (err) {
-                CERROR("cannot connect to %s: rc = %d\n", ost, err);
+                CERROR("cannot connect to %s: rc = %d\n", osc, err);
                 GOTO(out_mdc, sb = NULL);
         }
 
@@ -150,7 +150,7 @@ static struct super_block * ll_read_super(struct super_block *sb,
         err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid, &last_committed,
                             &last_rcvd, &last_xid, &request);
         if (err) {
-                CERROR("cannot mds_connect: rc = %d\n", err);
+                CERROR("cannot mdc_connect: rc = %d\n", err);
                 GOTO(out_request, sb = NULL);
         }
         CDEBUG(D_SUPER, "rootfid %Ld\n", (unsigned long long)rootfid.id);
@@ -199,10 +199,10 @@ static struct super_block * ll_read_super(struct super_block *sb,
         ptlrpc_free_req(request);
 
 out_dev:
-        if (mds)
-                OBD_FREE(mds, strlen(mds) + 1);
-        if (ost)
-                OBD_FREE(ost, strlen(ost) + 1);
+        if (mdc)
+                OBD_FREE(mdc, strlen(mdc) + 1);
+        if (osc)
+                OBD_FREE(osc, strlen(osc) + 1);
 
         RETURN(sb);
 
@@ -240,7 +240,7 @@ static void ll_clear_inode(struct inode *inode)
                 struct lov_stripe_md *md = lli->lli_smd;
 
                 if (md) {
-                        OBD_FREE(md, md->lmd_size); 
+                        OBD_FREE(md, md->lmd_easize); 
                         lli->lli_smd = NULL;
                 }
                 if (lli->lli_symlink_name) {
@@ -262,7 +262,7 @@ static void ll_delete_inode(struct inode *inode)
                         GOTO(out, -EINVAL);
 
                 oa.o_id = md->lmd_object_id;
-                oa.o_easize = md->lmd_size;
+                oa.o_easize = md->lmd_easize;
                 if (oa.o_id == 0) {
                         CERROR("This really happens\n");
                         /* No obdo was ever created */
@@ -382,8 +382,8 @@ out:
 
 inline int ll_stripe_md_size(struct super_block *sb)
 {
-        struct mdc_obd *mdc = sbi2mdc(ll_s2sbi(sb));
-        return mdc->mdc_max_mdsize;
+        struct client_obd *mdc = sbi2mdc(ll_s2sbi(sb));
+        return mdc->cl_max_mdsize;
 }
 
 static void ll_to_inode(struct inode *dst, struct ll_inode_md *md)
@@ -419,7 +419,7 @@ static void ll_to_inode(struct inode *dst, struct ll_inode_md *md)
         if (md && md->md && md->md->lmd_stripe_count) { 
                 struct lov_stripe_md *smd = md->md;
                 int size = ll_stripe_md_size(dst->i_sb);
-                if (md->md->lmd_size != size) { 
+                if (md->md->lmd_easize != size) { 
                         CERROR("Striping metadata size error %ld\n",
                                dst->i_ino); 
                         LBUG();
index 2cd84ab..a6cae4b 100644 (file)
@@ -5,6 +5,7 @@
  *
  * Copyright (C) 2002 Cluster File Systems, Inc.
  * Author: Phil Schwan <phil@off.net>
+ *         Peter Braam <braam@clusterfs.com>
  *
  * This code is issued under the GNU General Public License.
  * See the file COPYING in this distribution
 extern struct obd_device obd_dev[MAX_OBD_DEVICES];
 
 /* obd methods */
-
 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
 {
         struct ptlrpc_request *req;
         struct lov_obd *lov = &obd->u.lov;
         struct lustre_handle mdc_conn;
         uuid_t *uuidarray;
-        int rc;
+        int rc, rc2;
         int i;
 
         MOD_INC_USE_COUNT;
@@ -44,6 +44,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
                 RETURN(rc);
         }
 
+        /* retrieve LOV metadata from MDS */
         rc = obd_connect(&mdc_conn, lov->mdcobd);
         if (rc) {
                 CERROR("cannot connect to mdc: rc = %d\n", rc);
@@ -51,26 +52,23 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
         }
 
         rc = mdc_getlovinfo(obd, &mdc_conn, &uuidarray, &req);
-        obd_disconnect(&mdc_conn);
-
-        if (rc) {
-                CERROR("cannot get lov info %d\n", rc);
-                GOTO(out, rc);
-        }
-
-
-        if (lov->desc.ld_tgt_count > 1000) {
-                CERROR("configuration error: target count > 1000 (%d)\n",
-                       lov->desc.ld_tgt_count);
-                GOTO(out, rc = -EINVAL);
+        rc2 = obd_disconnect(&mdc_conn);
+        if (rc || rc2) {
+                CERROR("cannot get lov info or disconnect %d/%d\n", rc, rc2);
+                GOTO(out, (rc) ? rc : rc2 );
         }
 
+        /* sanity... */
         if (strcmp(obd->obd_uuid, lov->desc.ld_uuid)) {
                 CERROR("lov uuid %s not on mds device (%s)\n",
                        obd->obd_uuid, lov->desc.ld_uuid);
                 GOTO(out, rc = -EINVAL);
         }
-
+        if (lov->desc.ld_tgt_count > 1000) {
+                CERROR("configuration error: target count > 1000 (%d)\n",
+                       lov->desc.ld_tgt_count);
+                GOTO(out, rc = -EINVAL);
+        }
         if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[1] <
             sizeof(uuid_t) * lov->desc.ld_tgt_count) {
                 CERROR("invalid uuid array returned\n");
@@ -91,9 +89,13 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
         for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
                 struct obd_device *tgt = class_uuid2obd(uuidarray[i]);
                 if (!tgt) {
-                        CERROR("Target %s not configured\n", uuidarray[i]);
+                        CERROR("Target %s not attached\n", uuidarray[i]);
                         GOTO(out_mem, rc = -EINVAL);
                 }
+                if (!(tgt->obd_flags & OBD_SET_UP)) {
+                        CERROR("Target %s not set up\n", uuidarray[i]);
+                        GOTO(out_mem, rc = -EINVAL);
+                }            
                 rc = obd_connect(&lov->tgts[i].conn, tgt);
                 if (rc) {
                         CERROR("Target %s connect error %d\n",
@@ -105,7 +107,6 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
  out_mem:
         if (rc) {
                 for (i = 0 ; i < lov->desc.ld_tgt_count; i++) {
-                        int rc2;
                         rc2 = obd_disconnect(&lov->tgts[i].conn);
                         if (rc2)
                                 CERROR("BAD: Target %s disconnect error %d\n",
@@ -116,7 +117,6 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd)
  out:
         if (rc)
                 class_disconnect(conn);
-
         ptlrpc_free_req(req);
         return rc;
 }
@@ -144,11 +144,9 @@ static int lov_disconnect(struct lustre_handle *conn)
         lov->tgts = NULL; 
 
  out_local:
-
         rc = class_disconnect(conn);
         if (!rc)
                 MOD_DEC_USE_COUNT;
-
         return rc;
 }
 
@@ -169,8 +167,6 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
                 RETURN(-EINVAL);
         }
 
-        /* FIXME: we should make a connection instead perhaps to avoid
-           the mdc from walking away? The fs guarantees this. */ 
         lov->mdcobd = class_uuid2obd(data->ioc_inlbuf1); 
         if (!lov->mdcobd) { 
                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid, 
@@ -191,6 +187,7 @@ static inline int lov_stripe_md_size(struct obd_device *obd)
         return size;
 }
 
+/* the LOV counts on oa->o_id to be set as the LOV object id */
 static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea)
 {
         int rc = 0, i;
@@ -204,7 +201,6 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st
                 CERROR("lov_create needs EA for striping information\n"); 
                 RETURN(-EINVAL); 
         }
-
         if (!export)
                 RETURN(-EINVAL);
         lov = &export->exp_obd->u.lov;
@@ -217,10 +213,10 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st
         }
 
         md = *ea;
-        md->lmd_size = oa->o_easize;
+        md->lmd_easize = oa->o_easize;
         md->lmd_object_id = oa->o_id;
         if (!md->lmd_stripe_count) { 
-                md->lmd_stripe_count = lov->desc.ld_default_stripecount;
+                md->lmd_stripe_count = lov->desc.ld_default_stripe_count;
         }
 
         for (i = 0; i < md->lmd_stripe_count; i++) {
@@ -252,16 +248,15 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_st
 }
 
 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, 
-struct lov_stripe_md *ea)
+                       struct lov_stripe_md *md)
 {
         int rc = 0, i;
         struct obdo tmp;
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
-        struct lov_stripe_md *md;
         ENTRY;
 
-        if (!ea) { 
+        if (!md) { 
                 CERROR("LOV requires striping ea for desctruction\n"); 
                 RETURN(-EINVAL); 
         }
@@ -270,8 +265,6 @@ struct lov_stripe_md *ea)
                 RETURN(-ENODEV); 
 
         lov = &export->exp_obd->u.lov;
-        md = ea;
-
         for (i = 0; i < md->lmd_stripe_count; i++) {
                 /* create data objects with "parent" OA */ 
                 memcpy(&tmp, oa, sizeof(tmp));
@@ -285,105 +278,167 @@ struct lov_stripe_md *ea)
         RETURN(rc);
 }
 
-#if 0
-static int lov_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, 
+                       struct lov_stripe_md *md)
 {
-        int rc;
+        int rc = 0, i;
+        struct obdo tmp;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
         ENTRY;
 
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
+        if (!md) { 
+                CERROR("LOV requires striping ea for desctruction\n"); 
+                RETURN(-EINVAL); 
+        }
+
+        if (!export || !export->exp_obd) 
+                RETURN(-ENODEV); 
+
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                /* create data objects with "parent" OA */ 
+                memcpy(&tmp, oa, sizeof(tmp));
+                oa->o_id = md->lmd_objects[i].l_object_id; 
 
-        rc = obd_getattr(&conn->oc_dev->obd_multi_conn[0], oa);
+                rc = obd_getattr(&lov->tgts[i].conn, &tmp, NULL);
+                if (!rc) { 
+                        CERROR("Error getattr object %Ld on %d\n",
+                               oa->o_id, i); 
+                }
+                /* XXX can do something more sophisticated here... */
+                if (i == 0 ) {
+                        obd_id id = oa->o_id;
+                        memcpy(oa, &tmp, sizeof(tmp));
+                        oa->o_id = id;
+                }
+        }
         RETURN(rc);
 }
 
-static int lov_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, 
+                       struct lov_stripe_md *md)
 {
-        int rc, retval, i;
+        int rc = 0, i;
+        struct obdo tmp;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
         ENTRY;
 
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
-
-        for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
-                rc = obd_setattr(&conn->oc_dev->obd_multi_conn[i], oa);
-                if (i == 0)
-                        retval = rc;
-                else if (retval != rc)
-                        CERROR("different results on multiple OBDs!\n");
+        if (!md) { 
+                CERROR("LOV requires striping ea for desctruction\n"); 
+                RETURN(-EINVAL); 
         }
 
+        if (!export || !export->exp_obd) 
+                RETURN(-ENODEV); 
+
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                /* create data objects with "parent" OA */ 
+                memcpy(&tmp, oa, sizeof(tmp));
+                oa->o_id = md->lmd_objects[i].l_object_id; 
+
+                rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL);
+                if (!rc) { 
+                        CERROR("Error setattr object %Ld on %d\n",
+                               oa->o_id, i); 
+                }
+        }
         RETURN(rc);
 }
 
-static int lov_open(struct lustre_handle *conn, struct obdo *oa)
+static int lov_open(struct lustre_handle *conn, struct obdo *oa, 
+                    struct lov_stripe_md *md)
 {
-        int rc, retval, i;
+        int rc = 0, i;
+        struct obdo tmp;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
         ENTRY;
 
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
-
-        for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
-                rc = obd_open(&conn->oc_dev->obd_multi_conn[i], oa);
-                if (i == 0)
-                        retval = rc;
-                else if (retval != rc)
-                        CERROR("different results on multiple OBDs!\n");
+        if (!md) { 
+                CERROR("LOV requires striping ea for desctruction\n"); 
+                RETURN(-EINVAL); 
         }
 
+        if (!export || !export->exp_obd) 
+                RETURN(-ENODEV); 
+
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                /* create data objects with "parent" OA */ 
+                memcpy(&tmp, oa, sizeof(tmp));
+                oa->o_id = md->lmd_objects[i].l_object_id; 
+
+                rc = obd_open(&lov->tgts[i].conn, &tmp, NULL);
+                if (!rc) { 
+                        CERROR("Error getattr object %Ld on %d\n",
+                               oa->o_id, i); 
+                }
+        }
         RETURN(rc);
 }
 
-static int lov_close(struct lustre_handle *conn, struct obdo *oa)
+
+static int lov_close(struct lustre_handle *conn, struct obdo *oa, 
+                     struct lov_stripe_md *md)
 {
-        int rc, retval, i;
+        int rc = 0, i;
+        struct obdo tmp;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
         ENTRY;
 
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
-
-        for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
-                rc = obd_close(&conn->oc_dev->obd_multi_conn[i], oa);
-                if (i == 0)
-                        retval = rc;
-                else if (retval != rc)
-                        CERROR("different results on multiple OBDs!\n");
+        if (!md) { 
+                CERROR("LOV requires striping ea for desctruction\n"); 
+                RETURN(-EINVAL); 
         }
 
-        RETURN(rc);
-}
+        if (!export || !export->exp_obd) 
+                RETURN(-ENODEV); 
 
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                /* create data objects with "parent" OA */ 
+                memcpy(&tmp, oa, sizeof(tmp));
+                oa->o_id = md->lmd_objects[i].l_object_id; 
 
+                rc = obd_close(&lov->tgts[i].conn, &tmp, NULL);
+                if (!rc) { 
+                        CERROR("Error getattr object %Ld on %d\n",
+                               oa->o_id, i); 
+                }
+        }
+        RETURN(rc);
+}
 
-/* FIXME: maybe we'll just make one node the authoritative attribute node, then
- * we can send this 'punch' to just the authoritative node and the nodes
- * that the punch will affect. */
-static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
-                     obd_size count, obd_off offset)
+/* compute offset in stripe i corresponds to offset "in" */
+__u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i)
 {
-        int rc, retval, i;
-        ENTRY;
-
-        if (!class_conn2export(conn))
-                RETURN(-EINVAL);
+        __u32 ssz = md->lmd_stripe_size;
+        /* full stripes across all * stripe size */
+        __u32 out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz;
+        __u32 off = (__u32)in % (md->lmd_stripe_count * ssz);
 
-        for (i = 0; i < conn->oc_dev->obd_multi_count; i++) {
-                rc = obd_punch(&conn->oc_dev->obd_multi_conn[i], oa, count,
-                               offset);
-                if (i == 0)
-                        retval = rc;
-                else if (retval != rc)
-                        CERROR("different results on multiple OBDs!\n");
+        if ( in == 0xffffffffffffffff ) {
+                return 0xffffffffffffffff;
         }
 
-        RETURN(rc);
+        if ( (i+1) * ssz < off ) 
+                out += ssz;
+        else if ( i * ssz > off ) 
+                out += 0;
+        else 
+                out += (off - (i * ssz)) % ssz;
+        
+        return (__u64) out;
 }
 
+
 struct lov_callback_data {
         atomic_t count;
-        wait_queue_head waitq;
+        wait_queue_head_t waitq;
 };
 
 static void lov_read_callback(struct ptlrpc_bulk_desc *desc, void *data)
@@ -400,7 +455,8 @@ static int lov_read_check_status(struct lov_callback_data *cb_data)
         if (sigismember(&(current->pending.signal), SIGKILL) ||
             sigismember(&(current->pending.signal), SIGTERM) ||
             sigismember(&(current->pending.signal), SIGINT)) {
-                cb_data->flags |= PTL_RPC_FL_INTR;
+                // FIXME XXX what here 
+                // cb_data->flags |= PTL_RPC_FL_INTR;
                 RETURN(1);
         }
         if (atomic_read(&cb_data->count) == 0)
@@ -408,7 +464,49 @@ static int lov_read_check_status(struct lov_callback_data *cb_data)
         RETURN(0);
 }
 
-/* buffer must lie in user memory here */
+
+/* FIXME: maybe we'll just make one node the authoritative attribute node, then
+ * we can send this 'punch' to just the authoritative node and the nodes
+ * that the punch will affect. */
+static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
+                     struct lov_stripe_md *md, 
+                     obd_off start, obd_off end)
+{
+        int rc = 0, i;
+        struct obdo tmp;
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_obd *lov;
+        ENTRY;
+
+        if (!md) { 
+                CERROR("LOV requires striping ea for desctruction\n"); 
+                RETURN(-EINVAL); 
+        }
+
+        if (!export || !export->exp_obd) 
+                RETURN(-ENODEV); 
+
+        lov = &export->exp_obd->u.lov;
+        for (i = 0; i < md->lmd_stripe_count; i++) {
+                __u64 starti = lov_offset(md, start, i); 
+                __u64 endi = lov_offset(md, end, i); 
+                        
+                /* create data objects with "parent" OA */ 
+                memcpy(&tmp, oa, sizeof(tmp));
+                oa->o_id = md->lmd_objects[i].l_object_id; 
+
+                rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL,
+                               starti, endi);
+                if (!rc) { 
+                        CERROR("Error punch object %Ld on %d\n",
+                               oa->o_id, i); 
+                }
+        }
+        RETURN(rc);
+}
+
+
+#if 0
 static int lov_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
                    struct obdo **oa,
                    obd_count *oa_bufs, struct page **buf,
@@ -568,11 +666,11 @@ struct obd_ops lov_obd_ops = {
         o_disconnect:  lov_disconnect,
         o_create:      lov_create,
         o_destroy:     lov_destroy,
-#if 0
         o_getattr:     lov_getattr,
         o_setattr:     lov_setattr,
         o_open:        lov_open,
         o_close:       lov_close,
+#if 0
         o_brw:         lov_pgcache_brw,
         o_punch:       lov_punch,
         o_enqueue:     lov_enqueue,
index 1b5d201..ab5fa58 100644 (file)
@@ -9,7 +9,7 @@ MODULE = mdc
 modulefs_DATA = mdc.o
 EXTRA_PROGRAMS = mdc
 
-LINX=mds_updates.c ll_pack.c lov_pack.c
+LINX=l_net.c mds_updates.c ll_pack.c lov_pack.c 
 mdc_SOURCES =  mdc_request.c mdc_reint.c $(LINX)
 lov_pack.c: 
        test -e lov_pack.c || ln -sf $(top_srcdir)/lib/lov_pack.c .
@@ -17,6 +17,9 @@ lov_pack.c:
 ll_pack.c: 
        test -e ll_pack.c || ln -sf $(top_srcdir)/lib/ll_pack.c .
 
+l_net.c: 
+       test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c .
+
 mds_updates.c: 
        test -e mds_updates.c || ln -sf $(top_srcdir)/lib/mds_updates.c .
 
index c735ece..621fb35 100644 (file)
@@ -58,7 +58,7 @@ int mdc_setattr(struct lustre_handle *conn,
         ENTRY;
 
         mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
+        req = ptlrpc_prep_req2(conn, 
                                MDS_REINT, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
@@ -83,9 +83,6 @@ int mdc_create(struct lustre_handle *conn,
                struct ptlrpc_request **request)
 {
         struct mds_rec_create *rec;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ptlrpc_request *req;
         int rc, size[3] = {sizeof(struct mds_rec_create), namelen + 1, 0};
         char *tmp, *bufs[3] = {NULL, NULL, NULL};
@@ -98,7 +95,7 @@ int mdc_create(struct lustre_handle *conn,
                                dir->i_ino, namelen, name); 
                         LBUG();
                 }
-                size[2] = smd->lmd_size;
+                size[2] = smd->lmd_easize;
                 bufs[2] = (char *)smd;
                 bufcount = 3;
         } else if (S_ISLNK(mode)) {
@@ -106,9 +103,7 @@ int mdc_create(struct lustre_handle *conn,
                 bufcount = 3;
         }
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_REINT, bufcount, size, bufs);
+        req = ptlrpc_prep_req2(conn, MDS_REINT, bufcount, size, bufs);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -119,7 +114,7 @@ int mdc_create(struct lustre_handle *conn,
 
         if (S_ISREG(mode)) {
                 tmp = lustre_msg_buf(req->rq_reqmsg, 2);
-                memcpy(tmp, smd, smd->lmd_size);
+                memcpy(tmp, smd, smd->lmd_easize);
         } else if (S_ISLNK(mode)) {
                 tmp = lustre_msg_buf(req->rq_reqmsg, 2);
                 LOGL0(tgt, tgtlen, tmp);
@@ -149,16 +144,11 @@ int mdc_unlink(struct lustre_handle *conn,
                struct inode *dir, struct inode *child, const char *name,
                int namelen, struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ptlrpc_request *req;
         int rc, size[2] = {sizeof(struct mds_rec_unlink), namelen + 1};
         ENTRY;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_REINT, 2, size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -179,16 +169,11 @@ int mdc_link(struct lustre_handle *conn,
              struct dentry *src, struct inode *dir, const char *name,
              int namelen, struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ptlrpc_request *req;
         int rc, size[2] = {sizeof(struct mds_rec_link), namelen + 1};
         ENTRY;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_REINT, 2, size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_REINT, 2, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -210,17 +195,12 @@ int mdc_rename(struct lustre_handle *conn,
                int oldlen, const char *new, int newlen,
                struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ptlrpc_request *req;
         int rc, size[3] = {sizeof(struct mds_rec_rename), oldlen + 1,
                            newlen + 1};
         ENTRY;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_REINT, 3, size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_REINT, 3, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
index 4116df1..b81e961 100644 (file)
@@ -39,57 +39,31 @@ int mdc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
                struct lustre_handle **rconn)
 {
         struct obd_export *export;
-        struct mdc_obd *mdc;
+        struct client_obd *mdc;
 
         export = class_conn2export(conn);
         if (!export)
                 return -ENOTCONN;
 
-        mdc = &export->exp_obd->u.mdc;
+        mdc = &export->exp_obd->u.cli;
 
-        *cl = mdc->mdc_client;
-        *connection = mdc->mdc_conn;
+        *cl = mdc->cl_client;
+        *connection = mdc->cl_conn;
         *rconn = &export->exp_rconnh;
 
         return 0;
 }
 
-static int mdc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
-                         struct ptlrpc_connection **connection,
-                         struct lustre_handle **rconn)
-{
-        struct obd_export *export;
-        struct mdc_obd *mdc;
-
-        export = class_conn2export(conn);
-        if (!export)
-                return -ENOTCONN;
-
-        mdc = &export->exp_obd->u.mdc;
-
-        *cl = mdc->mdc_ldlm_client;
-        *connection = mdc->mdc_conn;
-        *rconn = &export->exp_rconnh;
-
-        return 0;
-}
-
-
 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid,
                   __u64 *last_committed, __u64 *last_rcvd,
                   __u32 *last_xid, struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_GETSTATUS, 1, &size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_GETSTATUS, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -129,17 +103,12 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
         struct ptlrpc_request *req;
         struct mds_status_req *streq;
         struct lov_obd *lov = &obd->u.lov;
-        struct mdc_obd *mdc = &lov->mdcobd->u.mdc;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
+        struct client_obd *mdc = &lov->mdcobd->u.cli;
         struct lov_desc *desc = &lov->desc;
         int rc, size[2] = {sizeof(*streq)};
         ENTRY;
 
-        mdc_con2cl(mdc_connh, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_GETLOVINFO, 1, size, NULL);
+        req = ptlrpc_prep_req2(mdc_connh, MDS_GETLOVINFO, 1, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -162,7 +131,7 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
                 *uuids = lustre_msg_buf(req->rq_repmsg, 1);
                 lov_unpackdesc(desc);
         }
-        mdc->mdc_max_mdsize = sizeof(*desc) +
+        mdc->cl_max_mdsize = sizeof(*desc) +
                 desc->ld_tgt_count * sizeof(uuid_t);
 
         EXIT;
@@ -184,8 +153,7 @@ int mdc_getattr(struct lustre_handle *conn,
         ENTRY;
 
         mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_GETATTR, 1, size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_GETATTR, 1, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -194,9 +162,9 @@ int mdc_getattr(struct lustre_handle *conn,
         body->valid = valid;
 
         if (S_ISREG(type)) {
-                struct mdc_obd *mdc = &class_conn2obd(conn)->u.mdc;
+                struct client_obd *mdc = &class_conn2obd(conn)->u.cli;
                 bufcount = 2;
-                size[1] = mdc->mdc_max_mdsize;
+                size[1] = mdc->cl_max_mdsize;
         } else if (valid & OBD_MD_LINKNAME) {
                 bufcount = 2;
                 size[1] = ea_size;
@@ -260,20 +228,17 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
 {
         struct ptlrpc_request *req;
         struct obd_device *obddev = class_conn2obd(conn);
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         __u64 res_id[RES_NAME_SIZE] = {dir->i_ino};
         int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
         int rc, flags;
         int repsize[3] = {sizeof(struct ldlm_reply), 
                           sizeof(struct mds_body),
-                          obddev->u.mdc.mdc_max_mdsize};
+                          obddev->u.cli.cl_max_mdsize};
         struct ldlm_reply *dlm_rep;
         struct ldlm_intent *lit;
         ENTRY;
 
-        LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino);
+        LDLM_DEBUG_NOLOCK("mdsintent %s dir %ld", ldlm_it2str(it->it_op), dir->i_ino);
 
         switch (it->it_op) { 
         case IT_MKDIR:
@@ -290,13 +255,11 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 break;
         }
 
-        mdc_con2dlmcl(conn, &cl, &connection, &rconn);
         if (it->it_op & (IT_MKDIR | IT_CREAT | IT_SYMLINK | IT_MKNOD)) {
                 size[2] = sizeof(struct mds_rec_create);
                 size[3] = de->d_name.len + 1;
                 size[4] = tgtlen + 1;
-                req = ptlrpc_prep_req2(cl, connection, rconn,
-                                       LDLM_ENQUEUE, 5, size, NULL);
+                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -315,8 +278,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 size[2] = sizeof(struct mds_rec_rename);
                 size[3] = old_de->d_name.len + 1;
                 size[4] = de->d_name.len + 1;
-                req = ptlrpc_prep_req2(cl, connection, rconn,
-                                       LDLM_ENQUEUE, 5, size, NULL);
+                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 5, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -332,8 +294,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
         } else if (it->it_op == IT_UNLINK || it->it_op == IT_RMDIR) {
                 size[2] = sizeof(struct mds_rec_unlink);
                 size[3] = de->d_name.len + 1;
-                req = ptlrpc_prep_req2(cl, connection, rconn,
-                                       LDLM_ENQUEUE, 4, size, NULL);
+                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -352,8 +313,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 size[2] = sizeof(struct mds_body);
                 size[3] = de->d_name.len + 1;
 
-                req = ptlrpc_prep_req2(cl, connection, rconn,
-                                       LDLM_ENQUEUE, 4, size, NULL);
+                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 4, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -367,8 +327,7 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 /* get ready for the reply */
                 req->rq_replen = lustre_msg_size(3, repsize);
         } else if (it->it_op == IT_READDIR) {
-                req = ptlrpc_prep_req2(cl, connection, rconn,
-                                       LDLM_ENQUEUE, 1, size, NULL);
+                req = ptlrpc_prep_req2(conn, LDLM_ENQUEUE, 1, size, NULL);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -379,9 +338,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 RETURN(-1);
         }
 #warning FIXME: the data here needs to be different if a lock was granted for a different inode
-        rc = ldlm_cli_enqueue(cl, connection, rconn, req,
-                              obddev->obd_namespace, NULL, res_id, lock_type,
-                              NULL, 0, lock_mode, &flags,
+        rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id, lock_type,
+                              NULL, 0, lock_mode, &flags, ldlm_completion_ast,
                               (void *)mdc_lock_callback, data, datalen, lockh);
         if (rc == -ENOENT || rc == ELDLM_LOCK_ABORTED) {
                 lock_mode = 0;
@@ -414,12 +372,11 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
 
         if (smd != NULL) {
                 bufcount = 2;
-                size[1] = smd->lmd_size;
+                size[1] = smd->lmd_easize;
         }
 
         mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_OPEN, bufcount, size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_OPEN, bufcount, size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -431,7 +388,7 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
         body->extra = cookie;
 
         if (smd != NULL)
-                memcpy(lustre_msg_buf(req->rq_reqmsg, 1), smd, smd->lmd_size);
+                memcpy(lustre_msg_buf(req->rq_reqmsg, 1), smd, smd->lmd_easize);
 
         req->rq_replen = lustre_msg_size(1, size);
 
@@ -452,16 +409,11 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
 int mdc_close(struct lustre_handle *conn, 
               obd_id ino, int type, __u64 fh, struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct mds_body *body;
         int rc, size = sizeof(*body);
         struct ptlrpc_request *req;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_CLOSE, 1, &size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_CLOSE, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
@@ -483,9 +435,7 @@ int mdc_close(struct lustre_handle *conn,
 int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
                  char *addr, struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
+        struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
         struct ptlrpc_request *req = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ptlrpc_bulk_page *bulk = NULL;
@@ -495,13 +445,11 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
 
         CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
         desc = ptlrpc_prep_bulk(connection);
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_READPAGE, 1, &size, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_READPAGE, 1, &size, NULL);
         if (!req)
                 GOTO(out2, rc = -ENOMEM);
 
@@ -544,17 +492,12 @@ int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
 int mdc_statfs(struct lustre_handle *conn, struct statfs *sfs,
                struct ptlrpc_request **request)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct obd_statfs *osfs;
         struct ptlrpc_request *req;
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        req = ptlrpc_prep_req2(cl, connection, rconn,
-                               MDS_STATFS, 0, NULL, NULL);
+        req = ptlrpc_prep_req2(conn, MDS_STATFS, 0, NULL, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
         req->rq_replen = lustre_msg_size(1, &size);
@@ -575,294 +518,11 @@ out:
         return rc;
 }
 
-static int mdc_ioctl(long cmd, struct lustre_handle *conn, int len, void *karg,
-                     void *uarg)
-{
-#if 0
-        /* FIXME XXX : This should use the new ioc_data to pass args in */
-        int err = 0;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
-        struct ptlrpc_request *request;
-
-        ENTRY;
-
-        if (_IOC_TYPE(cmd) != IOC_REQUEST_TYPE ||
-            _IOC_NR(cmd) < IOC_REQUEST_MIN_NR  ||
-            _IOC_NR(cmd) > IOC_REQUEST_MAX_NR ) {
-                CDEBUG(D_IOCTL, "invalid ioctl ( type %d, nr %d, size %d )\n",
-                       _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
-                RETURN(-EINVAL);
-        }
-
-        ptlrpc_init_client(NULL, NULL, 
-                           MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, &cl);
-
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        switch (cmd) {
-        case IOC_REQUEST_GETATTR: {
-                CERROR("-- getting attr for ino %lu\n", arg);
-                err = mdc_getattr(&cl, connection, (obd_id)arg, S_IFDIR, ~0, 0,
-                                  &request);
-                CERROR("-- done err %d\n", err);
-
-                GOTO(out, err);
-        }
-
-        case IOC_REQUEST_READPAGE: {
-                char *buf;
-                OBD_ALLOC(buf, PAGE_SIZE);
-                if (!buf) {
-                        err = -ENOMEM;
-                        GOTO(out, err);
-                }
-                CERROR("-- readpage 0 for ino %lu\n", arg);
-                err = mdc_readpage(&cl, connection, arg, S_IFDIR, 0, buf,
-                                   &request);
-                CERROR("-- done err %d\n", err);
-                OBD_FREE(buf, PAGE_SIZE);
-
-                GOTO(out, err);
-        }
-
-        case IOC_REQUEST_SETATTR: {
-                struct inode inode;
-                struct iattr iattr;
-
-                inode.i_ino = arg;
-                inode.i_generation = 0;
-                iattr.ia_mode = 040777;
-                iattr.ia_atime = 0;
-                iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
-
-                err = mdc_setattr(&cl, connection, &inode, &iattr, &request);
-                CERROR("-- done err %d\n", err);
-
-                GOTO(out, err);
-        }
-
-        case IOC_REQUEST_CREATE: {
-                struct inode inode;
-                struct iattr iattr;
-
-                inode.i_ino = arg;
-                inode.i_generation = 0;
-                iattr.ia_mode = 040777;
-                iattr.ia_atime = 0;
-                iattr.ia_valid = ATTR_MODE | ATTR_ATIME;
-
-                err = mdc_create(&cl, connection, &inode,
-                                 "foofile", strlen("foofile"),
-                                 NULL, 0, 0100707, 47114711,
-                                 11, 47, 0, NULL, &request);
-                CERROR("-- done err %d\n", err);
-
-                GOTO(out, err);
-        }
-
-        case IOC_REQUEST_OPEN: {
-                __u64 fh, ino;
-                copy_from_user(&ino, (__u64 *)arg, sizeof(ino));
-                CERROR("-- opening ino %llu\n", (unsigned long long)ino);
-                err = mdc_open(&cl, connection, ino, S_IFDIR, O_RDONLY, 4711,
-                               &fh, &request);
-                copy_to_user((__u64 *)arg, &fh, sizeof(fh));
-                CERROR("-- done err %d (fh=%Lu)\n", err,
-                       (unsigned long long)fh);
-
-                GOTO(out, err);
-        }
-
-        case IOC_REQUEST_CLOSE: {
-                CERROR("-- closing ino 2, filehandle %lu\n", arg);
-                err = mdc_close(&cl, connection, 2, S_IFDIR, arg, &request);
-                CERROR("-- done err %d\n", err);
-
-                GOTO(out, err);
-        }
-
-        default:
-                GOTO(out, err = -EINVAL);
-        }
-
- out:
-        ptlrpc_free_req(request);
-        ptlrpc_put_connection(connection);
-        ptlrpc_cleanup_client(&cl);
-
-        RETURN(err);
-#endif
-        return 0;
-}
-
-static int mdc_setup(struct obd_device *obddev, obd_count len, void *buf)
-{
-        struct obd_ioctl_data* data = buf;
-        struct mdc_obd *mdc = &obddev->u.mdc;
-        char server_uuid[37];
-        int rc;
-        ENTRY;
-
-        if (data->ioc_inllen1 < 1) {
-                CERROR("osc setup requires a TARGET UUID\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen1 > 37) {
-                CERROR("mdc UUID must be less than 38 characters\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen2 < 1) {
-                CERROR("mdc setup requires a SERVER UUID\n");
-               RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen2 > 37) {
-                CERROR("mdc UUID must be less than 38 characters\n");
-                RETURN(-EINVAL);
-        }
-
-        memcpy(mdc->mdc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
-        memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
-                                                   sizeof(server_uuid)));
-
-        mdc->mdc_conn = ptlrpc_uuid_to_connection(server_uuid);
-        if (!mdc->mdc_conn)
-                RETURN(-ENOENT); 
-
-        OBD_ALLOC(mdc->mdc_client, sizeof(*mdc->mdc_client));
-        if (mdc->mdc_client == NULL)
-                GOTO(out_conn, rc = -ENOMEM);
-
-        OBD_ALLOC(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
-        if (mdc->mdc_ldlm_client == NULL)
-                GOTO(out_client, rc = -ENOMEM);
-
-        ptlrpc_init_client(NULL, NULL, MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
-                           mdc->mdc_client);
-        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           mdc->mdc_ldlm_client);
-        mdc->mdc_client->cli_name = "mdc";
-        mdc->mdc_ldlm_client->cli_name = "ldlm";
-        mdc->mdc_max_mdsize = sizeof(struct lov_stripe_md);
-        /* XXX get recovery hooked in here again */
-        //ptlrpc_init_client(ptlrpc_connmgr, ll_recover,...
-
-        ptlrpc_init_client(ptlrpc_connmgr, NULL,
-                           MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
-                           mdc->mdc_client);
-
-        MOD_INC_USE_COUNT;
-        RETURN(0);
-
- out_client:
-        OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
- out_conn:
-        ptlrpc_put_connection(mdc->mdc_conn);
-        return rc;
-}
-
-static int mdc_cleanup(struct obd_device * obddev)
-{
-        struct mdc_obd *mdc = &obddev->u.mdc;
-
-        ptlrpc_cleanup_client(mdc->mdc_client);
-        OBD_FREE(mdc->mdc_client, sizeof(*mdc->mdc_client));
-        ptlrpc_cleanup_client(mdc->mdc_ldlm_client);
-        OBD_FREE(mdc->mdc_ldlm_client, sizeof(*mdc->mdc_ldlm_client));
-        ptlrpc_put_connection(mdc->mdc_conn);
-
-        MOD_DEC_USE_COUNT;
-        return 0;
-}
-
-static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd)
-{
-        struct mdc_obd *mdc = &obd->u.mdc;
-        struct ptlrpc_request *request;
-        int rc, size = sizeof(mdc->mdc_target_uuid);
-        char *tmp = mdc->mdc_target_uuid;
-
-        ENTRY;
-
-        obd->obd_namespace =
-                ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT);
-        if (obd->obd_namespace == NULL)
-                RETURN(-ENOMEM);
-
-        MOD_INC_USE_COUNT;
-        rc = class_connect(conn, obd);
-        if (rc) 
-                RETURN(rc); 
-
-        request = ptlrpc_prep_req(mdc->mdc_client, mdc->mdc_conn,
-                                  MDS_CONNECT, 1, &size, &tmp);
-        if (!request)
-                GOTO(out_disco, -ENOMEM);
-
-        request->rq_level = LUSTRE_CONN_NEW;
-        request->rq_replen = lustre_msg_size(0, NULL);
-        /* Sending our local connection info breaks for local connections
-        request->rq_reqmsg->addr = conn->addr;
-        request->rq_reqmsg->cookie = conn->cookie;
-         */
-
-        rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
-        if (rc)
-                GOTO(out, rc);
-
-        class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
-
-        EXIT;
- out:
-        ptlrpc_free_req(request);
- out_disco:
-        if (rc) {
-                class_disconnect(conn);
-                MOD_DEC_USE_COUNT;
-        }
-        return rc;
-}
-
-static int mdc_disconnect(struct lustre_handle *conn)
-{
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
-        struct obd_device *obd = class_conn2obd(conn);
-        struct ptlrpc_request *request;
-        int rc;
-        ENTRY;
-
-        ldlm_namespace_free(obd->obd_namespace);
-        mdc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   MDS_DISCONNECT, 0, NULL, NULL);
-        if (!request)
-                RETURN(-ENOMEM);
-
-        request->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(request);
-        if (rc) 
-                GOTO(out, rc);
-        rc = class_disconnect(conn);
-        if (!rc)
-                MOD_DEC_USE_COUNT;
- out:
-        ptlrpc_free_req(request);
-        return rc;
-}
-
 struct obd_ops mdc_obd_ops = {
-        o_setup:   mdc_setup,
-        o_cleanup: mdc_cleanup,
-        o_connect: mdc_connect,
-        o_disconnect: mdc_disconnect,
-        o_iocontrol: mdc_ioctl
+        o_setup:   client_obd_setup,
+        o_cleanup: client_obd_cleanup,
+        o_connect: client_obd_connect,
+        o_disconnect: client_obd_disconnect,
 };
 
 static int __init ptlrpc_request_init(void)
index 48667cc..dfc469b 100644 (file)
@@ -13,8 +13,6 @@
  *  by Peter Braam <braam@clusterfs.com> &
  *     Andreas Dilger <braam@clusterfs.com>
  *
- *  This server is single threaded at present (but can easily be multi threaded)
- *
  */
 
 #define EXPORT_SYMTAB
@@ -37,6 +35,7 @@ inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
         return &req->rq_export->exp_obd->u.mds;
 }
 
+
 /* Assumes caller has already pushed into the kernel filesystem context */
 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
                         __u64 offset)
@@ -95,7 +94,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 }
 
 /* 'dir' is a inode for which a lock has already been taken */
-struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+struct dentry *mds_name2locked_dentry(struct obd_device *obd, struct dentry *dir,
                                       struct vfsmount **mnt, char *name,
                                       int namelen, int lock_mode,
                                       struct lustre_handle *lockh,
@@ -123,11 +122,9 @@ struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
                 RETURN(dchild);
 
         res_id[0] = dchild->d_inode->i_ino;
-        rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
-                                   (struct lustre_handle *)&mds->mds_connh,
-                                   NULL, mds->mds_local_namespace, NULL,
+        rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                   &flags, (void *)mds_lock_callback, NULL,
+                                   &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
                                    0, lockh);
         if (rc != ELDLM_OK) {
                 l_dput(dchild);
@@ -138,10 +135,11 @@ struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
         RETURN(dchild);
 }
 
-struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh)
 {
+        struct mds_obd *mds = &obd->u.mds;
         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
         int flags, rc;
         __u64 res_id[3] = {0};
@@ -151,11 +149,9 @@ struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
                 RETURN(de);
 
         res_id[0] = de->d_inode->i_ino;
-        rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
-                                   (struct lustre_handle *)&mds->mds_connh,
-                                   NULL, mds->mds_local_namespace, NULL,
+        rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                    res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                   &flags, (void *)mds_lock_callback, NULL,
+                                   &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
                                    0, lockh);
         if (rc != ELDLM_OK) {
                 l_dput(de);
@@ -197,9 +193,7 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                 RETURN(ERR_PTR(-ESTALE));
         }
 
-        /* now to find a dentry.
-         * If possible, get a well-connected one
-         */
+        /* now to find a dentry. If possible, get a well-connected one */
         if (mnt)
                 *mnt = mds->mds_vfsmnt;
         spin_lock(&dcache_lock);
@@ -374,6 +368,7 @@ int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
 static int mds_getattr_name(int offset, struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct obd_run_ctxt saved;
         struct mds_body *body;
         struct dentry *de = NULL, *dchild = NULL;
@@ -412,15 +407,13 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
         res_id[0] = dir->i_ino;
 
-        rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+        rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
                              NULL, 0, lock_mode, &lockh);
         if (rc == 0) {
                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
-                rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
-                                      (struct lustre_handle *)&mds->mds_connh, 
-                                      NULL, mds->mds_local_namespace, NULL,
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, (void *)mds_lock_callback,
+                                      &flags, ldlm_completion_ast, (void *)mds_lock_callback,
                                       NULL, 0, &lockh);
                 if (rc != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", rc);
@@ -450,7 +443,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req)
                 if (S_ISREG(inode->i_mode)) {
                         struct lov_stripe_md *md;
                         md = lustre_msg_buf(req->rq_repmsg, offset + 1);
-                        md->lmd_size = mds->mds_max_mdsize;
+                        md->lmd_easize = mds->mds_max_mdsize;
                         mds_fs_get_md(mds, inode, md);
                 }
                 /* now a normal case for intent locking */
@@ -634,11 +627,9 @@ static int mds_open(struct ptlrpc_request *req)
                 void *handle;
                 struct lov_stripe_md *md;
                 struct inode *inode = de->d_inode;
-                //struct iattr iattr;
                 int rc;
 
                 md = lustre_msg_buf(req->rq_reqmsg, 1);
-                //iattr.ia_mode = inode->i_mode;
 
                 handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
                 if (!handle) {
@@ -648,7 +639,6 @@ static int mds_open(struct ptlrpc_request *req)
 
                 /* XXX error handling */
                 rc = mds_fs_set_md(mds, inode, handle, md);
-                //                rc = mds_fs_setattr(mds, de, handle, &iattr);
                 if (!rc) {
                         struct obd_run_ctxt saved;
                         push_ctxt(&saved, &mds->mds_ctxt);
@@ -881,6 +871,37 @@ int mds_handle(struct ptlrpc_request *req)
                 rc = mds_close(req);
                 break;
 
+        case LDLM_ENQUEUE:
+                CDEBUG(D_INODE, "enqueue\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+                rc = ldlm_handle_enqueue(req);
+                if (rc)
+                        break;
+                RETURN(0);
+
+        case LDLM_CONVERT:
+                CDEBUG(D_INODE, "convert\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+                rc = ldlm_handle_convert(req);
+                if (rc)
+                        break;
+                RETURN(0);
+
+        case LDLM_CANCEL:
+                CDEBUG(D_INODE, "cancel\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+                rc = ldlm_handle_cancel(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CALLBACK:
+                CDEBUG(D_INODE, "callback\n");
+                CERROR("callbacks should not happen on MDS\n");
+                LBUG();
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
+                break;
+
+
         default:
                 rc = ptlrpc_error(req->rq_svc, req);
                 RETURN(rc);
@@ -963,12 +984,12 @@ static int mds_recover(struct obd_device *obddev)
         return rc;
 }
 
-
+#define MDS_NUM_THREADS 8
 /* mount the file system (secretly) */
 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
+        int i;
         struct obd_ioctl_data* data = buf;
-        struct obd_export *export;
         struct mds_obd *mds = &obddev->u.mds;
         struct vfsmount *mnt;
         int rc = 0;
@@ -1008,58 +1029,30 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_fs, rc = -EINVAL);
         }
 
-        rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
-        if (rc) {
-                CERROR("cannot start thread: rc = %d\n", rc);
-                GOTO(err_svc, rc);
-        }
-
         rc = -ENOENT;
-        mds->mds_ldlm_conn = ptlrpc_uuid_to_connection("self");
-        if (!mds->mds_ldlm_conn) {
-                mds_cleanup(obddev);
-                GOTO(err_thread, rc);
-        }
-
         obddev->obd_namespace =
                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
         if (obddev->obd_namespace == NULL) {
                 LBUG();
                 mds_cleanup(obddev);
-                GOTO(err_thread, rc);
-        }
-
-        mds->mds_local_namespace =
-                ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT);
-        if (mds->mds_local_namespace == NULL) {
-                LBUG();
-                mds_cleanup(obddev);
-                GOTO(err_thread, rc);
+                GOTO(err_svc, rc);
         }
 
-        OBD_ALLOC(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
-        if (mds->mds_ldlm_client == NULL) {
-                LBUG();
-                mds_cleanup(obddev);
-                GOTO(err_thread, rc);
+        for (i = 0; i < MDS_NUM_THREADS; i++) {
+                char name[32];
+                sprintf(name, "lustre_MDS_%2d", i); 
+                rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
+                if (rc) {
+                        CERROR("cannot start MDS thread #%d: rc %d\n", i, rc);
+                        LBUG();
+                        GOTO(err_thread, rc);
+                }
         }
-        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           mds->mds_ldlm_client);
-        mds->mds_ldlm_client->cli_target_devno = obddev->obd_minor;
-        mds->mds_ldlm_client->cli_name = "mds ldlm";
 
         rc = mds_recover(obddev);
         if (rc)
                 GOTO(err_thread, rc);
 
-        rc = class_connect(&mds->mds_connh, obddev);
-        if (rc)
-                GOTO(err_thread, rc);
-        export = class_conn2export(&mds->mds_connh);
-        if (!export)
-                LBUG();
-        export->exp_connection = mds->mds_ldlm_conn;
-
         RETURN(0);
 
         
@@ -1088,8 +1081,6 @@ static int mds_cleanup(struct obd_device * obddev)
         struct mds_obd *mds = &obddev->u.mds;
 
         ENTRY;
-        class_disconnect(&mds->mds_connh);
-
 
         if ( !list_empty(&obddev->obd_exports) ) {
                 CERROR("still has exports!\n");
@@ -1118,14 +1109,8 @@ static int mds_cleanup(struct obd_device * obddev)
         mds->mds_sb = 0;
         kfree(mds->mds_fstype);
 
-        ldlm_namespace_free(mds->mds_local_namespace);
         ldlm_namespace_free(obddev->obd_namespace);
 
-        if (mds->mds_ldlm_conn != NULL)
-                ptlrpc_put_connection(mds->mds_ldlm_conn);
-
-        OBD_FREE(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
-
         lock_kernel();
 #ifdef CONFIG_DEV_RDONLY
         dev_clear_rdonly(2);
@@ -1136,6 +1121,145 @@ static int mds_cleanup(struct obd_device * obddev)
         RETURN(0);
 }
 
+static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
+                              ldlm_mode_t mode, void *data)
+{
+        struct ptlrpc_request *req = req_cookie;
+        int rc = 0;
+        ENTRY;
+
+        if (!req_cookie)
+                RETURN(0);
+
+        if (req->rq_reqmsg->bufcount > 1) {
+                /* an intent needs to be considered */
+                struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
+                struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
+                struct mds_body *mds_rep;
+                struct ldlm_reply *rep;
+                __u64 new_resid[3] = {0, 0, 0}, old_res;
+                int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
+                                                  sizeof(struct mds_body),
+                                                  mds->mds_max_mdsize};
+
+                it->opc = NTOH__u64(it->opc);
+
+                LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
+
+                /* prepare reply */
+                switch((long)it->opc) {
+                case IT_GETATTR:
+                        /* Note that in the negative case you may be returning
+                         * a file and its obdo */
+                case IT_CREAT:
+                case IT_CREAT|IT_OPEN:
+                case IT_LINK:
+                case IT_LOOKUP:
+                case IT_MKDIR:
+                case IT_MKNOD:
+                case IT_OPEN:
+                case IT_READLINK:
+                case IT_RENAME:
+                case IT_RMDIR:
+                case IT_SETATTR:
+                case IT_SYMLINK:
+                case IT_UNLINK:
+                        bufcount = 3;
+                        break;
+                case IT_RENAME2:
+                        bufcount = 1;
+                        break;
+                default:
+                        LBUG();
+                }
+
+                rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
+                                     &req->rq_repmsg);
+                if (rc) {
+                        rc = req->rq_status = -ENOMEM;
+                        RETURN(rc);
+                }
+
+                rep = lustre_msg_buf(req->rq_repmsg, 0);
+                rep->lock_policy_res1 = 1;
+
+                /* execute policy */
+                switch ((long)it->opc) {
+                case IT_CREAT:
+                case IT_CREAT|IT_OPEN:
+                case IT_LINK:
+                case IT_MKDIR:
+                case IT_MKNOD:
+                case IT_RENAME2:
+                case IT_RMDIR:
+                case IT_SYMLINK:
+                case IT_UNLINK:
+                        rc = mds_reint(2, req);
+                        if (rc || req->rq_status != 0) {
+                                rep->lock_policy_res2 = req->rq_status;
+                                RETURN(ELDLM_LOCK_ABORTED);
+                        }
+                        break;
+                case IT_GETATTR:
+                case IT_LOOKUP:
+                case IT_OPEN:
+                case IT_READDIR:
+                case IT_READLINK:
+                case IT_RENAME:
+                case IT_SETATTR:
+                        rc = mds_getattr_name(2, req);
+                        /* FIXME: we need to sit down and decide on who should
+                         * set req->rq_status, who should return negative and
+                         * positive return values, and what they all mean. */
+                        if (rc || req->rq_status != 0) {
+                                rep->lock_policy_res2 = req->rq_status;
+                                RETURN(ELDLM_LOCK_ABORTED);
+                        }
+                        break;
+                case IT_READDIR|IT_OPEN:
+                        LBUG();
+                        break;
+                default:
+                        CERROR("Unhandled intent\n");
+                        LBUG();
+                }
+
+                if (it->opc == IT_UNLINK || it->opc == IT_RMDIR ||
+                    it->opc == IT_RENAME || it->opc == IT_RENAME2)
+                        RETURN(ELDLM_LOCK_ABORTED);
+
+                rep->lock_policy_res2 = req->rq_status;
+                mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
+                new_resid[0] = NTOH__u32(mds_rep->ino);
+                if (new_resid[0] == 0)
+                        LBUG();
+                old_res = lock->l_resource->lr_name[0];
+
+                CDEBUG(D_INFO, "remote intent: locking %d instead of"
+                       "%ld\n", mds_rep->ino, (long)old_res);
+
+                ldlm_lock_change_resource(lock, new_resid);
+                if (lock->l_resource == NULL) {
+                        LBUG();
+                        RETURN(-ENOMEM);
+                }
+                LDLM_DEBUG(lock, "intent policy, old res %ld",
+                           (long)old_res);
+                RETURN(ELDLM_LOCK_CHANGED);
+        } else {
+                int size = sizeof(struct ldlm_reply);
+                rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
+                                     &req->rq_repmsg);
+                if (rc) {
+                        CERROR("out of memory\n");
+                        LBUG();
+                        RETURN(-ENOMEM);
+                }
+        }
+        RETURN(rc);
+}
+
+
 extern int mds_iocontrol(long cmd, struct lustre_handle *conn, 
                           int len, void *karg, void *uarg);
 
@@ -1150,17 +1274,14 @@ static struct obd_ops mds_obd_ops = {
 
 static int __init mds_init(void)
 {
-        inter_module_register("mds_reint", THIS_MODULE, &mds_reint);
-        inter_module_register("mds_getattr_name", THIS_MODULE,
-                              &mds_getattr_name);
         class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
+        ldlm_register_intent(ldlm_intent_policy);
         return 0;
 }
 
 static void __exit mds_exit(void)
 {
-        inter_module_unregister("mds_reint");
-        inter_module_unregister("mds_getattr_name");
+        ldlm_unregister_intent();
         class_unregister_type(LUSTRE_MDS_NAME);
 }
 
index fcf9faf..f9cfb36 100644 (file)
@@ -113,7 +113,7 @@ static int mds_extN_set_md(struct inode *inode, void *handle,
                 md->lmd_magic = cpu_to_le32(XATTR_MDS_MO_MAGIC);
                 rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
                                     XATTR_LUSTRE_MDS_OBJID, md, 
-                                    md->lmd_size, XATTR_CREATE);
+                                    md->lmd_easize, XATTR_CREATE);
         }
         up(&inode->i_sem);
         unlock_kernel();
@@ -127,7 +127,7 @@ static int mds_extN_set_md(struct inode *inode, void *handle,
 static int mds_extN_get_md(struct inode *inode, struct lov_stripe_md *md)
 {
         int rc;
-        int size = md->lmd_size;
+        int size = md->lmd_easize;
 
         lock_kernel();
         down(&inode->i_sem);
index edbe1b6..f35a9bf 100644 (file)
@@ -101,6 +101,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                              struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct dentry *de;
         void *handle;
         struct lustre_handle child_lockh;
@@ -113,7 +114,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 int namelen;
 
                 /* a name was supplied by the client; fid1 is the directory */
-                dir = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, LCK_PR,
+                dir = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PR,
                                             &dir_lockh);
                 if (!dir || IS_ERR(dir)) {
                         l_dput(dir);
@@ -123,7 +124,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
 
                 name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
                 namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
-                de = mds_name2locked_dentry(mds, dir, NULL, name, namelen,
+                de = mds_name2locked_dentry(obd, dir, NULL, name, namelen,
                                             0, &child_lockh, LCK_PR);
                 l_dput(dir);
                 if (!de || IS_ERR(de)) {
@@ -222,6 +223,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
 {
         struct dentry *de = NULL;
         struct mds_obd *mds = mds_req2mds(req);
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct dentry *dchild = NULL;
         struct inode *dir;
         void *handle;
@@ -249,15 +251,13 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
         res_id[0] = dir->i_ino;
 
-        rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+        rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
                              NULL, 0, lock_mode, &lockh);
         if (rc == 0) {
                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
-                rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
-                                      (struct lustre_handle *)&mds->mds_connh,
-                                      NULL, mds->mds_local_namespace, NULL,
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, (void *)mds_lock_callback, NULL,
+                                      &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
                                       0, &lockh);
                 if (rc != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", rc);
@@ -289,7 +289,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 /* If i_file_acl is set, this inode has an EA */
                 if (S_ISREG(inode->i_mode) && inode->u.ext3_i.i_file_acl) {
                         md = lustre_msg_buf(req->rq_repmsg, offset + 1);
-                        md->lmd_size = mds->mds_max_mdsize;
+                        md->lmd_easize = mds->mds_max_mdsize;
                         mds_fs_get_md(mds, inode, md);
                 }
                 /* now a normal case for intent locking */
@@ -435,6 +435,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         struct dentry *de = NULL;
         struct dentry *dchild = NULL;
         struct mds_obd *mds = mds_req2mds(req);
+        struct obd_device *obd = req->rq_export->exp_obd;
         char *name;
         int namelen;
         struct inode *dir, *inode;
@@ -447,7 +448,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         /* a name was supplied by the client; fid1 is the directory */
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
-        de = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, lock_mode,
+        de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, lock_mode,
                                     &lockh);
         if (IS_ERR(de)) {
                 LBUG();
@@ -457,9 +458,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
-
-        dchild = mds_name2locked_dentry(mds, de, NULL, name, namelen,
-                                        LCK_EX, &child_lockh, lock_mode);
+        dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
+                                    LCK_EX, &child_lockh, lock_mode);
 
         if (IS_ERR(dchild) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) {
                 LBUG();
@@ -498,7 +498,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         if (rc < 0) { 
                                 CDEBUG(D_INFO, "No md for ino %ld err %d\n",
                                        inode->i_ino, rc);
-                                memset(md, 0, md->lmd_size); 
+                                memset(md, 0, md->lmd_easize); 
                         }
                 }
         default:
@@ -608,6 +608,7 @@ out_link:
 static int mds_reint_rename(struct mds_update_record *rec, int offset,
                             struct ptlrpc_request *req)
 {
+        struct obd_device *obd = req->rq_export->exp_obd;
         struct dentry *de_srcdir = NULL;
         struct dentry *de_tgtdir = NULL;
         struct dentry *de_old = NULL;
@@ -626,15 +627,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
         res_id[0] = de_srcdir->d_inode->i_ino;
 
-        rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+        rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
                                    NULL, 0, lock_mode, &srclockh);
         if (rc == 0) {
                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
-                rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
-                                      (struct lustre_handle *)&mds->mds_connh,
-                                      NULL, mds->mds_local_namespace, NULL,
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, (void *)mds_lock_callback, NULL,
+                                      &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
                                       0, &srclockh);
                 if (rc != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", rc);
@@ -650,15 +649,13 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
         res_id[0] = de_tgtdir->d_inode->i_ino;
 
-        rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+        rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN,
                                    NULL, 0, lock_mode, &tgtlockh);
         if (rc == 0) {
                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
-                rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
-                                      (struct lustre_handle *)&mds->mds_connh,
-                                      NULL, mds->mds_local_namespace, NULL,
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, (void *)mds_lock_callback, NULL,
+                                      &flags, ldlm_completion_ast, (void *)mds_lock_callback, NULL,
                                       0, &tgtlockh);
                 if (rc != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", rc);
@@ -714,12 +711,10 @@ out_rename_deold:
                  * about to free, to force everyone to drop their
                  * locks. */
                 LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]);
-                rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
-                                      (struct lustre_handle *)&mds->mds_connh,
-                                      NULL, mds->mds_local_namespace, NULL, 
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, 
                                       res_id,
                                       LDLM_PLAIN, NULL, 0, LCK_EX, &flags,
-                                      (void *)mds_lock_callback, NULL, 0, 
+                                      ldlm_completion_ast, (void *)mds_lock_callback, NULL, 0, 
                                       &oldhandle);
                 if (rc) 
                         CERROR("failed to get child inode lock (child ino %Ld, "
index 73eb518..41d4793 100644 (file)
@@ -440,7 +440,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
         case OBD_IOC_GETATTR: {
 
                 obd_data2conn(&conn, data);
-                err = obd_getattr(&conn, &data->ioc_obdo1);
+                err = obd_getattr(&conn, &data->ioc_obdo1, NULL);
                 if (err)
                         GOTO(out, err);
 
@@ -450,7 +450,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
 
         case OBD_IOC_SETATTR: {
                 obd_data2conn(&conn, data);
-                err = obd_setattr(&conn, &data->ioc_obdo1);
+                err = obd_setattr(&conn, &data->ioc_obdo1, NULL);
                 if (err)
                         GOTO(out, err);
 
index 0cd3708..3c0c33f 100644 (file)
@@ -11,8 +11,8 @@
  * by Peter Braam <braam@clusterfs.com>
  */
 
-static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.18 2002/07/26 16:54:55 rread Exp $";
-#define OBDECHO_VERSION "$Revision: 1.18 $"
+static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.19 2002/07/31 20:49:37 braam Exp $";
+#define OBDECHO_VERSION "$Revision: 1.19 $"
 
 #define EXPORT_SYMTAB
 
@@ -23,6 +23,7 @@ static char rcsid[] __attribute ((unused)) = "$Id: echo.c,v 1.18 2002/07/26 16:5
 #include <linux/locks.h>
 #include <linux/ext2_fs.h>
 #include <linux/quotaops.h>
+#include <linux/proc_fs.h>
 #include <linux/init.h>
 #include <asm/unistd.h>
 
@@ -37,6 +38,60 @@ static struct obdo OA;
 static obd_count GEN;
 static long echo_pages = 0;
 
+static atomic_t echo_page_rws;
+static atomic_t echo_getattrs;
+
+#define ECHO_PROC_STAT "sys/obdecho"
+
+int
+echo_proc_read (char *page, char **start, off_t off, int count, int *eof, void *data)
+{
+       int                len;
+        int                attrs = atomic_read (&echo_getattrs);
+        int                pages = atomic_read (&echo_page_rws);
+       
+       *eof = 1;
+       if (off != 0)
+               return (0);
+       
+       len = sprintf (page, "%d %d\n", pages, attrs);
+       
+       *start = page;
+       return (len);
+}
+
+int
+echo_proc_write (struct file *file, const char *ubuffer, unsigned long count, void *data)
+{
+       /* Ignore what we've been asked to write, and just zero the stats counters */
+        atomic_set (&echo_page_rws, 0);
+        atomic_set (&echo_getattrs, 0);
+        
+       return (count);
+}
+
+void
+echo_proc_init(void)
+{
+        struct proc_dir_entry *entry = create_proc_entry (ECHO_PROC_STAT, S_IFREG | S_IRUGO | S_IWUSR, NULL);
+
+        if (entry == NULL) 
+       {
+                CERROR("couldn't create proc entry %s\n", ECHO_PROC_STAT);
+                return;
+        }
+
+        entry->data = NULL;
+        entry->read_proc = echo_proc_read;
+       entry->write_proc = echo_proc_write;
+}
+
+void 
+echo_proc_fini(void)
+{
+        remove_proc_entry(ECHO_PROC_STAT, 0);
+}
+
 static int echo_connect(struct lustre_handle *conn, struct obd_device *obd)
 {
         int rc;
@@ -61,11 +116,14 @@ static int echo_disconnect(struct lustre_handle *conn)
         return rc;
 }
 
-static int echo_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int echo_getattr(struct lustre_handle *conn, struct obdo *oa, 
+                        struct lov_stripe_md *md)
 {
         memcpy(oa, &OA, sizeof(*oa));
         oa->o_mode = ++GEN;
 
+        atomic_inc (&echo_getattrs);
+
         return 0;
 }
 
@@ -160,6 +218,8 @@ int echo_commitrw(int cmd, struct lustre_handle *conn, int objcount,
                                 GOTO(commitrw_cleanup, rc = -EFAULT);
                         }
 
+                        atomic_inc (&echo_page_rws);
+                        
                         kunmap(page);
                         __free_pages(page, 0);
                         echo_pages--;
@@ -193,11 +253,15 @@ static int __init obdecho_init(void)
 {
         printk(KERN_INFO "Echo OBD driver " OBDECHO_VERSION " info@clusterfs.com\n");
 
+        echo_proc_init();
+        
         return class_register_type(&echo_obd_ops, OBD_ECHO_DEVICENAME);
 }
 
 static void __exit obdecho_exit(void)
 {
+        echo_proc_fini ();
+        
         CERROR("%ld prep/commitrw pages leaked\n", echo_pages);
         class_unregister_type(OBD_ECHO_DEVICENAME);
 }
index 4e1ec72..22ba1f2 100644 (file)
@@ -490,7 +490,8 @@ static inline void filter_from_inode(struct obdo *oa, struct inode *inode)
         EXIT;
 }
 
-static int filter_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int filter_getattr(struct lustre_handle *conn, struct obdo *oa, 
+                          struct lov_stripe_md *md)
 {
         struct obd_device *obddev = class_conn2obd(conn);
         struct dentry *dentry;
@@ -514,7 +515,8 @@ static int filter_getattr(struct lustre_handle *conn, struct obdo *oa)
         RETURN(0);
 }
 
-static int filter_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int filter_setattr(struct lustre_handle *conn, struct obdo *oa, 
+                          struct lov_stripe_md *md)
 {
         struct obd_run_ctxt saved;
         struct obd_device *obd = class_conn2obd(conn);
@@ -698,15 +700,19 @@ out:
 /* NB count and offset are used for punch, but not truncate */
 static int filter_truncate(struct lustre_handle *conn, struct obdo *oa,
                            struct lov_stripe_md *md,
-                           obd_size count, obd_off offset)
+                           obd_off start, obd_off end)
 {
         int error;
         ENTRY;
 
-        CDEBUG(D_INODE, "calling truncate for object #%Ld, valid = %x, "
-               "o_size = %Ld\n", oa->o_id, oa->o_valid, oa->o_size);
-        error = filter_setattr(conn, oa);
+        if ( end != 0xffffffffffffffff ) { 
+                CERROR("PUNCH not supported, only truncate works\n"); 
+        }
 
+        CDEBUG(D_INODE, "calling truncate for object #%Ld, valid = %x, "
+               "o_size = %Ld\n", oa->o_id, oa->o_valid, start);
+        oa->o_size = start;
+        error = filter_setattr(conn, oa, NULL);
         RETURN(error);
 }
 
index 45253da..17276bc 100644 (file)
@@ -9,7 +9,7 @@ MODULE = osc
 modulefs_DATA = osc.o
 EXTRA_PROGRAMS = osc
 
-LINX= obd_pack.c ll_pack.c
+LINX= obd_pack.c ll_pack.c l_net.c
 osc_SOURCES = osc_request.c $(LINX)
 
 obd_pack.c: 
@@ -18,6 +18,9 @@ obd_pack.c:
 ll_pack.c: 
        test -e ll_pack.c || ln -sf $(top_srcdir)/lib/ll_pack.c
 
+l_net.c: 
+       test -e l_net.c || ln -sf $(top_srcdir)/lib/l_net.c .
+
 dist-hook:
        list='$(LINX)'; for f in $$list; do rm -f $(distdir)/$$f; done
 
index 96dedd4..006b745 100644 (file)
 #include <linux/obd_lov.h>
 #include <linux/init.h>
 
-static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
-                       struct ptlrpc_connection **connection,
-                       struct lustre_handle **rconn)
-{
-        struct obd_export *export = class_conn2export(conn);
-        struct osc_obd *osc = &export->exp_obd->u.osc;
-
-        *cl = osc->osc_client;
-        *connection = osc->osc_conn;
-        *rconn = &export->exp_rconnh;
-}
-
-static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
-                          struct ptlrpc_connection **connection,
-                          struct lustre_handle **rconn)
-{
-        struct obd_export *export = class_conn2export(conn);
-        struct osc_obd *osc = &export->exp_obd->u.osc;
-
-        *cl = osc->osc_ldlm_client;
-        *connection = osc->osc_conn;
-        *rconn = &export->exp_rconnh;
-}
-
-static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
-{
-        struct osc_obd *osc = &obd->u.osc;
-        //struct obd_import *import;
-        struct ptlrpc_request *request;
-        char *tmp = osc->osc_target_uuid;
-        int rc, size = sizeof(osc->osc_target_uuid);
-        ENTRY;
-
-        /* not used yet
-        OBD_ALLOC(import, sizeof(*import));
-        if (!import)
-                RETURN(-ENOMEM);
-         */
-
-        MOD_INC_USE_COUNT;
-        rc = class_connect(conn, obd);
-        if (rc)
-                RETURN(rc);
-
-        request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn,
-                                  OST_CONNECT, 1, &size, &tmp);
-        if (!request)
-                GOTO(out_disco, rc = -ENOMEM);
-
-        request->rq_level = LUSTRE_CONN_NEW;
-        request->rq_replen = lustre_msg_size(0, NULL);
-        request->rq_reqmsg->addr = -1;
-        /* Sending our local connection info breaks for local connections
-        request->rq_reqmsg->addr = conn->addr;
-        request->rq_reqmsg->cookie = conn->cookie;
-         */
-
-        rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
-        if (rc) {
-                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
-                GOTO(out, rc);
-        }
-
-        /* XXX eventually maybe more refinement */
-        osc->osc_conn->c_level = LUSTRE_CONN_FULL;
-
-        class_rconn2export(conn, (struct lustre_handle *)request->rq_repmsg);
-
-        EXIT;
- out:
-        ptlrpc_free_req(request);
- out_disco:
-        if (rc) {
-                class_disconnect(conn);
-                MOD_DEC_USE_COUNT;
-        }
-        return rc;
-}
-
-static int osc_disconnect(struct lustre_handle *conn)
-{
-        struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
-        int rc;
-        ENTRY;
-
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_DISCONNECT, 0, NULL, NULL);
-        if (!request)
-                RETURN(-ENOMEM);
-        request->rq_replen = lustre_msg_size(0, NULL);
 
-        rc = ptlrpc_queue_wait(request);
-        if (rc)
-                GOTO(out, rc);
-        rc = class_disconnect(conn);
-        if (!rc)
-                MOD_DEC_USE_COUNT;
 
- out:
-        ptlrpc_free_req(request);
-        return rc;
-}
-
-static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
+static int osc_getattr(struct lustre_handle *conn, struct obdo *oa, 
+                       struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_GETATTR, 1, &size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_GETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -178,16 +68,11 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa,
                     struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_OPEN, 1, &size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_OPEN, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -217,16 +102,11 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa,
                      struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_CLOSE, 1, &size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_CLOSE, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -254,19 +134,15 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa,
         return rc;
 }
 
-static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
+                       struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                  OST_SETATTR, 1, &size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_SETATTR, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -288,9 +164,6 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                       struct lov_stripe_md **ea)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -308,12 +181,10 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
                 OBD_ALLOC(*ea, oa->o_easize);
                 if (!*ea)
                         RETURN(-ENOMEM);
-                (*ea)->lmd_size = oa->o_easize;
+                (*ea)->lmd_easize = oa->o_easize;
         }
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                  OST_CREATE, 1, &size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_CREATE, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -339,13 +210,10 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
 }
 
 static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
-                     struct lov_stripe_md *md, obd_size count,
-                     obd_off offset)
+                     struct lov_stripe_md *md, obd_size start,
+                     obd_size end)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -354,16 +222,18 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_PUNCH, 1, &size, NULL);
+
+        request = ptlrpc_prep_req2(conn, OST_PUNCH, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         memcpy(&body->oa, oa, sizeof(*oa));
-        body->oa.o_blocks = count;
-        body->oa.o_valid |= OBD_MD_FLBLOCKS;
+
+        /* overload the blocks and size fields in the oa with start/end */ 
+        body->oa.o_blocks = start;
+        body->oa.o_size = end;
+        body->oa.o_valid |= OBD_MD_FLBLOCKS | OBD_MD_FLSIZE;
 
         request->rq_replen = lustre_msg_size(1, &size);
 
@@ -385,9 +255,6 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
                        struct lov_stripe_md *ea)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -396,9 +263,7 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_DESTROY, 1, &size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_DESTROY, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -476,9 +341,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
                         obd_size *count, obd_off *offset, obd_flag *flags,
                         brw_callback_t callback, void *data)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
+        struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
         struct ptlrpc_request *request = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
@@ -486,14 +349,13 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
         int rc, size[3] = {sizeof(*body)};
         void *iooptr, *nioptr;
         int mapped = 0;
+        __u32 xid;
         ENTRY;
 
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(struct niobuf_remote);
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_READ, 3, size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_READ, 3, size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -517,14 +379,16 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *md,
         ost_pack_ioo(&iooptr, md, page_count);
         /* end almost identical to brw_write case */
 
+        spin_lock(&connection->c_lock);
+        xid = ++connection->c_xid_out;       /* single xid for all pages */
+        spin_unlock(&connection->c_lock);
+
         for (mapped = 0; mapped < page_count; mapped++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
                         GOTO(out_unmap, rc = -ENOMEM);
 
-                spin_lock(&connection->c_lock);
-                bulk->b_xid = ++connection->c_xid_out;
-                spin_unlock(&connection->c_lock);
+                bulk->b_xid = xid;           /* single xid for all pages */
 
                 bulk->b_buf = kmap(page_array[mapped]);
                 bulk->b_page = page_array[mapped];
@@ -603,9 +467,7 @@ static int osc_brw_write(struct lustre_handle *conn,
                          obd_off *offset, obd_flag *flags,
                          brw_callback_t callback, void *data)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
+        struct ptlrpc_connection *connection = client_conn2cli(conn)->cl_conn;
         struct ptlrpc_request *request = NULL;
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
@@ -620,9 +482,7 @@ static int osc_brw_write(struct lustre_handle *conn,
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(*remote);
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_WRITE, 3, size, NULL);
+        request = ptlrpc_prep_req2(conn, OST_WRITE, 3, size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -754,16 +614,13 @@ static int osc_brw(int cmd, struct lustre_handle *conn,
                                     offset, flags, callback, data);
 }
 
-static int osc_enqueue(struct lustre_handle *conn,
+static int osc_enqueue(struct lustre_handle *connh,
                        struct lustre_handle *parent_lock, __u64 *res_id,
                        __u32 type, void *extentp, int extent_len, __u32 mode,
                        int *flags, void *callback, void *data, int datalen,
                        struct lustre_handle *lockh)
 {
-        struct obd_device *obddev = class_conn2obd(conn);
-        struct ptlrpc_connection *connection;
-        struct ptlrpc_client *cl;
-        struct lustre_handle *rconn;
+        struct obd_device *obddev = class_conn2obd(connh);
         struct ldlm_extent *extent = extentp;
         int rc;
         __u32 mode2;
@@ -774,7 +631,7 @@ static int osc_enqueue(struct lustre_handle *conn,
         extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
 
         /* Next, search for already existing extent locks that will cover us */
-        osc_con2dlmcl(conn, &cl, &connection, &rconn);
+        //osc_con2dlmcl(conn, &cl, &connection, &rconn);
         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
                              sizeof(extent), mode, lockh);
         if (rc == 1) {
@@ -804,16 +661,16 @@ static int osc_enqueue(struct lustre_handle *conn,
                 if (mode == LCK_PR)
                         return 0;
 
-                rc = ldlm_cli_convert(cl, lockh, rconn, mode, &flags);
+                rc = ldlm_cli_convert(lockh, mode, &flags);
                 if (rc)
                         LBUG();
 
                 return rc;
         }
 
-        rc = ldlm_cli_enqueue(cl, connection, rconn, NULL,obddev->obd_namespace,
+        rc = ldlm_cli_enqueue(connh, NULL,obddev->obd_namespace,
                               parent_lock, res_id, type, extent, sizeof(extent),
-                              mode, flags, callback, data, datalen, lockh);
+                              mode, flags, ldlm_completion_ast, callback, data, datalen, lockh);
         return rc;
 }
 
@@ -827,103 +684,14 @@ static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
         RETURN(0);
 }
 
-static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
-{
-        struct obd_ioctl_data* data = buf;
-        struct osc_obd *osc = &obddev->u.osc;
-        char server_uuid[37];
-        int rc;
-        ENTRY;
-
-        if (data->ioc_inllen1 < 1) {
-                CERROR("osc setup requires a TARGET UUID\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen1 > 37) {
-                CERROR("osc TARGET UUID must be less than 38 characters\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen2 < 1) {
-                CERROR("osc setup requires a SERVER UUID\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen2 > 37) {
-                CERROR("osc SERVER UUID must be less than 38 characters\n");
-                RETURN(-EINVAL);
-        }
-
-        memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
-        memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
-                                                   sizeof(server_uuid)));
-
-        osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
-        if (!osc->osc_conn)
-                RETURN(-ENOENT);
-
-        obddev->obd_namespace =
-                ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
-        if (obddev->obd_namespace == NULL)
-                GOTO(out_conn, rc = -ENOMEM);
-
-        OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
-        if (osc->osc_client == NULL)
-                GOTO(out_ns, rc = -ENOMEM);
-
-        OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
-        if (osc->osc_ldlm_client == NULL)
-                GOTO(out_client, rc = -ENOMEM);
-
-        ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
-                           osc->osc_client);
-        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           osc->osc_ldlm_client);
-        osc->osc_client->cli_name = "osc";
-        osc->osc_ldlm_client->cli_name = "ldlm";
-
-        MOD_INC_USE_COUNT;
-        RETURN(0);
-
- out_client:
-        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- out_ns:
-        ldlm_namespace_free(obddev->obd_namespace);
- out_conn:
-        ptlrpc_put_connection(osc->osc_conn);
-        return rc;
-}
-
-static int osc_cleanup(struct obd_device * obddev)
-{
-        struct osc_obd *osc = &obddev->u.osc;
-
-        ldlm_namespace_free(obddev->obd_namespace);
-
-        ptlrpc_cleanup_client(osc->osc_client);
-        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
-        ptlrpc_cleanup_client(osc->osc_ldlm_client);
-        OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
-        ptlrpc_put_connection(osc->osc_conn);
-
-        MOD_DEC_USE_COUNT;
-        return 0;
-}
-
 static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct lustre_handle *rconn;
         struct obd_statfs *osfs;
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection, &rconn);
-        request = ptlrpc_prep_req2(cl, connection, rconn,
-                                   OST_STATFS, 0, NULL, NULL);
+        request = ptlrpc_prep_req2(conn, OST_STATFS, 0, NULL, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -946,8 +714,8 @@ static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs)
 }
 
 struct obd_ops osc_obd_ops = {
-        o_setup:        osc_setup,
-        o_cleanup:      osc_cleanup,
+        o_setup:        client_obd_setup,
+        o_cleanup:      client_obd_cleanup,
         o_statfs:       osc_statfs,
         o_create:       osc_create,
         o_destroy:      osc_destroy,
@@ -955,8 +723,8 @@ struct obd_ops osc_obd_ops = {
         o_setattr:      osc_setattr,
         o_open:         osc_open,
         o_close:        osc_close,
-        o_connect:      osc_connect,
-        o_disconnect:   osc_disconnect,
+        o_connect:      client_obd_connect,
+        o_disconnect:   client_obd_disconnect,
         o_brw:          osc_brw,
         o_punch:        osc_punch,
         o_enqueue:      osc_enqueue,
index d86e05f..333ca79 100644 (file)
@@ -71,7 +71,7 @@ static int ost_getattr(struct ptlrpc_request *req)
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_getattr(conn, &repbody->oa);
+        req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
         RETURN(0);
 }
 
@@ -192,7 +192,7 @@ static int ost_setattr(struct ptlrpc_request *req)
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_setattr(conn, &repbody->oa);
+        req->rq_status = obd_setattr(conn, &repbody->oa, NULL);
         RETURN(0);
 }
 
@@ -297,6 +297,8 @@ static int ost_brw_write(struct ptlrpc_request *req)
         void *tmp1, *tmp2, *end2;
         void *desc_priv = NULL;
         int reply_sent = 0;
+        struct ptlrpc_service *srv;
+        __u32 xid;
         ENTRY;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
@@ -343,17 +345,19 @@ static int ost_brw_write(struct ptlrpc_request *req)
         desc->b_desc_private = desc_priv;
         memcpy(&(desc->b_conn), &conn, sizeof(conn));
 
+        srv = req->rq_obd->u.ost.ost_service;
+        spin_lock(&srv->srv_lock);
+        xid = srv->srv_xid++;                   /* single xid for all pages */
+        spin_unlock(&srv->srv_lock);
+
         for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
-                struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
                 struct ptlrpc_bulk_page *bulk;
 
                 bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
                         GOTO(fail_bulk, rc = -ENOMEM);
 
-                spin_lock(&srv->srv_lock);
-                bulk->b_xid = srv->srv_xid++;
-                spin_unlock(&srv->srv_lock);
+                bulk->b_xid = xid;              /* single xid for all pages */
 
                 bulk->b_buf = lnb->addr;
                 bulk->b_page = lnb->page;
@@ -489,6 +493,36 @@ static int ost_handle(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
                 rc = ost_statfs(req);
                 break;
+        case LDLM_ENQUEUE:
+                CDEBUG(D_INODE, "enqueue\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+                rc = ldlm_handle_enqueue(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CONVERT:
+                CDEBUG(D_INODE, "convert\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+                rc = ldlm_handle_convert(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CANCEL:
+                CDEBUG(D_INODE, "cancel\n");
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
+                rc = ldlm_handle_cancel(req);
+                if (rc)
+                        break;
+                RETURN(0);
+        case LDLM_CALLBACK:
+                CDEBUG(D_INODE, "callback\n");
+                CERROR("callbacks should not happen on MDS\n");
+                LBUG();
+                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
+                break;
+
+
+
         default:
                 req->rq_status = -ENOTSUPP;
                 rc = ptlrpc_error(req->rq_svc, req);
@@ -526,12 +560,18 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         int i;
         ENTRY;
 
-        if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
-                RETURN(-ENODEV);
+        if (data->ioc_inllen1 < 1) {
+                CERROR("requires a TARGET OBD UUID\n");
+                RETURN(-EINVAL);
+        }
+        if (data->ioc_inllen1 > 37) {
+                CERROR("OBD UUID must be less than 38 characters\n");
+                RETURN(-EINVAL);
+        }
 
         MOD_INC_USE_COUNT;
-        tgt = &obd_dev[data->ioc_dev];
-        if (!(tgt->obd_flags & OBD_ATTACHED) ||
+        tgt = class_uuid2obd(data->ioc_inlbuf1);
+        if (!tgt || !(tgt->obd_flags & OBD_ATTACHED) ||
             !(tgt->obd_flags & OBD_SET_UP)) {
                 CERROR("device not attached or not set up (%d)\n",
                        data->ioc_dev);
@@ -552,8 +592,9 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         }
 
         for (i = 0; i < OST_NUM_THREADS; i++) {
-                err = ptlrpc_start_thread(obddev, ost->ost_service,
-                                          "lustre_ost");
+                char name[32];
+                sprintf(name, "lustre_ost_%2d", i);
+                err = ptlrpc_start_thread(obddev, ost->ost_service, name);
                 if (err) {
                         CERROR("error starting thread #%d: rc %d\n", i, err);
                         GOTO(error_disc, err = -EINVAL);
index b135afe..983b071 100644 (file)
@@ -23,6 +23,7 @@
 #define DEBUG_SUBSYSTEM S_RPC
 
 #include <linux/obd_support.h>
+#include <linux/obd_class.h>
 #include <linux/lustre_lib.h>
 #include <linux/lustre_ha.h>
 
@@ -94,10 +95,11 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
         OBD_ALLOC(desc, sizeof(*desc));
         if (desc != NULL) {
                 desc->b_connection = ptlrpc_connection_addref(conn);
-                atomic_set(&desc->b_pages_remaining, 0);
                 atomic_set(&desc->b_refcount, 1);
                 init_waitqueue_head(&desc->b_waitq);
                 INIT_LIST_HEAD(&desc->b_page_list);
+                ptl_set_inv_handle(&desc->b_md_h);
+                ptl_set_inv_handle(&desc->b_me_h);
         }
 
         return desc;
@@ -110,11 +112,8 @@ struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
         OBD_ALLOC(bulk, sizeof(*bulk));
         if (bulk != NULL) {
                 bulk->b_desc = desc;
-                ptl_set_inv_handle(&bulk->b_md_h);
-                ptl_set_inv_handle(&bulk->b_me_h);
                 list_add_tail(&bulk->b_link, &desc->b_page_list);
                 desc->b_page_count++;
-                atomic_inc(&desc->b_pages_remaining);
         }
         return bulk;
 }
@@ -198,15 +197,25 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
 
         RETURN(request);
 }
-struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl,
-                                        struct ptlrpc_connection *conn,
-                                        struct lustre_handle *handle, 
+struct ptlrpc_request *ptlrpc_prep_req2(struct lustre_handle *conn, 
                                        int opcode, int count, int *lengths,
                                        char **bufs)
 {
+        struct client_obd *clobd; 
         struct ptlrpc_request *req;
-        req = ptlrpc_prep_req(cl, conn, opcode, count, lengths, bufs);
-        ptlrpc_hdl2req(req, handle);
+        struct obd_export *export;
+
+        export = class_conn2export(conn);
+        if (!export) { 
+                LBUG();
+                CERROR("NOT connected\n"); 
+                return NULL;
+        }
+
+        clobd = &export->exp_obd->u.cli;
+        req = ptlrpc_prep_req(clobd->cl_client, clobd->cl_conn, 
+                              opcode, count, lengths, bufs);
+        ptlrpc_hdl2req(req, &clobd->cl_exporth);
         return req;
 }
 
index 3bf879f..4cc7a74 100644 (file)
@@ -37,6 +37,8 @@ static int request_out_callback(ptl_event_t *ev)
 {
         ENTRY;
 
+        LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
+
         if (ev->type != PTL_EVENT_SENT) {
                 // XXX make sure we understand all events, including ACK's
                 CERROR("Unknown event %d\n", ev->type);
@@ -54,6 +56,8 @@ static int reply_out_callback(ptl_event_t *ev)
 {
         ENTRY;
 
+        LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* replies always contiguous */
+
         if (ev->type == PTL_EVENT_SENT) {
                 OBD_FREE(ev->mem_desc.start, ev->mem_desc.length);
         } else {
@@ -73,6 +77,8 @@ static int reply_in_callback(ptl_event_t *ev)
         struct ptlrpc_request *req = ev->mem_desc.user_ptr;
         ENTRY;
 
+        LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* replies always contiguous */
+
         if (req->rq_xid == 0x5a5a5a5a5a5a5a5a) {
                 CERROR("Reply received for freed request!  Probably a missing "
                        "ptlrpc_abort()\n");
@@ -101,6 +107,8 @@ int request_in_callback(ptl_event_t *ev)
 {
         struct ptlrpc_service *service = ev->mem_desc.user_ptr;
 
+        LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
+        
         if (ev->rlength != ev->mlength)
                 CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
                        ev->mlength, ev->rlength);
@@ -115,22 +123,30 @@ int request_in_callback(ptl_event_t *ev)
 
 static int bulk_source_callback(ptl_event_t *ev)
 {
-        struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
-        struct ptlrpc_bulk_desc *desc = bulk->b_desc;
+        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_page *bulk;
+        struct list_head        *tmp;
+        struct list_head        *next;
         ENTRY;
 
+        /* 1 fragment for each page always */
+        LASSERT (ev->mem_desc.niov == desc->b_page_count);
+        
         if (ev->type == PTL_EVENT_SENT) {
                 CDEBUG(D_NET, "got SENT event\n");
         } else if (ev->type == PTL_EVENT_ACK) {
                 CDEBUG(D_NET, "got ACK event\n");
-                if (bulk->b_cb != NULL)
-                        bulk->b_cb(bulk);
-                if (atomic_dec_and_test(&desc->b_pages_remaining)) {
-                        desc->b_flags |= PTL_BULK_FL_SENT;
-                        wake_up(&desc->b_waitq);
-                        if (desc->b_cb != NULL)
-                                desc->b_cb(desc, desc->b_cb_data);
+                
+                list_for_each_safe(tmp, next, &desc->b_page_list) {
+                        bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+                        
+                        if (bulk->b_cb != NULL)
+                                bulk->b_cb(bulk);
                 }
+                desc->b_flags |= PTL_BULK_FL_SENT;
+                wake_up(&desc->b_waitq);
+                if (desc->b_cb != NULL)
+                        desc->b_cb(desc, desc->b_cb_data);
         } else {
                 CERROR("Unexpected event type!\n");
                 LBUG();
@@ -141,21 +157,36 @@ static int bulk_source_callback(ptl_event_t *ev)
 
 static int bulk_sink_callback(ptl_event_t *ev)
 {
-        struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr;
-        struct ptlrpc_bulk_desc *desc = bulk->b_desc;
+        struct ptlrpc_bulk_desc *desc = ev->mem_desc.user_ptr;
+        struct ptlrpc_bulk_page *bulk;
+        struct list_head        *tmp;
+        struct list_head        *next;
+        ptl_size_t               total = 0;
         ENTRY;
 
         if (ev->type == PTL_EVENT_PUT) {
-                if (bulk->b_buf != ev->mem_desc.start + ev->offset)
-                        CERROR("bulkbuf != mem_desc -- why?\n");
-                if (bulk->b_cb != NULL)
-                        bulk->b_cb(bulk);
-                if (atomic_dec_and_test(&desc->b_pages_remaining)) {
-                        desc->b_flags |= PTL_BULK_FL_RCVD;
-                        wake_up(&desc->b_waitq);
-                        if (desc->b_cb != NULL)
-                                desc->b_cb(desc, desc->b_cb_data);
+                /* put with zero offset */
+                LASSERT (ev->offset == 0);
+                /* used iovs */
+                LASSERT ((ev->mem_desc.options & PTL_MD_IOV) != 0);
+                /* 1 fragment for each page always */
+                LASSERT (ev->mem_desc.niov == desc->b_page_count);
+
+                list_for_each_safe (tmp, next, &desc->b_page_list) {
+                        bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
+
+                        total += bulk->b_buflen;
+                        
+                        if (bulk->b_cb != NULL)
+                                bulk->b_cb(bulk);
                 }
+
+                LASSERT (ev->mem_desc.length == total);
+                
+                desc->b_flags |= PTL_BULK_FL_RCVD;
+                wake_up(&desc->b_waitq);
+                if (desc->b_cb != NULL)
+                        desc->b_cb(desc, desc->b_cb_data);
         } else {
                 CERROR("Unexpected event type!\n");
                 LBUG();
index 408939c..0f7c955 100644 (file)
@@ -119,47 +119,96 @@ static int ptl_send_buf(struct ptlrpc_request *request,
         return rc;
 }
 
+static inline struct iovec *
+ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc)
+{
+        struct iovec *iov;
+        
+        if (desc->b_page_count <= sizeof (desc->b_iov)/sizeof (struct iovec))
+                return (desc->b_iov);
+        
+        OBD_ALLOC (iov, desc->b_page_count * sizeof (struct iovec));
+        if (iov == NULL)
+                LBUG();
+        
+        return (iov);
+}
+
+static inline void
+ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov)
+{
+        if (desc->b_page_count <= sizeof (desc->b_iov)/sizeof (struct iovec))
+                return;
+
+        OBD_FREE (iov, desc->b_page_count * sizeof (struct iovec));
+}
+
 int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc)
 {
         int rc;
         struct list_head *tmp, *next;
         ptl_process_id_t remote_id;
+        __u32 xid = 0;
+        struct iovec *iov;
         ENTRY;
 
+        iov = ptlrpc_get_bulk_iov (desc);
+        if (iov == NULL)
+                RETURN (-ENOMEM);
+
+        desc->b_md.start = iov;
+        desc->b_md.niov = 0;
+        desc->b_md.length = 0;
+        desc->b_md.eventq = bulk_source_eq;
+        desc->b_md.threshold = 2; /* SENT and ACK */
+        desc->b_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
+        desc->b_md.user_ptr = desc;
+
         list_for_each_safe(tmp, next, &desc->b_page_list) {
                 struct ptlrpc_bulk_page *bulk;
                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
 
-                bulk->b_md.start = bulk->b_buf;
-                bulk->b_md.length = bulk->b_buflen;
-                bulk->b_md.eventq = bulk_source_eq;
-                bulk->b_md.threshold = 2; /* SENT and ACK */
-                bulk->b_md.options = PTL_MD_OP_PUT;
-                bulk->b_md.user_ptr = bulk;
-
-                rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md,
-                               &bulk->b_md_h);
-                if (rc != 0) {
-                        CERROR("PtlMDBind failed: %d\n", rc);
-                        LBUG();
-                        RETURN(rc);
-                }
-
-                remote_id.nid = desc->b_connection->c_peer.peer_nid;
-                remote_id.pid = 0;
-
-                CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n",
-                       bulk->b_md.length, desc->b_portal, bulk->b_xid);
-
-                rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id,
-                            desc->b_portal, 0, bulk->b_xid, 0, 0);
-                if (rc != PTL_OK) {
-                        CERROR("PtlPut(%Lu, %d, %d) failed: %d\n",
-                               remote_id.nid, desc->b_portal, bulk->b_xid, rc);
-                        PtlMDUnlink(bulk->b_md_h);
-                        LBUG();
-                        RETURN(rc);
-                }
+                LASSERT (desc->b_md.niov < desc->b_page_count);
+
+                if (desc->b_md.niov == 0)
+                        xid = bulk->b_xid;
+                LASSERT (xid == bulk->b_xid);   /* should all be the same */
+                
+                iov[desc->b_md.niov].iov_base = bulk->b_buf;
+                iov[desc->b_md.niov].iov_len = bulk->b_buflen;
+                desc->b_md.niov++;
+                desc->b_md.length += bulk->b_buflen;
+        }
+        
+        LASSERT (desc->b_md.niov == desc->b_page_count);
+        LASSERT (desc->b_md.niov != 0);
+        
+        rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, desc->b_md,
+                       &desc->b_md_h);
+
+        ptlrpc_put_bulk_iov (desc, iov);        /* move down to reduce latency to send */
+
+        if (rc != PTL_OK) {
+                CERROR("PtlMDBind failed: %d\n", rc);
+                LBUG();
+                RETURN(rc);
+        }
+        
+        remote_id.nid = desc->b_connection->c_peer.peer_nid;
+        remote_id.pid = 0;
+        
+        CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid %Lx pid %d xid %d\n",
+               desc->b_md.niov, desc->b_md.length, 
+               desc->b_portal, remote_id.nid, remote_id.pid, xid);
+        
+        rc = PtlPut(desc->b_md_h, PTL_ACK_REQ, remote_id,
+                    desc->b_portal, 0, xid, 0, 0);
+        if (rc != PTL_OK) {
+                CERROR("PtlPut(%Lu, %d, %d) failed: %d\n",
+                       remote_id.nid, desc->b_portal, xid, rc);
+                PtlMDUnlink(desc->b_md_h);
+                LBUG();
+                RETURN(rc);
         }
 
         RETURN(0);
@@ -169,40 +218,64 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc)
 {
         struct list_head *tmp, *next;
         int rc;
+        __u32 xid = 0;
+        struct iovec *iov;
         ENTRY;
 
+        iov = ptlrpc_get_bulk_iov (desc);
+        if (iov == NULL)
+                return (-ENOMEM);
+
+        desc->b_md.start = iov;
+        desc->b_md.niov = 0;
+        desc->b_md.length = 0;
+        desc->b_md.threshold = 1;
+        desc->b_md.options = PTL_MD_OP_PUT | PTL_MD_IOV;
+        desc->b_md.user_ptr = desc;
+        desc->b_md.eventq = bulk_sink_eq;
+
         list_for_each_safe(tmp, next, &desc->b_page_list) {
                 struct ptlrpc_bulk_page *bulk;
                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
 
-                rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni,
-                                 desc->b_portal, local_id, bulk->b_xid, 0,
-                                 PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h);
-                if (rc != PTL_OK) {
-                        CERROR("PtlMEAttach failed: %d\n", rc);
-                        LBUG();
-                        GOTO(cleanup, rc);
-                }
-
-                bulk->b_md.start = bulk->b_buf;
-                bulk->b_md.length = bulk->b_buflen;
-                bulk->b_md.threshold = 1;
-                bulk->b_md.options = PTL_MD_OP_PUT;
-                bulk->b_md.user_ptr = bulk;
-                bulk->b_md.eventq = bulk_sink_eq;
-
-                rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK,
-                                 &bulk->b_md_h);
-                if (rc != PTL_OK) {
-                        CERROR("PtlMDAttach failed: %d\n", rc);
-                        LBUG();
-                        GOTO(cleanup, rc);
-                }
-
-                CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, "
-                       "portal %u\n", bulk->b_buflen, bulk->b_xid,
-                       desc->b_portal);
+                LASSERT (desc->b_md.niov < desc->b_page_count);
+
+                if (desc->b_md.niov == 0)
+                        xid = bulk->b_xid;
+                LASSERT (xid == bulk->b_xid);   /* should all be the same */
+                
+                iov[desc->b_md.niov].iov_base = bulk->b_buf;
+                iov[desc->b_md.niov].iov_len = bulk->b_buflen;
+                desc->b_md.niov++;
+                desc->b_md.length += bulk->b_buflen;
+        }
+
+        LASSERT (desc->b_md.niov == desc->b_page_count);
+        LASSERT (desc->b_md.niov != 0);
+        
+        rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni,
+                         desc->b_portal, local_id, xid, 0,
+                         PTL_UNLINK, PTL_INS_AFTER, &desc->b_me_h);
+
+        ptlrpc_put_bulk_iov (desc, iov);
+
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach failed: %d\n", rc);
+                LBUG();
+                GOTO(cleanup, rc);
         }
+        
+        rc = PtlMDAttach(desc->b_me_h, desc->b_md, PTL_UNLINK,
+                         &desc->b_md_h);
+        if (rc != PTL_OK) {
+                CERROR("PtlMDAttach failed: %d\n", rc);
+                LBUG();
+                GOTO(cleanup, rc);
+        }
+        
+        CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, "
+               "portal %u\n", desc->b_md.niov, desc->b_md.length, 
+               xid, desc->b_portal);
 
         RETURN(0);
 
@@ -214,17 +287,10 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc)
 
 int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc)
 {
-        struct list_head *tmp, *next;
-
-        list_for_each_safe(tmp, next, &desc->b_page_list) {
-                struct ptlrpc_bulk_page *bulk;
-                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
-
-                /* This should be safe: these handles are initialized to be
-                 * invalid in ptlrpc_prep_bulk_page() */
-                PtlMDUnlink(bulk->b_md_h);
-                PtlMEUnlink(bulk->b_me_h);
-        }
+        /* This should be safe: these handles are initialized to be
+         * invalid in ptlrpc_prep_bulk() */
+        PtlMDUnlink(desc->b_md_h);
+        PtlMEUnlink(desc->b_me_h);
 
         return 0;
 }
index 6aa5f65..36e4b7c 100644 (file)
@@ -138,6 +138,7 @@ static int handle_incoming_request(struct obd_device *obddev,
 
         /* FIXME: If we move to an event-driven model, we should put the request
          * on the stack of mds_handle instead. */
+        LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0);
         start = event->mem_desc.start;
 
         memset(&request, 0, sizeof(request));
@@ -199,6 +200,8 @@ void ptlrpc_rotate_reqbufs(struct ptlrpc_service *service,
 {
         int index;
 
+        LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0);
+
         for (index = 0; index < service->srv_ring_length; index++)
                 if (service->srv_buf[index] == ev->mem_desc.start)
                         break;
index f50b8ef..39a89c1 100644 (file)
@@ -17,7 +17,7 @@ noinst_DATA = ext2_10000.gz ext2_25000.gz ext3_10000.gz lov.xml
 noinst_SCRIPTS = fs.sh intent-test.sh intent-test2.sh leak_finder.pl \
        lldlm.sh llecho.sh llext3.sh llmodules.sh llmount-client.sh \
        llmount-server.sh llmount.sh llmountcleanup.sh llrext3.sh \
-       llrmount.sh llsimple.sh mdcreq.sh mdcreqcleanup.sh \
+       llrmount.sh llsimple.sh  mdcreq.sh mdcreqcleanup.sh \
        ostreq.sh runfailure-client-mds-recover.sh runfailure-mds \
        runfailure-net runfailure-ost runiozone runregression-net.sh \
        runtests runvmstat snaprun.sh
index 2801464..87078a8 100644 (file)
@@ -398,7 +398,7 @@ setup_ost() {
        $OBDCTL <<- EOF || return $?
        newdev
        attach ost OSTDEV OSTUUID
-       setup \$OBDDEV
+       setup OBDUUID
        quit
        EOF
 }
@@ -482,8 +482,8 @@ setup_mount() {
                fi
 
                [ ! -d $MTPT ] && mkdir $MTPT
-               echo mount -t lustre_lite -o ost=${THEOSC}-UUID,mds=${THEMDC}-UUID none $MTPT
-               mount -t lustre_lite -o ost=${THEOSC}-UUID,mds=${THEMDC}-UUID none $MTPT
+               echo mount -t lustre_lite -o osc=${THEOSC}-UUID,mdc=${THEMDC}-UUID none $MTPT
+                mount -t lustre_lite -o osc=${THEOSC}-UUID,mdc=${THEMDC}-UUID none $MTPT
            done
        done
 }
index 3e6cfbb..9113f17 100755 (executable)
@@ -5,7 +5,7 @@ MTPT=/mnt/lustre
 remount() {
     umount $MTPT || exit -1
     debugctl clear
-    mount -t lustre_lite -o ost=OSCDEV-UUID,mds=MDCDEV-UUID none $MTPT
+    mount -t lustre_lite -o osc=OSCDEV-UUID,mdc=MDCDEV-UUID none $MTPT
 }
 
 # Test mkdir
index 3df8172..4828f26 100644 (file)
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/sh -vx
 
 SRCDIR="`dirname $0`/"
 [ -f $SRCDIR/common.sh ] || SRCDIR="/lib/lustre"
index 0af48e8..f636d69 100644 (file)
@@ -29,7 +29,7 @@
 
   <lov name="lov1" uuid="lov1-UUID">
     <mdc_ref uuidref="mdc1-UUID"/>
-    <devices stripesize="65536" pattern="0">
+    <devices stripesize="65536" stripeoffset="0" pattern="0">
       <osc_ref uuidref="osc1-UUID"/>
       <osc_ref uuidref="osc2-UUID"/>
     </devices>
index d5e61ba..24e05b3 100644 (file)
@@ -501,7 +501,7 @@ int jt_dev_lov_config(int argc, char **argv)
         int rc, size, i;
 
         IOCINIT(data);
-        if (argc <= 5)
+        if (argc <= 6)
                 return CMD_HELP;
 
         if (strlen(argv[1]) > sizeof(uuid_t) - 1) { 
@@ -512,10 +512,11 @@ int jt_dev_lov_config(int argc, char **argv)
             
         memset(&desc, 0, sizeof(desc)); 
         strcpy(desc.ld_uuid, argv[1]); 
-        desc.ld_default_stripecount = strtoul(argv[2], NULL, 0); 
-        desc.ld_default_stripesize = strtoul(argv[3], NULL, 0); 
-        desc.ld_pattern = strtoul(argv[4], NULL, 0); 
-        desc.ld_tgt_count = argc - 5;
+        desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0); 
+        desc.ld_default_stripe_size = (__u64) strtoul(argv[3], NULL, 0); 
+        desc.ld_default_stripe_offset = (__u64) strtoul(argv[3], NULL, 0); 
+        desc.ld_pattern = strtoul(argv[5], NULL, 0); 
+        desc.ld_tgt_count = argc - 6;
 
 
         size = sizeof(uuid_t) * desc.ld_tgt_count;
@@ -526,8 +527,8 @@ int jt_dev_lov_config(int argc, char **argv)
                 return -ENOMEM;
         }
         memset(uuidarray, 0, size); 
-        for (i=5 ; i < argc ; i++) { 
-                char *buf = (char *) (uuidarray + i -5 );
+        for (i=6 ; i < argc ; i++) { 
+                char *buf = (char *) (uuidarray + i -6 );
                 if (strlen(argv[i]) >= sizeof(uuid_t)) { 
                         fprintf(stderr, "lov_config: arg %d (%s) too long\n",  
                                 i, argv[i]);
index a5c93fd..b5b536a 100755 (executable)
@@ -187,12 +187,12 @@ class LCTLInterface:
         self.run(cmds)
 
     # create an lov
-    def lovconfig(self, uuid, mdcuuid, stripe_cnt, stripe_sz, pattern, devlist):
+    def lovconfig(self, uuid, mdcuuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist):
         cmds = """
   device $%s
   probe
-  lovconfig %s %d %d %s %s
-  quit""" % (mdcuuid, uuid, stripe_cnt, stripe_sz, pattern, devlist)
+  lovconfig %s %d %d %d %s %s
+  quit""" % (mdcuuid, uuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist)
         self.run(cmds)
 
 # ============================================================
@@ -329,9 +329,9 @@ def prepare_ldlm(node):
                 setup ="")
     
 def prepare_lov(node):
-    (name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname) = getLOVInfo(node)
-    print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname
-    lctl.lovconfig(uuid, mdsname, stripe_cnt, strip_sz, pattern, devlist)
+    (name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname) = getLOVInfo(node)
+    print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname
+    lctl.lovconfig(uuid, mdsname, stripe_cnt, strip_sz, stripe_off, pattern, devlist)
     lctl.newdev(attach="lov %s %s" % (name, uuid),
                 setup ="%s" % (mdcuuid))
 
@@ -362,7 +362,7 @@ def prepare_ost(ost):
     name, uuid, obd = getOSTInfo(ost)
     print 'OST:', name, uuid, obd
     lctl.newdev(attach="ost %s %s" % (name, uuid),
-                setup ="$%s" % (obd))
+                setup ="%s" % (obd))
 
 def prepare_mds(node):
     (name, uuid, dev, size, fstype, format) = getMDSInfo(node)
@@ -386,6 +386,9 @@ def prepare_mdc(node):
     print 'MDC:', name, uuid, mdsuuid, netuuid
     net = lookup(node.parentNode, netuuid)
     srvname, srvuuid, net, server, port = getNetworkInfo(net)
+    mds = lookup(node.parentNode, mdsuuid)
+    if mds == None:
+        panic(mdsuuid, "not found.")
     lctl.connect(net, server, port, netuuid)
     lctl.newdev(attach="mdc %s %s" % (name, uuid),
                 setup ="%s %s" %(mdsuuid, netuuid))
@@ -393,10 +396,12 @@ def prepare_mdc(node):
 def prepare_mountpoint(node):
     name, uuid, oscuuid, mdcuuid, mtpt = getMTPTInfo(node)
     print 'MTPT:', name, uuid, oscuuid, mdcuuid, mtpt
-    cmd = "mount -t lustre_lite -o ost=%s,mds=%s none %s" % \
+    cmd = "mount -t lustre_lite -o osc=%s,mdc=%s none %s" % \
           (oscuuid, mdcuuid, mtpt)
     run("mkdir", mtpt)
-    run(cmd)
+    ret, val = run(cmd)
+    if ret:
+        print mtpt, "mount failed."
 # ============================================================
 # Functions to cleanup the various objects
 
@@ -409,8 +414,8 @@ def cleanup_ldlm(node):
         print "cleanup failed: ", name
 
 def cleanup_lov(node):
-    (name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname) = getLOVInfo(node)
-    print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, pattern, devlist, mdsname
+    (name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname) = getLOVInfo(node)
+    print 'LOV:', name, uuid, mdcuuid, stripe_cnt, strip_sz, stripe_off, pattern, devlist, mdsname
     try:
         lctl.cleanup(name, uuid)
     except CommandError:
@@ -524,6 +529,7 @@ def getLOVInfo(node):
     name, uuid = getNodeAttr(node)
     devs = node.getElementsByTagName('devices')[0]
     stripe_sz = int(devs.getAttribute('stripesize'))
+    stripe_off = int(devs.getAttribute('stripeoffset'))
     pattern = int(devs.getAttribute('pattern'))
     mdcref =  node.getElementsByTagName('mdc_ref')[0]
     mdcuuid = mdcref.getAttribute('uuidref')
@@ -538,7 +544,7 @@ def getLOVInfo(node):
         if child.nodeName == 'osc_ref':
             devlist = devlist +  child.getAttribute('uuidref') + " "
             strip_cnt = stripe_cnt + 1
-    return (name, uuid, mdcuuid, stripe_cnt, stripe_sz, pattern, devlist, mdsname)
+    return (name, uuid, mdcuuid, stripe_cnt, stripe_sz, stripe_off, pattern, devlist, mdsname)
     
 # extract device attributes for an obd
 def getMDSInfo(node):
@@ -563,12 +569,7 @@ def getOSTInfo(node):
     name, uuid = getNodeAttr(node)
     ref = node.getElementsByTagName('obd_ref')[0]
     uuid = ref.getAttribute('uuidref')
-    obd = lookup(node.parentNode, uuid)
-    if obd:
-         obdname = getOBDInfo(obd)[0]
-    else:
-        obdname = "OBD NOT FOUND"
-    return (name, uuid, obdname)
+    return (name, uuid, uuid)
 
 # extract device attributes for an obd
 def getOSCInfo(node):
@@ -758,11 +759,16 @@ def cleanupProfile(lustreNode, profileNode):
 #
 # Load profile for 
 def doHost(lustreNode, hosts, cleanFlag):
+    node = None
     for h in hosts:
         node = getByName(lustreNode, 'node', h)
         if node:
             break
-        
+
+    if not node:
+        print 'No host entry found.'
+        return
+
     reflist = node.getElementsByTagName('profile_ref')
     for r in reflist:
         if cleanFlag:
@@ -837,11 +843,12 @@ def main():
         usage()
 
     if not options.has_key('hostname'):
+        options['hostname'] = []
         ret, host = run('hostname')
         if ret:
             print "unable to determine hostname"
-        else:
-            options['hostname'] = [string.strip(host[0])] 
+        elif len(host) > 0:
+            options['hostname'].append(string.strip(host[0]))
         options['hostname'].append('localhost')
     print "configuring for host: ", options['hostname']
     doHost(dom.childNodes[0], options['hostname'], options.has_key('cleanup') )
index 3b2a6db..49f3faa 100644 (file)
@@ -38,6 +38,7 @@
 <!ATTLIST lov %tag.attr;>\r
 <!ELEMENT devices (osc_ref)+>\r
 <!ATTLIST devices stripesize CDATA #REQUIRED\r
+                  stripeoffset CDATA #REQUIRED\r
                   pattern    CDATA #REQUIRED>\r
 <!ELEMENT router (misc)*>\r
 <!ATTLIST router %tag.attr;>\r
index f347c79..acd785f 100644 (file)
 #include "parser.h"
 #include <stdio.h>
 
+#define SHMEM_STATS 1
+#if SHMEM_STATS
+#include <sys/ipc.h>
+#include <sys/shm.h>
+
+#define MAX_SHMEM_COUNT 1024
+static long long *shared_counters;
+static long long  counter_snapshot[2][MAX_SHMEM_COUNT];
+struct timeval    prev_time;
+#endif
+
 static int jt_newdev(int argc, char **argv);
 static int jt_attach(int argc, char **argv);
 static int jt_setup(int argc, char **argv);
@@ -243,6 +254,95 @@ static int do_disconnect(char *func, int verbose)
         return rc;
 }
 
+#if SHMEM_STATS
+static void
+shmem_setup ()
+{
+        int shmid = shmget (IPC_PRIVATE, sizeof (counter_snapshot[0]), 0600);
+
+        if (shmid == -1)
+        {
+                fprintf (stderr, "Can't create shared memory counters: %s\n", strerror (errno));
+                return;
+        }
+        
+        shared_counters = (long long *)shmat (shmid, NULL, 0);
+
+        if (shared_counters == (long long *)(-1))
+        {
+                fprintf (stderr, "Can't attach shared memory counters: %s\n", strerror (errno));
+                shared_counters = NULL;
+                return;
+        }
+}
+
+static inline void
+shmem_reset ()
+{
+        if (shared_counters == NULL)
+                return;
+        
+        memset (shared_counters, 0, sizeof (counter_snapshot[0]));
+        memset (counter_snapshot, 0, sizeof (counter_snapshot));
+        gettimeofday (&prev_time, NULL);
+}
+
+static inline void
+shmem_bump ()
+{
+        if (shared_counters == NULL || 
+            thread <= 0 || thread > MAX_SHMEM_COUNT)
+                return;
+        
+        shared_counters[thread - 1]++;
+}
+
+static void
+shmem_snap (int n)
+{
+        struct timeval this_time;
+        int            non_zero = 0;
+        long long      total = 0;
+        double         secs;
+        int            i;
+        
+        if (shared_counters == NULL || n > MAX_SHMEM_COUNT)
+                return;
+
+        memcpy (counter_snapshot[1], counter_snapshot[0], n * sizeof (counter_snapshot[0][0]));
+        memcpy (counter_snapshot[0], shared_counters, n * sizeof (counter_snapshot[0][0]));
+        gettimeofday (&this_time, NULL);
+        
+        for (i = 0; i < n; i++)
+        {
+                long long this_count = counter_snapshot[0][i] - counter_snapshot[1][i];
+                
+                if (this_count != 0)
+                {
+                        non_zero++;
+                        total += this_count;
+                }
+        }
+        
+        secs = (this_time.tv_sec + this_time.tv_usec/1000000.0) -
+               (prev_time.tv_sec + prev_time.tv_usec/1000000.0);
+        
+        printf ("%d/%d Total: %f/second\n", non_zero, n, total / secs);
+
+        prev_time = this_time;
+}
+
+#define SHMEM_SETUP()  shmem_setup()
+#define SHMEM_RESET()  shmem_reset()
+#define SHMEM_BUMP()   shmem_bump()
+#define SHMEM_SNAP(n)  shmem_snap(n)
+#else
+#define SHMEM_SETUP()
+#define SHMEM_RESET()
+#define SHMEM_BUMP()
+#define SHMEM_SNAP(n)
+#endif
+
 extern command_t cmdlist[];
 
 static int xml_command(char *cmd, ...) {
@@ -828,7 +928,7 @@ static int jt__threads(int argc, char **argv)
 {
         int threads, next_thread;
         int verbose;
-        int i, j;
+        int i;
 
         if (argc < 5) {
                 fprintf(stderr,
@@ -841,9 +941,12 @@ static int jt__threads(int argc, char **argv)
 
         verbose = get_verbose(argv[2]);
 
-        printf("%s: starting %d threads on device %s running %s\n",
-               argv[0], threads, argv[3], argv[4]);
+        if (verbose != 0)
+                printf("%s: starting %d threads on device %s running %s\n",
+                       argv[0], threads, argv[3], argv[4]);
 
+        SHMEM_RESET();
+        
         for (i = 1, next_thread = verbose; i <= threads; i++) {
                 rc = fork();
                 if (rc < 0) {
@@ -861,14 +964,23 @@ static int jt__threads(int argc, char **argv)
         }
 
         if (!thread) { /* parent process */
-                if (!verbose)
-                        printf("%s: started %d threads\n\n", argv[0], i - 1);
-                else
-                        printf("\n");
-
-                for (j = 1; j < i; j++) {
-                        int status;
-                        int ret = wait(&status);
+                int live_threads = threads;
+                
+                while (live_threads > 0)
+                {
+                        int    status;
+                        pid_t  ret;
+
+                        ret = waitpid (0, &status, verbose < 0 ? WNOHANG : 0);
+                        if (ret == 0)
+                        {
+                                if (verbose >= 0)
+                                        abort ();
+
+                                sleep (-verbose);
+                                SHMEM_SNAP (threads);
+                                continue;
+                        }
 
                         if (ret < 0) {
                                 fprintf(stderr, "error: %s: wait - %s\n",
@@ -889,6 +1001,8 @@ static int jt__threads(int argc, char **argv)
                                                 argv[0], ret, err);
                                 if (!rc)
                                         rc = err;
+
+                                live_threads--;
                         }
                 }
         }
@@ -1165,6 +1279,7 @@ static int jt_create(int argc, char **argv)
 
         for (i = 1, next_count = verbose; i <= count ; i++) {
                 rc = ioctl(fd, OBD_IOC_CREATE , &data);
+                SHMEM_BUMP();
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: #%d - %s\n",
                                 cmdname(argv[0]), i, strerror(rc = errno));
@@ -1272,11 +1387,13 @@ static int jt_test_getattr(int argc, char **argv)
         gettimeofday(&start, NULL);
         next_time.tv_sec = start.tv_sec - verbose;
         next_time.tv_usec = start.tv_usec;
-        printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]),
-               count, ctime(&start.tv_sec));
+        if (verbose != 0)
+                printf("%s: getting %d attrs (testing only): %s", cmdname(argv[0]),
+                       count, ctime(&start.tv_sec));
 
         for (i = 1, next_count = verbose; i <= count; i++) {
                 rc = ioctl(fd, OBD_IOC_GETATTR , &data);
+                SHMEM_BUMP();
                 if (rc < 0) {
                         fprintf(stderr, "error: %s: #%d - %s\n",
                                 cmdname(argv[0]), i, strerror(rc = errno));
@@ -1296,9 +1413,10 @@ static int jt_test_getattr(int argc, char **argv)
                 diff = difftime(&end, &start);
 
                 --i;
-                printf("%s: %d attrs in %.4gs (%.4g attr/s): %s",
-                       cmdname(argv[0]), i, diff, (double)i / diff,
-                       ctime(&end.tv_sec));
+                if (verbose != 0)
+                        printf("%s: %d attrs in %.4gs (%.4g attr/s): %s",
+                               cmdname(argv[0]), i, diff, (double)i / diff,
+                               ctime(&end.tv_sec));
         }
         return rc;
 }
@@ -1366,9 +1484,10 @@ static int jt_test_brw(int argc, char **argv)
         next_time.tv_sec = start.tv_sec - verbose;
         next_time.tv_usec = start.tv_usec;
 
-        printf("%s: %s %d (%dx%d pages) (testing only): %s",
-               cmdname(argv[0]), write ? "writing" : "reading",
-               count, obdos, pages, ctime(&start.tv_sec));
+        if (verbose != 0)
+                printf("%s: %s %d (%dx%d pages) (testing only): %s",
+                       cmdname(argv[0]), write ? "writing" : "reading",
+                       count, obdos, pages, ctime(&start.tv_sec));
 
         /*
          * We will put in the start time (and loop count inside the loop)
@@ -1393,6 +1512,7 @@ static int jt_test_brw(int argc, char **argv)
                 }
 
                 rc = ioctl(fd, rw, &data);
+                SHMEM_BUMP();
                 if (rc) {
                         fprintf(stderr, "error: %s: #%d - %s on %s\n",
                                 cmdname(argv[0]), i, strerror(rc = errno),
@@ -1414,10 +1534,11 @@ static int jt_test_brw(int argc, char **argv)
                 diff = difftime(&end, &start);
 
                 --i;
-                printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s",
-                       cmdname(argv[0]), write ? "wrote" : "read", obdos,
-                       pages, i, diff, (double)obdos * i * pages / diff,
-                       ctime(&end.tv_sec));
+                if (verbose != 0)
+                        printf("%s: %s %dx%dx%d pages in %.4gs (%.4g pg/s): %s",
+                               cmdname(argv[0]), write ? "wrote" : "read", obdos,
+                               pages, i, diff, (double)obdos * i * pages / diff,
+                               ctime(&end.tv_sec));
         }
         return rc;
 }
@@ -1430,6 +1551,9 @@ static int jt_lov_config(int argc, char **argv)
         int size, i;
         IOCINIT(data);
 
+        printf("WARNING: obdctl lovconfig NOT MAINTAINED\n"); 
+        return -1;
+
         if (argc <= 5 ){
                 Parser_printhelp("lovconfig"); 
                 return -1;
@@ -1443,8 +1567,8 @@ static int jt_lov_config(int argc, char **argv)
             
         memset(&desc, 0, sizeof(desc)); 
         strcpy(desc.ld_uuid, argv[1]); 
-        desc.ld_default_stripecount = strtoul(argv[2], NULL, 0); 
-        desc.ld_default_stripesize = strtoul(argv[3], NULL, 0); 
+        desc.ld_default_stripe_count = strtoul(argv[2], NULL, 0); 
+        desc.ld_default_stripe_size = strtoul(argv[3], NULL, 0); 
         desc.ld_pattern = strtoul(argv[4], NULL, 0); 
         desc.ld_tgt_count = argc - 5;
 
@@ -1590,7 +1714,9 @@ int main(int argc, char **argv)
         sigact.sa_flags = SA_RESTART;
         sigaction(SIGINT, &sigact, NULL);
 
-
+        setlinebuf (stdout);
+        SHMEM_SETUP();
+        
         if (argc > 1) {
                 rc = Parser_execarg(argc - 1, argv + 1, cmdlist);
         } else {
index 0118e68..0ecfd42 100644 (file)
@@ -28,7 +28,6 @@
 #include <sys/param.h>
 #include <assert.h>
 
-#include <config.h>
 #ifdef HAVE_LIBREADLINE
 #define READLINE_LIBRARY
 #include <readline/readline.h>