Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
author adilger <adilger>
Fri, 5 Mar 2004 03:52:30 +0000 (03:52 +0000)
committer adilger <adilger>
Fri, 5 Mar 2004 03:52:30 +0000 (03:52 +0000)
b=2733, b=2773, b=2529, b=2730, b=2819, b=2814, b=2822, b=1450, b=2676, b=2681
b=2817, b=2706, b=2816, b=1987, b=2884, b=1191, b=2809, b=2765, b=2805, b=1972

113 files changed:
lnet/include/linux/kp30.h
lnet/klnds/socklnd/socklnd_cb.c
lnet/lnet/api-init.c
lnet/ulnds/procapi.c
lnet/ulnds/socklnd/procapi.c
lnet/utils/debug.c
lustre/ChangeLog
lustre/cobd/cache_obd.c
lustre/include/linux/lustre_cfg.h
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_import.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_log.h
lustre/include/linux/lustre_net.h
lustre/include/linux/lvfs.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_support.h
lustre/kernel_patches/patches/ext3-delete_thread-2.4.20-hp.patch
lustre/kernel_patches/patches/invalidate_show-2.4.20-hp.patch [new file with mode: 0644]
lustre/kernel_patches/series/hp-pnnl-2.4.20
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_plain.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/rw.c
lustre/llite/rw24.c
lustre/llite/rw26.c
lustre/lov/lov_internal.h
lustre/lov/lov_log.c
lustre/lov/lov_obd.c
lustre/lov/lov_pack.c
lustre/lvfs/lvfs_common.c
lustre/lvfs/lvfs_linux.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_internal.h
lustre/mds/mds_log.c
lustre/mds/mds_lov.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_ioctl.c
lustre/obdclass/llog_lvfs.c
lustre/obdclass/llog_obd.c
lustre/obdclass/llog_test.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/lustre_handles.c
lustre/obdecho/echo.c
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_io_24.c
lustre/obdfilter/filter_lvb.c
lustre/osc/lproc_osc.c
lustre/osc/osc_create.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/portals/include/linux/kp30.h
lustre/portals/knals/socknal/socknal_cb.c
lustre/portals/portals/api-init.c
lustre/portals/unals/procapi.c
lustre/portals/utils/debug.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/import.c
lustre/ptlrpc/llog_client.c
lustre/ptlrpc/llog_net.c
lustre/ptlrpc/llog_server.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/service.c
lustre/scripts/branch.sh [new file with mode: 0755]
lustre/scripts/land1.sh
lustre/tests/acceptance-small.sh
lustre/tests/conf-sanity.sh
lustre/tests/recovery-small.sh
lustre/tests/replay-dual.sh
lustre/tests/replay-ost-single.sh
lustre/tests/replay-single.sh
lustre/tests/runslabinfo
lustre/tests/sanity.sh
lustre/tests/sanityN.sh
lustre/tests/test-framework.sh
lustre/utils/llmount.c
lustre/utils/wirecheck.c
lustre/utils/wirehdr.c
lustre/utils/wiretest.c

index 53828de..c080a57 100644 (file)
@@ -21,57 +21,57 @@ extern unsigned int portal_debug;
 extern unsigned int portal_printk;
 extern unsigned int portal_cerror;
 /* Debugging subsystems (32 bits, non-overlapping) */
-#define S_UNDEFINED    (1 << 0)
-#define S_MDC          (1 << 1)
-#define S_MDS          (1 << 2)
-#define S_OSC          (1 << 3)
-#define S_OST          (1 << 4)
-#define S_CLASS        (1 << 5)
-#define S_LOG          (1 << 6)
-#define S_LLITE        (1 << 7)
-#define S_RPC          (1 << 8)
-#define S_MGMT         (1 << 9)
-#define S_PORTALS     (1 << 10)
-#define S_SOCKNAL     (1 << 11)
-#define S_QSWNAL      (1 << 12)
-#define S_PINGER      (1 << 13)
-#define S_FILTER      (1 << 14)
-#define S_PTLBD       (1 << 15)
-#define S_ECHO        (1 << 16)
-#define S_LDLM        (1 << 17)
-#define S_LOV         (1 << 18)
-#define S_GMNAL       (1 << 19)
-#define S_PTLROUTER   (1 << 20)
-#define S_COBD        (1 << 21)
-#define S_IBNAL       (1 << 22)
+#define S_UNDEFINED   0x00000001
+#define S_MDC         0x00000002
+#define S_MDS         0x00000004
+#define S_OSC         0x00000008
+#define S_OST         0x00000010
+#define S_CLASS       0x00000020
+#define S_LOG         0x00000040
+#define S_LLITE       0x00000080
+#define S_RPC         0x00000100
+#define S_MGMT        0x00000200
+#define S_PORTALS     0x00000400
+#define S_SOCKNAL     0x00000800
+#define S_QSWNAL      0x00001000
+#define S_PINGER      0x00002000
+#define S_FILTER      0x00004000
+#define S_PTLBD       0x00008000
+#define S_ECHO        0x00010000
+#define S_LDLM        0x00020000
+#define S_LOV         0x00040000
+#define S_GMNAL       0x00080000
+#define S_PTLROUTER   0x00100000
+#define S_COBD        0x00200000
+#define S_IBNAL       0x00400000
 
 /* If you change these values, please keep portals/utils/debug.c
  * up to date! */
 
 /* Debugging masks (32 bits, non-overlapping) */
-#define D_TRACE     (1 << 0) /* ENTRY/EXIT markers */
-#define D_INODE     (1 << 1)
-#define D_SUPER     (1 << 2)
-#define D_EXT2      (1 << 3) /* anything from ext2_debug */
-#define D_MALLOC    (1 << 4) /* print malloc, free information */
-#define D_CACHE     (1 << 5) /* cache-related items */
-#define D_INFO      (1 << 6) /* general information */
-#define D_IOCTL     (1 << 7) /* ioctl related information */
-#define D_BLOCKS    (1 << 8) /* ext2 block allocation */
-#define D_NET       (1 << 9) /* network communications */
-#define D_WARNING   (1 << 10) /* CWARN(...) == CDEBUG (D_WARNING, ...) */
-#define D_BUFFS     (1 << 11)
-#define D_OTHER     (1 << 12)
-#define D_DENTRY    (1 << 13)
-#define D_PORTALS   (1 << 14) /* ENTRY/EXIT markers */
-#define D_PAGE      (1 << 15) /* bulk page handling */
-#define D_DLMTRACE  (1 << 16)
-#define D_ERROR     (1 << 17) /* CERROR(...) == CDEBUG (D_ERROR, ...) */
-#define D_EMERG     (1 << 18) /* CEMERG(...) == CDEBUG (D_EMERG, ...) */
-#define D_HA        (1 << 19) /* recovery and failover */
-#define D_RPCTRACE  (1 << 20) /* for distributed debugging */
-#define D_VFSTRACE  (1 << 21)
-#define D_READA     (1 << 22) /* read-ahead */
+#define D_TRACE       0x00000001 /* ENTRY/EXIT markers */
+#define D_INODE       0x00000002
+#define D_SUPER       0x00000004
+#define D_EXT2        0x00000008 /* anything from ext2_debug */
+#define D_MALLOC      0x00000010 /* print malloc, free information */
+#define D_CACHE       0x00000020 /* cache-related items */
+#define D_INFO        0x00000040 /* general information */
+#define D_IOCTL       0x00000080 /* ioctl related information */
+#define D_BLOCKS      0x00000100 /* ext2 block allocation */
+#define D_NET         0x00000200 /* network communications */
+#define D_WARNING     0x00000400 /* CWARN(...) == CDEBUG (D_WARNING, ...) */
+#define D_BUFFS       0x00000800
+#define D_OTHER       0x00001000
+#define D_DENTRY      0x00002000
+#define D_PORTALS     0x00004000 /* ENTRY/EXIT markers */
+#define D_PAGE        0x00008000 /* bulk page handling */
+#define D_DLMTRACE    0x00010000
+#define D_ERROR       0x00020000 /* CERROR(...) == CDEBUG (D_ERROR, ...) */
+#define D_EMERG       0x00040000 /* CEMERG(...) == CDEBUG (D_EMERG, ...) */
+#define D_HA          0x00080000 /* recovery and failover */
+#define D_RPCTRACE    0x00100000 /* for distributed debugging */
+#define D_VFSTRACE    0x00200000
+#define D_READA       0x00400000 /* read-ahead */
 
 #ifdef __KERNEL__
 # include <linux/sched.h> /* THREAD_SIZE */
@@ -234,6 +234,12 @@ extern void kportal_assertion_failed(char *expr, char *file, const char *func,
 #define LASSERTF(cond, fmt...) do { } while (0)
 #endif
 
+#ifdef CONFIG_SMP
+#define LASSERT_SPIN_LOCKED(lock) LASSERT(spin_is_locked(lock))
+#else
+#define LASSERT_SPIN_LOCKED(lock) do {} while(0)
+#endif
+
 #ifdef __arch_um__
 #define LBUG_WITH_LOC(file, func, line)                                 \
 do {                                                                    \
index 72bd0b7..c89e20e 100644 (file)
@@ -760,19 +760,19 @@ ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
 }
 
 ksock_conn_t *
-ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
+ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
 {
         struct list_head *tmp;
         ksock_conn_t     *typed = NULL;
         int               tnob  = 0;
         ksock_conn_t     *fallback = NULL;
         int               fnob     = 0;
-        
+
         /* Find the conn with the shortest tx queue */
         list_for_each (tmp, &peer->ksnp_conns) {
                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
                 int           nob = atomic_read(&c->ksnc_tx_nob) +
-                                    c->ksnc_sock->sk->sk_wmem_queued;
+                                        c->ksnc_sock->sk->sk_wmem_queued;
 
                 LASSERT (!c->ksnc_closing);
 
index b811391..e2921ac 100644 (file)
@@ -29,7 +29,7 @@ int ptl_init;
 unsigned int portal_subsystem_debug = ~0 - (S_PORTALS | S_QSWNAL | S_SOCKNAL |
                                             S_GMNAL | S_IBNAL);
 unsigned int portal_debug = (D_WARNING | D_DLMTRACE | D_ERROR | D_EMERG | D_HA |
-                             D_RPCTRACE | D_VFSTRACE);
+                             D_RPCTRACE | D_VFSTRACE | D_MALLOC);
 unsigned int portal_cerror = 1;
 unsigned int portal_printk;
 unsigned int portal_stack;
index bddfe9a..c27f555 100644 (file)
@@ -71,7 +71,7 @@ void procbridge_wakeup_nal(procbridge p)
  *   side, and collects the result
  */
 static int procbridge_forward(nal_t *n, int id, void *args, size_t args_len,
-                             void *ret, ptl_size_t ret_len)
+                             void *ret, size_t ret_len)
 {
     bridge b = (bridge) n->nal_data;
 
index bddfe9a..c27f555 100644 (file)
@@ -71,7 +71,7 @@ void procbridge_wakeup_nal(procbridge p)
  *   side, and collects the result
  */
 static int procbridge_forward(nal_t *n, int id, void *args, size_t args_len,
-                             void *ret, ptl_size_t ret_len)
+                             void *ret, size_t ret_len)
 {
     bridge b = (bridge) n->nal_data;
 
index 2ca4dc3..01e690f 100644 (file)
@@ -66,7 +66,8 @@ static int debug_mask = ~0;
 static const char *portal_debug_subsystems[] =
         {"undefined", "mdc", "mds", "osc", "ost", "class", "log", "llite",
          "rpc", "mgmt", "portals", "socknal", "qswnal", "pinger", "filter",
-         "ptlbd", "echo", "ldlm", "lov", "gmnal", "router", "cobd", NULL};
+         "ptlbd", "echo", "ldlm", "lov", "gmnal", "router", "cobd", "ibnal",
+         NULL};
 static const char *portal_debug_masks[] =
         {"trace", "inode", "super", "ext2", "malloc", "cache", "info", "ioctl",
          "blocks", "net", "warning", "buffs", "other", "dentry", "portals",
index 0a5e0de..274a565 100644 (file)
@@ -1,4 +1,4 @@
-tbd         Cluster File Systems, Inc. <info@clusterfs.com>
+2004-03-04  Cluster File Systems, Inc. <info@clusterfs.com>
        * version 1.2.0
        * bug fixes
        - account for cache space usage on clients to avoid data loss (974)
@@ -19,6 +19,7 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        - ptlrpc cleanup bug (2710)
        - mds timeout on local locks (2588)
        - namespace lock held during RPCs (2431)
+       - handle interrupted sync write properly (2503)
        - don't try to handle a message that hasn't been replied to (2699)
        - client assert failure during cleanup after abort recovery (2701)
        - leak mdc device after failed mount (2712)
@@ -37,8 +38,27 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        - don't delete objects on OST if given a bogus objid from MDS (2751)
        - handle large client PAGE_SIZE readdir on small PAGE_SIZE MDS (2777)
        - if rq_no_resend, then timeout request after recovery (2432)
+       - fix MDS llog_logid record size, 64-bit array alignment (2733)
+       - don't call usermode_helper from ptlrpcd, DEFAULT upcall (2773)
+       - put magic in mount.lustre data, check for bad/NULL mount data (2529)
+       - MDS recovery shouldn't delete objects that it has given out (2730)
+       - if enqueue arrives after completion, don't clobber LVB (2819)
+       - don't unlock pages twice when trigger_group_io returns error (2814)
+       - don't deref NULL rq_repmsg if ldlm_handle_enqueue failed (2822)
+       - don't write pages to disk if there was an error (1450)
+       - don't ping imports that have recovery disabled (2676)
+       - take buffered bytes into account when balancing socknal conn (2817)
+       - hold a DLM lock over readdir always, use truncate_inode_pages (2706)
+       - reconnect unlink llog connection after MDS reconnects to OST (2816)
+       - remove little-endian swabbing of llog records (1987)
+       - set/limit i_blksize to LL_MAX_BLKSIZE on client (2884)
+       - retry reposting request buffers if they fail (1191)
+       - grow extent at grant time to avoid granting a revoked lock (2809)
+       - lock revoke doesn't evict page if covered by a second lock (2765)
+       - disable VM readahead to avoid reading outside lock extents (2805)
        * miscellania
        - return LL_SUPER_MAGIC from statfs for the filesystem type (1972)
+       - updated kernel patches for hp-2.4.20 kernel (2681)
 
 2004-02-07  Cluster File Systems, Inc. <info@clusterfs.com>
        * version 1.0.4
index 5c978bf..e725ecf 100644 (file)
@@ -188,10 +188,9 @@ static int cobd_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
 static int cobd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                          int objcount, struct obd_ioobj *obj,
                          int niocount, struct niobuf_local *local,
-                         struct obd_trans_info *oti)
+                         struct obd_trans_info *oti, int rc)
 {
         struct obd_export *cobd_exp;
-        int rc;
 
         if (exp->exp_obd == NULL)
                 return -EINVAL;
@@ -200,7 +199,8 @@ static int cobd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                 return -EOPNOTSUPP;
 
         cobd_exp = exp->exp_obd->u.cobd.cobd_target_exp;
-        rc = obd_commitrw(cmd, cobd_exp, oa, objcount, obj,niocount,local,oti);
+        rc = obd_commitrw(cmd, cobd_exp, oa, objcount, obj, niocount, local,
+                          oti, rc);
         return rc;
 }
 
@@ -212,7 +212,7 @@ static int cobd_brw(int cmd, struct obd_export *exp, struct obdo *oa,
         struct cache_obd  *cobd;
 
         if (obd == NULL) {
-                CERROR("invalid client cookie "LPX64"\n", 
+                CERROR("invalid client cookie "LPX64"\n",
                        exp->exp_handle.h_cookie);
                 return -EINVAL;
         }
index a9a278f..d8c84be 100644 (file)
@@ -248,6 +248,7 @@ static inline void lustre_cfg_freedata(char *buf, int len)
 
 /* Passed by mount */
 struct lustre_mount_data {
+        uint32_t lmd_magic;
         uint32_t lmd_version;
         uint64_t lmd_local_nid;
         uint64_t lmd_server_nid;
index e37dcb1..b8515a3 100644 (file)
@@ -8,7 +8,7 @@
 
 #ifdef __KERNEL__
 # include <linux/proc_fs.h>
-#endif 
+#endif
 
 #include <linux/lustre_lib.h>
 #include <linux/lustre_net.h>
@@ -91,15 +91,13 @@ typedef enum {
 #define LDLM_CB_BLOCKING    1
 #define LDLM_CB_CANCELING   2
 
-#define L2B(c) (1 << c)
-
 /* compatibility matrix */
-#define LCK_COMPAT_EX  L2B(LCK_NL)
-#define LCK_COMPAT_PW  (LCK_COMPAT_EX | L2B(LCK_CR))
-#define LCK_COMPAT_PR  (LCK_COMPAT_PW | L2B(LCK_PR))
-#define LCK_COMPAT_CW  (LCK_COMPAT_PW | L2B(LCK_CW))
-#define LCK_COMPAT_CR  (LCK_COMPAT_CW | L2B(LCK_PR) | L2B(LCK_PW))
-#define LCK_COMPAT_NL  (LCK_COMPAT_CR | L2B(LCK_EX))
+#define LCK_COMPAT_EX  LCK_NL
+#define LCK_COMPAT_PW  (LCK_COMPAT_EX | LCK_CR)
+#define LCK_COMPAT_PR  (LCK_COMPAT_PW | LCK_PR)
+#define LCK_COMPAT_CW  (LCK_COMPAT_PW | LCK_CW)
+#define LCK_COMPAT_CR  (LCK_COMPAT_CW | LCK_PR | LCK_PW)
+#define LCK_COMPAT_NL  (LCK_COMPAT_CR | LCK_EX)
 
 static ldlm_mode_t lck_compat_array[] = {
         [LCK_EX] LCK_COMPAT_EX,
@@ -110,12 +108,14 @@ static ldlm_mode_t lck_compat_array[] = {
         [LCK_NL] LCK_COMPAT_NL
 };
 
-static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
+static inline void lockmode_verify(ldlm_mode_t mode)
 {
-       LASSERT(exist >= LCK_EX && exist <= LCK_NL);
-       LASSERT(new >= LCK_EX && new <= LCK_NL);
+       LASSERT(mode >= LCK_EX && mode <= LCK_NL);
+}
 
-       return (lck_compat_array[exist] & L2B(new));
+static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
+{
+       return (lck_compat_array[exist] & new);
 }
 
 /*
@@ -133,8 +133,8 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new)
    -
 */
 
-struct ldlm_lock; 
-struct ldlm_resource; 
+struct ldlm_lock;
+struct ldlm_resource;
 struct ldlm_namespace;
 
 typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
@@ -155,7 +155,7 @@ struct ldlm_namespace {
         struct list_head       ns_root_list; /* all root resources in ns */
         struct lustre_lock     ns_lock; /* protects hash, refcount, list */
         struct list_head       ns_list_chain; /* position in global NS list */
-        /* 
+        /*
         struct proc_dir_entry *ns_proc_dir;
         */
 
@@ -200,8 +200,6 @@ struct ldlm_lock {
         struct list_head      l_lru;
         struct list_head      l_res_link; // position in one of three res lists
         struct list_head      l_export_chain; // per-export chain of locks
-        struct list_head      l_pending_chain; // locks with callbacks pending
-        unsigned long         l_callback_timeout;
 
         ldlm_mode_t           l_req_mode;
         ldlm_mode_t           l_granted_mode;
@@ -209,22 +207,13 @@ struct ldlm_lock {
         ldlm_completion_callback l_completion_ast;
         ldlm_blocking_callback   l_blocking_ast;
         ldlm_glimpse_callback    l_glimpse_ast;
-        void                    *l_ast_data;
 
         struct obd_export    *l_export;
-        /* XXX phil can fix this, I'm sure */
         struct obd_export    *l_conn_export;
-//        struct lustre_handle *l_connh;
         __u32                 l_flags;
         struct lustre_handle  l_remote_handle;
         ldlm_policy_data_t    l_policy_data;
 
-        /* This LVB is used only on the client side, as temporary storage for
-         * a lock value block received during an enqueue */
-        __u32                 l_lvb_len;
-        void                 *l_lvb_data;
-        void                 *l_lvb_swabber;
-
         __u32                 l_readers;
         __u32                 l_writers;
         __u8                  l_destroyed;
@@ -234,9 +223,20 @@ struct ldlm_lock {
          * on this waitq to learn when it becomes granted. */
         wait_queue_head_t     l_waitq;
         struct timeval        l_enqueued_time;
-        unsigned long         l_last_used; /* jiffies */
-};
 
+        unsigned long         l_last_used;      /* jiffies */
+        struct ldlm_extent    l_req_extent;
+
+        /* Client-side-only members */
+        __u32                 l_lvb_len;        /* temporary storage for */
+        void                 *l_lvb_data;       /* an LVB received during */
+        void                 *l_lvb_swabber;    /* an enqueue */
+        void                 *l_ast_data;
+
+        /* Server-side-only members */
+        struct list_head      l_pending_chain;  /* callbacks pending */
+        unsigned long         l_callback_timeout;
+};
 
 #define LDLM_PLAIN       10
 #define LDLM_EXTENT      11
@@ -308,7 +308,8 @@ do {                                                                          \
                 CDEBUG(level, "### " format                                   \
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\
-                       "] flags: %x remote: "LPX64" expref: %d\n" , ## a,     \
+                       "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64     \
+                       " expref: %d\n" , ## a,                                \
                        lock->l_resource->lr_namespace->ns_name, lock,         \
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),   \
                        lock->l_readers, lock->l_writers,                      \
@@ -320,6 +321,7 @@ do {                                                                          \
                        ldlm_typename[lock->l_resource->lr_type],              \
                        lock->l_policy_data.l_extent.start,                    \
                        lock->l_policy_data.l_extent.end,                      \
+                       lock->l_req_extent.start, lock->l_req_extent.end,      \
                        lock->l_flags, lock->l_remote_handle.cookie,           \
                        lock->l_export ?                                       \
                        atomic_read(&lock->l_export->exp_refcount) : -99);     \
index 218807c..9be781f 100644 (file)
@@ -26,7 +26,7 @@ struct mds_export_data {
 struct osc_creator {
         spinlock_t              oscc_lock;
         struct list_head        oscc_list;
-        struct obd_export      *oscc_exp;
+        struct obd_device       *oscc_obd;
         obd_id                  oscc_last_id;//last available pre-created object
         obd_id                  oscc_next_id;// what object id to give out next
         int                     oscc_initial_create_count;
@@ -38,10 +38,6 @@ struct osc_creator {
         wait_queue_head_t       oscc_waitq; /* creating procs wait on this */
 };
 
-struct osc_export_data {
-        struct osc_creator      oed_oscc;
-};
-
 struct ldlm_export_data {
         struct list_head       led_held_locks; /* protected by namespace lock */
 };
@@ -83,14 +79,12 @@ struct obd_export {
                 struct mds_export_data    eu_mds_data;
                 struct filter_export_data eu_filter_data;
                 struct ec_export_data     eu_ec_data;
-                struct osc_export_data    eu_osc_data;
         } u;
 };
 
 #define exp_mds_data    u.eu_mds_data
 #define exp_lov_data    u.eu_lov_data
 #define exp_filter_data u.eu_filter_data
-#define exp_osc_data    u.eu_osc_data
 #define exp_ec_data     u.eu_ec_data
 
 extern struct obd_export *class_conn2export(struct lustre_handle *conn);
index 4dfc81d..808ff44 100644 (file)
@@ -19,6 +19,7 @@ void ptlrpc_free_committed(struct obd_import *imp);
 void ptlrpc_wake_delayed(struct obd_import *imp);
 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid);
 int ptlrpc_set_import_active(struct obd_import *imp, int active);
+void ptlrpc_invalidate_import(struct obd_import *imp);
 void ptlrpc_fail_import(struct obd_import *imp, int generation);
 void ptlrpc_fail_export(struct obd_export *exp);
 
index b2dbd86..c0452ea 100644 (file)
@@ -458,7 +458,10 @@ extern void lustre_swab_ost_last_id(obd_id *id);
 
 struct ost_lvb {
         __u64 lvb_size;
-        __u64 lvb_time;
+        __u64 lvb_mtime;
+        __u64 lvb_atime;
+        __u64 lvb_ctime;
+        __u64 lvb_blocks;
 };
 
 extern void lustre_swab_ost_lvb(struct ost_lvb *);
@@ -716,11 +719,11 @@ extern void lustre_swab_ldlm_res_id (struct ldlm_res_id *id);
 /* lock types */
 typedef enum {
         LCK_EX = 1,
-        LCK_PW,
-        LCK_PR,
-        LCK_CW,
-        LCK_CR,
-        LCK_NL
+        LCK_PW = 2,
+        LCK_PR = 4,
+        LCK_CW = 8,
+        LCK_CR = 16,
+        LCK_NL = 32
 } ldlm_mode_t;
 
 struct ldlm_extent {
@@ -759,7 +762,7 @@ struct ldlm_resource_desc {
         __u32 lr_type;
         __u32 lr_padding;
         struct ldlm_res_id lr_name;
-} __attribute__((packed));
+};
 
 extern void lustre_swab_ldlm_resource_desc (struct ldlm_resource_desc *r);
 
@@ -768,7 +771,7 @@ struct ldlm_lock_desc {
         ldlm_mode_t l_req_mode;
         ldlm_mode_t l_granted_mode;
         ldlm_policy_data_t l_policy_data;
-} __attribute__((packed));
+};
 
 extern void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l);
 
@@ -778,12 +781,13 @@ struct ldlm_request {
         struct ldlm_lock_desc lock_desc;
         struct lustre_handle lock_handle1;
         struct lustre_handle lock_handle2;
-} __attribute__((packed));
+};
 
 extern void lustre_swab_ldlm_request (struct ldlm_request *rq);
 
 struct ldlm_reply {
         __u32 lock_flags;
+        __u32 lock_padding;
         struct ldlm_lock_desc lock_desc;
         struct lustre_handle lock_handle;
         __u64  lock_policy_res1;
@@ -863,6 +867,13 @@ struct llog_logid {
         __u32                   lgl_ogen;
 } __attribute__((packed));
 
+/* Records written to the CATALOGS list */
+#define CATLIST "CATALOGS"
+struct llog_catid {
+        struct llog_logid       lci_logid;
+        __u32                   lci_padding[3];
+} __attribute__((packed));
+
 /* Log data record types - there is no specific reason that these need to
  * be related to the RPC opcodes, but no reason not to (may be handy later?)
  */
@@ -874,7 +885,7 @@ typedef enum {
         PTL_CFG_REC      = 0x10630000,
         LLOG_GEN_REC     = 0x10640000,
         LLOG_HDR_MAGIC   = 0x10645539,
-        LLOG_LOGID_MAGIC = 0x1064553a,
+        LLOG_LOGID_MAGIC = 0x1064553b,
 } llog_op_type;
 
 /* Log record header - stored in little endian order.
@@ -896,7 +907,7 @@ struct llog_rec_tail {
 struct llog_logid_rec {
         struct llog_rec_hdr     lid_hdr;
         struct llog_logid       lid_id;
-        __u32                   padding;
+        __u32                   padding[5];
         struct llog_rec_tail    lid_tail;
 } __attribute__((packed));
 
index c940ac1..14943f8 100644 (file)
@@ -39,6 +39,11 @@ static inline char * ptlrpc_import_state_name(enum lustre_imp_state state)
         return import_state_names[state];
 }
 
+enum obd_import_event {
+        IMP_EVENT_DISCON     = 0x808001,
+        IMP_EVENT_INVALIDATE = 0x808002,
+        IMP_EVENT_ACTIVE     = 0x808003,
+};
 
 struct obd_import {
         struct portals_handle     imp_handle;
@@ -75,7 +80,8 @@ struct obd_import {
         /* flags */
         int                       imp_invalid:1, imp_replayable:1,
                                   imp_dlm_fake:1, imp_server_timeout:1,
-                                  imp_initial_recov:1;
+                                  imp_initial_recov:1, imp_force_verify:1,
+                                  imp_pingable:1;
         __u32                     imp_connect_op;
 };
 
index 9013e8a..4eef3be 100644 (file)
@@ -674,3 +674,23 @@ do {                                                                           \
 })
 
 #endif /* _LUSTRE_LIB_H */
+
+#define LMD_MAGIC 0xbdacbdac
+
+#define lmd_bad_magic(LMDP)                                             \
+({                                                                      \
+        struct lustre_mount_data *_lmd__ = (LMDP);                      \
+        int _ret__ = 0;                                                 \
+        if (!_lmd__) {                                                  \
+                CERROR("Missing mount data: "                           \
+                       "check that /sbin/mount.lustre is installed.\n");\
+                _ret__ = 1;                                             \
+        } else if (_lmd__->lmd_magic != LMD_MAGIC) {                    \
+                CERROR("Invalid mount data (%#x != %#x): "              \
+                       "check that /sbin/mount.lustre is installed\n",  \
+                       _lmd__->lmd_magic, LMD_MAGIC);                   \
+                _ret__ = 1;                                             \
+        }                                                               \
+        _ret__;                                                         \
+})
+
index 1ea4740..1d0ff9f 100644 (file)
@@ -121,7 +121,7 @@ int llog_obd_origin_add(struct llog_ctxt *ctxt,
 
 int llog_cat_initialize(struct obd_device *obd, int count);
 int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
-                  int count, struct llog_logid *logid);
+                  int count, struct llog_catid *logid);
 
 int obd_llog_finish(struct obd_device *obd, int count);
 
@@ -134,7 +134,8 @@ int llog_catlog_list(struct obd_device *obd, int count,
 int llog_initiator_connect(struct llog_ctxt *ctxt);
 int llog_receptor_accept(struct llog_ctxt *ctxt, struct obd_import *imp);
 int llog_origin_connect(struct llog_ctxt *ctxt, int count,
-                        struct llog_logid *logid, struct llog_gen *gen);
+                        struct llog_logid *logid, struct llog_gen *gen,
+                        struct obd_uuid *uuid);
 int llog_handle_connect(struct ptlrpc_request *req);
 
 /* recov_thread.c */
@@ -143,7 +144,8 @@ int llog_obd_repl_cancel(struct llog_ctxt *ctxt,
                          struct llog_cookie *cookies, int flags);
 int llog_obd_repl_sync(struct llog_ctxt *ctxt, struct obd_export *exp);
 int llog_repl_connect(struct llog_ctxt *ctxt, int count,
-                      struct llog_logid *logid, struct llog_gen *gen);
+                      struct llog_logid *logid, struct llog_gen *gen,
+                      struct obd_uuid *uuid);
 
 struct llog_operations {
         int (*lop_write_rec)(struct llog_handle *loghandle,
@@ -169,15 +171,15 @@ struct llog_operations {
         int (*lop_cancel)(struct llog_ctxt *ctxt, struct lov_stripe_md *lsm,
                           int count, struct llog_cookie *cookies, int flags);
         int (*lop_connect)(struct llog_ctxt *ctxt, int count,
-                           struct llog_logid *logid, struct llog_gen *gen);
+                           struct llog_logid *logid, struct llog_gen *gen,
+                           struct obd_uuid *uuid);
         /* XXX add 2 more: commit callbacks and llog recovery functions */
 };
 
 /* llog_lvfs.c */
-int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
-                      char *name, int count, struct llog_logid *idarray);
 extern struct llog_operations llog_lvfs_ops;
-
+int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
+                      char *name, int count, struct llog_catid *idarray);
 
 struct llog_ctxt {
         int                      loc_idx; /* my index the obd array of ctxt's */
@@ -222,7 +224,7 @@ static inline int llog_obd2ops(struct llog_ctxt *ctxt,
 {
        if (ctxt == NULL)
                 return -ENOTCONN;
-        
+
         *lop = ctxt->loc_logops;
         if (*lop == NULL)
                 return -EOPNOTSUPP;
@@ -269,10 +271,10 @@ static inline int llog_write_rec(struct llog_handle *handle,
                 RETURN(-EOPNOTSUPP);
 
         if (buf)
-                buflen = le32_to_cpu(rec->lrh_len) + sizeof(struct llog_rec_hdr)
+                buflen = rec->lrh_len + sizeof(struct llog_rec_hdr)
                                 + sizeof(struct llog_rec_tail);
         else
-                buflen = le32_to_cpu(rec->lrh_len);
+                buflen = rec->lrh_len;
         LASSERT(size_round(buflen) == buflen);
 
         rc = lop->lop_write_rec(handle, rec, logcookies, numcookies, buf, idx);
@@ -368,7 +370,8 @@ static inline int llog_create(struct llog_ctxt *ctxt, struct llog_handle **res,
 }
 
 static inline int llog_connect(struct llog_ctxt *ctxt, int count,
-                               struct llog_logid *logid, struct llog_gen *gen)
+                               struct llog_logid *logid, struct llog_gen *gen,
+                               struct obd_uuid *uuid)
 {
         struct llog_operations *lop;
         int rc;
@@ -380,7 +383,7 @@ static inline int llog_connect(struct llog_ctxt *ctxt, int count,
         if (lop->lop_connect == NULL)
                 RETURN(-EOPNOTSUPP);
 
-        rc = lop->lop_connect(ctxt, count, logid, gen);
+        rc = lop->lop_connect(ctxt, count, logid, gen, uuid);
         RETURN(rc);
 }
 
index 8b34ada..13ce57e 100644 (file)
@@ -243,8 +243,9 @@ struct ptlrpc_request {
         spinlock_t rq_lock;
         /* client-side flags */
         unsigned int rq_intr:1, rq_replied:1, rq_err:1,
-            rq_timedout:1, rq_resend:1, rq_restart:1, rq_replay:1,
-            rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1;
+                rq_timedout:1, rq_resend:1, rq_restart:1, rq_replay:1,
+                rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1,
+                rq_no_delay:1;
         int rq_phase;
         /* client-side refcount for SENT race */
         atomic_t rq_refcount;
@@ -404,7 +405,6 @@ struct ptlrpc_request_buffer_desc {
         struct ptlrpc_srv_ni  *rqbd_srv_ni;
         ptl_handle_md_t        rqbd_md_h;
         int                    rqbd_refcount;
-        int                    rqbd_eventcount;
         char                  *rqbd_buffer;
         struct ptlrpc_cb_id    rqbd_cbid;
         struct ptlrpc_request  rqbd_req;
@@ -425,7 +425,7 @@ struct ptlrpc_srv_ni {
         /* Interface-specific service state */
         struct ptlrpc_service  *sni_service;    /* owning service */
         struct ptlrpc_ni       *sni_ni;         /* network interface */
-        struct list_head        sni_rqbds;      /* all the request buffers */
+        struct list_head        sni_active_rqbds;   /* req buffers receiving */
         struct list_head        sni_active_replies; /* all the active replies */
         int                     sni_nrqbd_receiving; /* # posted request buffers */
 };
@@ -440,6 +440,7 @@ struct ptlrpc_service {
         int              srv_nthreads;          /* # running threads */
         int              srv_n_difficult_replies; /* # 'difficult' replies */
         int              srv_n_active_reqs;     /* # reqs being served */
+        int              srv_rqbd_timeout;      /* timeout before re-posting reqs */
         
         __u32 srv_req_portal;
         __u32 srv_rep_portal;
@@ -447,6 +448,8 @@ struct ptlrpc_service {
         int               srv_n_queued_reqs;    /* # reqs waiting to be served */
         struct list_head  srv_request_queue;    /* reqs waiting for service */
 
+        struct list_head  srv_idle_rqbds;       /* request buffers to be reposted */
+
         atomic_t          srv_outstanding_replies;
         struct list_head  srv_reply_queue;      /* replies waiting for service */
 
@@ -509,7 +512,7 @@ int ptlrpc_reply(struct ptlrpc_request *req);
 int ptlrpc_error(struct ptlrpc_request *req);
 void ptlrpc_resend_req(struct ptlrpc_request *request);
 int ptl_send_rpc(struct ptlrpc_request *request);
-void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
+int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
 
 /* ptlrpc/client.c */
 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
index bf27a40..b18769f 100644 (file)
@@ -56,8 +56,8 @@ void pop_ctxt(struct obd_run_ctxt *saved, struct obd_run_ctxt *new_ctx,
 
 #ifdef __KERNEL__
 
-struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode);
-struct dentry *simple_mknod(struct dentry *dir, char *name, int mode);
+struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode, int fix);
+struct dentry *simple_mknod(struct dentry *dir, char *name, int mode, int fix);
 int lustre_fread(struct file *file, void *buf, int len, loff_t *off);
 int lustre_fwrite(struct file *file, const void *buf, int len, loff_t *off);
 int lustre_fsync(struct file *file);
index 365708b..24ee1c2 100644 (file)
@@ -76,7 +76,8 @@ struct lov_stripe_md {
         /* Public members. */
         __u64 lsm_object_id;        /* lov object id */
         __u64 lsm_object_gr;        /* lov object id */
-        __u64 lsm_maxbytes;
+        __u64 lsm_maxbytes;         /* maximum possible file size */
+        unsigned long lsm_xfersize; /* optimal transfer size */
 
         /* LOV-private members start here -- only for use in lov/. */
         __u32 lsm_magic;
@@ -174,6 +175,7 @@ struct filter_obd {
 
         struct list_head     fo_export_list;
         int                  fo_subdir_count;
+
         obd_size             fo_tot_dirty;      /* protected by obd_osfs_lock */
         obd_size             fo_tot_granted;    /* all values in bytes */
         obd_size             fo_tot_pending;
@@ -248,6 +250,7 @@ struct client_obd {
 
         struct mdc_rpc_lock     *cl_rpc_lock;
         struct mdc_rpc_lock     *cl_setattr_lock;
+        struct osc_creator      cl_oscc;
 };
 
 /* Like a client, with some hangers-on.  Keep mc_client_obd first so that we
@@ -614,7 +617,7 @@ struct obd_ops {
         int (*o_commitrw)(int cmd, struct obd_export *exp, struct obdo *oa,
                           int objcount, struct obd_ioobj *obj,
                           int niocount, struct niobuf_local *local,
-                          struct obd_trans_info *oti);
+                          struct obd_trans_info *oti, int rc);
         int (*o_enqueue)(struct obd_export *, struct lov_stripe_md *,
                          __u32 type, ldlm_policy_data_t *, __u32 mode,
                          int *flags, void *bl_cb, void *cp_cb, void *gl_cb,
@@ -638,7 +641,7 @@ struct obd_ops {
 
         /* llog related obd_methods */
         int (*o_llog_init)(struct obd_device *obd, struct obd_device *disk_obd,
-                           int count, struct llog_logid *logid);
+                           int count, struct llog_catid *logid);
         int (*o_llog_finish)(struct obd_device *obd, int count);
 
         /* metadata-only methods */
@@ -646,7 +649,8 @@ struct obd_ops {
                      struct obd_client_handle *, int flag);
         int (*o_unpin)(struct obd_export *, struct obd_client_handle *, int);
 
-        int (*o_invalidate_import)(struct obd_device *, struct obd_import *);
+        int (*o_import_event)(struct obd_device *, struct obd_import *,
+                              enum obd_import_event);
 
         int (*o_notify)(struct obd_device *obd, struct obd_device *watched,
                         int active);
index 90a521b..6c97a05 100644 (file)
@@ -828,16 +828,15 @@ static inline int obd_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
 static inline int obd_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                                int objcount, struct obd_ioobj *obj,
                                int niocount, struct niobuf_local *local,
-                               struct obd_trans_info *oti)
+                               struct obd_trans_info *oti, int rc)
 {
-        int rc;
         ENTRY;
 
         OBD_CHECK_OP(exp->exp_obd, commitrw, -EOPNOTSUPP);
         OBD_COUNTER_INCREMENT(exp->exp_obd, commitrw);
 
         rc = OBP(exp->exp_obd, commitrw)(cmd, exp, oa, objcount, obj, niocount,
-                                         local, oti);
+                                         local, oti, rc);
         RETURN(rc);
 }
 
@@ -970,12 +969,14 @@ static inline int obd_unpin(struct obd_export *exp,
         return(rc);
 }
 
-static inline void obd_invalidate_import(struct obd_device *obd,
-                                         struct obd_import *imp)
+
+static inline void obd_import_event(struct obd_device *obd,
+                                    struct obd_import *imp,
+                                    enum obd_import_event event)
 {
-        if (obd->obd_set_up && OBP(obd, invalidate_import)) {
-                OBD_COUNTER_INCREMENT(obd, invalidate_import);
-                OBP(obd, invalidate_import)(obd, imp);
+        if (obd->obd_set_up && OBP(obd, import_event)) {
+                OBD_COUNTER_INCREMENT(obd, import_event);
+                OBP(obd, import_event)(obd, imp, event);
         }
 }
 
index 7b232ea..41fb301 100644 (file)
@@ -122,6 +122,7 @@ extern unsigned int obd_sync_filter;
 
 #define OBD_FAIL_PTLRPC                  0x500
 #define OBD_FAIL_PTLRPC_ACK              0x501
+#define OBD_FAIL_PTLRPC_RQBD             0x502
 
 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601
index f91b8ff..d722b68 100644 (file)
@@ -5,10 +5,10 @@
  include/linux/ext3_fs_sb.h |   10 +
  5 files changed, 365 insertions(+)
 
-Index: linux-2.4.20-hp4-pnnl13/fs/ext3/super.c
+Index: linux/fs/ext3/super.c
 ===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/ext3/super.c       2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/fs/ext3/super.c    2004-01-13 17:04:38.000000000 +0300
+--- linux.orig/fs/ext3/super.c Mon Feb  2 20:57:35 2004
++++ linux/fs/ext3/super.c      Mon Feb  2 20:58:05 2004
 @@ -400,6 +400,221 @@
        }
  }
@@ -298,10 +298,10 @@ Index: linux-2.4.20-hp4-pnnl13/fs/ext3/super.c
        if (sbi->s_mount_opt & EXT3_MOUNT_ABORT)
                ext3_abort(sb, __FUNCTION__, "Abort forced by user");
  
-Index: linux-2.4.20-hp4-pnnl13/fs/ext3/inode.c
+Index: linux/fs/ext3/inode.c
 ===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/ext3/inode.c       2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/fs/ext3/inode.c    2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/fs/ext3/inode.c Mon Feb  2 20:57:35 2004
++++ linux/fs/ext3/inode.c      Mon Feb  2 20:58:05 2004
 @@ -2500,6 +2500,118 @@
        return err;
  }
@@ -421,10 +421,10 @@ Index: linux-2.4.20-hp4-pnnl13/fs/ext3/inode.c
  /* 
   * On success, We end up with an outstanding reference count against
   * iloc->bh.  This _must_ be cleaned up later. 
-Index: linux-2.4.20-hp4-pnnl13/fs/ext3/file.c
+Index: linux/fs/ext3/file.c
 ===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/ext3/file.c        2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/fs/ext3/file.c     2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/fs/ext3/file.c  Mon Feb  2 20:57:34 2004
++++ linux/fs/ext3/file.c       Mon Feb  2 20:58:05 2004
 @@ -125,7 +125,11 @@
  };
  
@@ -437,23 +437,10 @@ Index: linux-2.4.20-hp4-pnnl13/fs/ext3/file.c
        setattr:        ext3_setattr,           /* BKL held */
        setxattr:       ext3_setxattr,          /* BKL held */
        getxattr:       ext3_getxattr,          /* BKL held */
-Index: linux-2.4.20-hp4-pnnl13/fs/buffer.c
+Index: linux/include/linux/ext3_fs.h
 ===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/fs/buffer.c   2003-09-13 15:19:26.000000000 +0400
-+++ linux-2.4.20-hp4-pnnl13/fs/buffer.c        2004-01-13 17:01:40.000000000 +0300
-@@ -376,6 +376,8 @@
-       if (sb->s_op && sb->s_op->sync_fs)
-               sb->s_op->sync_fs(sb);
-       unlock_super(sb);
-+      if (sb->s_op && sb->s_op->sync_fs)
-+              sb->s_op->sync_fs(sb);
-       unlock_kernel();
-       return sync_buffers(dev, 1);
-Index: linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs.h
-===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/include/linux/ext3_fs.h       2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs.h    2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/include/linux/ext3_fs.h Mon Feb  2 20:57:35 2004
++++ linux/include/linux/ext3_fs.h      Mon Feb  2 20:58:05 2004
 @@ -193,6 +193,7 @@
   */
  #define EXT3_STATE_JDATA              0x00000001 /* journaled data exists */
@@ -480,10 +467,10 @@ Index: linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs.h
  
  /* ioctl.c */
  extern int ext3_ioctl (struct inode *, struct file *, unsigned int,
-Index: linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs_sb.h
+Index: linux/include/linux/ext3_fs_sb.h
 ===================================================================
---- linux-2.4.20-hp4-pnnl13.orig/include/linux/ext3_fs_sb.h    2004-01-12 19:22:32.000000000 +0300
-+++ linux-2.4.20-hp4-pnnl13/include/linux/ext3_fs_sb.h 2004-01-13 17:01:40.000000000 +0300
+--- linux.orig/include/linux/ext3_fs_sb.h      Mon Feb  2 20:57:35 2004
++++ linux/include/linux/ext3_fs_sb.h   Mon Feb  2 20:58:05 2004
 @@ -29,6 +29,8 @@
  
  #define EXT3_MAX_GROUP_LOADED 8
diff --git a/lustre/kernel_patches/patches/invalidate_show-2.4.20-hp.patch b/lustre/kernel_patches/patches/invalidate_show-2.4.20-hp.patch
new file mode 100644 (file)
index 0000000..fad6233
--- /dev/null
@@ -0,0 +1,123 @@
+
+
+
+ fs/inode.c         |   21 ++++++++++++++-------
+ fs/smbfs/inode.c   |    2 +-
+ fs/super.c         |    4 ++--
+ include/linux/fs.h |    2 +-
+ 4 files changed, 18 insertions(+), 11 deletions(-)
+
+Index: linux/fs/inode.c
+===================================================================
+--- linux.orig/fs/inode.c      Mon Feb  2 21:24:21 2004
++++ linux/fs/inode.c   Mon Feb  2 21:27:53 2004
+@@ -632,7 +632,8 @@
+ /*
+  * Invalidate all inodes for a device.
+  */
+-static int invalidate_list(struct list_head *head, struct super_block * sb, struct list_head * dispose)
++static int invalidate_list(struct list_head *head, struct super_block * sb,
++                         struct list_head * dispose, int show)
+ {
+       struct list_head *next;
+       int busy = 0, count = 0;
+@@ -657,6 +658,11 @@
+                       count++;
+                       continue;
+               }
++              if (show)
++                      printk(KERN_ERR
++                             "inode busy: dev %s:%lu (%p) mode %o count %u\n",
++                             kdevname(sb->s_dev), inode->i_ino, inode,
++                             inode->i_mode, atomic_read(&inode->i_count));
+               busy = 1;
+       }
+       /* only unused inodes may be cached with i_count zero */
+@@ -675,23 +681,24 @@
+ /**
+  *    invalidate_inodes       - discard the inodes on a device
+  *    @sb: superblock
++ *    @show: whether we should display any busy inodes found
+  *
+  *    Discard all of the inodes for a given superblock. If the discard
+  *    fails because there are busy inodes then a non zero value is returned.
+  *    If the discard is successful all the inodes have been discarded.
+  */
+  
+-int invalidate_inodes(struct super_block * sb)
++int invalidate_inodes(struct super_block * sb, int show)
+ {
+       int busy;
+       LIST_HEAD(throw_away);
+       spin_lock(&inode_lock);
+-      busy = invalidate_list(&inode_in_use, sb, &throw_away);
+-      busy |= invalidate_list(&inode_unused, sb, &throw_away);
+-      busy |= invalidate_list(&inode_unused_pagecache, sb, &throw_away);
+-      busy |= invalidate_list(&sb->s_dirty, sb, &throw_away);
+-      busy |= invalidate_list(&sb->s_locked_inodes, sb, &throw_away);
++      busy = invalidate_list(&inode_in_use, sb, &throw_away, show);
++      busy |= invalidate_list(&inode_unused, sb, &throw_away, show);
++      busy |= invalidate_list(&inode_unused_pagecache, sb, &throw_away, show);
++      busy |= invalidate_list(&sb->s_dirty, sb, &throw_away, show);
++      busy |= invalidate_list(&sb->s_locked_inodes, sb, &throw_away, show);
+       spin_unlock(&inode_lock);
+       dispose_list(&throw_away);
+@@ -717,7 +724,7 @@
+                * hold).
+                */
+               shrink_dcache_sb(sb);
+-              res = invalidate_inodes(sb);
++              res = invalidate_inodes(sb, 0);
+               drop_super(sb);
+       }
+       invalidate_buffers(dev);
+Index: linux/fs/super.c
+===================================================================
+--- linux.orig/fs/super.c      Mon Feb  2 21:24:21 2004
++++ linux/fs/super.c   Mon Feb  2 21:26:08 2004
+@@ -844,7 +844,7 @@
+       lock_super(sb);
+       lock_kernel();
+       sb->s_flags &= ~MS_ACTIVE;
+-      invalidate_inodes(sb);  /* bad name - it should be evict_inodes() */
++      invalidate_inodes(sb, 0);  /* bad name - it should be evict_inodes() */
+       if (sop) {
+               if (sop->write_super && sb->s_dirt)
+                       sop->write_super(sb);
+@@ -853,7 +853,7 @@
+       }
+       /* Forget any remaining inodes */
+-      if (invalidate_inodes(sb)) {
++      if (invalidate_inodes(sb, 1)) {
+               printk(KERN_ERR "VFS: Busy inodes after unmount. "
+                       "Self-destruct in 5 seconds.  Have a nice day...\n");
+       }
+Index: linux/include/linux/fs.h
+===================================================================
+--- linux.orig/include/linux/fs.h      Mon Feb  2 21:24:23 2004
++++ linux/include/linux/fs.h   Mon Feb  2 21:26:08 2004
+@@ -1257,7 +1257,7 @@
+ extern void set_buffer_flushtime(struct buffer_head *);
+ extern void balance_dirty(void);
+ extern int check_disk_change(kdev_t);
+-extern int invalidate_inodes(struct super_block *);
++extern int invalidate_inodes(struct super_block *, int);
+ extern int invalidate_device(kdev_t, int);
+ extern void invalidate_inode_pages(struct inode *);
+ extern void invalidate_inode_pages2(struct address_space *);
+Index: linux/fs/smbfs/inode.c
+===================================================================
+--- linux.orig/fs/smbfs/inode.c        Thu Nov 28 18:53:15 2002
++++ linux/fs/smbfs/inode.c     Mon Feb  2 21:26:08 2004
+@@ -167,7 +167,7 @@
+ {
+       VERBOSE("\n");
+       shrink_dcache_sb(SB_of(server));
+-      invalidate_inodes(SB_of(server));
++      invalidate_inodes(SB_of(server), 0);
+ }
+ /*
index e3c5a62..6cfe667 100644 (file)
@@ -3,7 +3,7 @@ dev_read_only_hp_2.4.20.patch
 exports_2.4.20-rh-hp.patch
 lustre_version.patch
 vfs_intent-2.4.20-hp.patch
-invalidate_show.patch
+invalidate_show-2.4.20-hp.patch 
 export-truncate.patch
 iod-stock-24-exports_hp.patch
 ext-2.4-patch-1.patch
@@ -25,6 +25,8 @@ ext3-map_inode_page.patch
 ext3-error-export.patch
 iopen-2.4.20.patch
 tcp-zero-copy.patch
+jbd-dont-account-blocks-twice.patch
+jbd-commit-tricks.patch
 add_page_private.patch
 socket-exports-vanilla.patch
 removepage-2.4.20.patch
@@ -32,6 +34,7 @@ jbd-ctx_switch.patch
 jbd-flushtime.patch
 jbd-get_write_access.patch
 nfs_export_kernel-2.4.20-hp.patch
+ext3-raw-lookup.patch
 ext3-ea-in-inode-2.4.20.patch
 listman-2.4.20.patch
 ext3-trusted_ea-2.4.20.patch
index 32fb89d..c2c1e25 100644 (file)
  * - the maximum extent
  * - containing the requested extent
  * - and not overlapping existing conflicting extents outside the requested one
- *
- * An alternative policy is to not shrink the new extent when conflicts exist */
+ */
 static void
 ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req,
                             struct ldlm_extent *new_ex)
 {
         struct list_head *tmp;
         ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_policy_data.l_extent.start;
-        __u64 req_end = req->l_policy_data.l_extent.end;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
         ENTRY;
 
-        if (new_ex->start == req_start && new_ex->end == req_end) {
-                EXIT;
-                return;
-        }
+        lockmode_verify(req_mode);
 
         list_for_each(tmp, queue) {
                 struct ldlm_lock *lock;
+                struct ldlm_extent *l_extent;
+
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+                l_extent = &lock->l_policy_data.l_extent;
 
-                if (req == lock) {
+                if (new_ex->start == req_start && new_ex->end == req_end) {
                         EXIT;
                         return;
                 }
 
-                /* if lock doesn't overlap new_ex, skip it. */
-                if (lock->l_policy_data.l_extent.end < new_ex->start ||
-                    lock->l_policy_data.l_extent.start > new_ex->end)
+                /* Don't conflict with ourselves */
+                if (req == lock)
+                        continue;
+
+                /* If lock doesn't overlap new_ex, skip it. */
+                if (l_extent->end < new_ex->start ||
+                    l_extent->start > new_ex->end)
                         continue;
 
                 /* Locks are compatible, overlap doesn't matter */
                 if (lockmode_compat(lock->l_req_mode, req_mode))
                         continue;
 
-                if (lock->l_policy_data.l_extent.start < req_start) {
-                        if (lock->l_policy_data.l_extent.end == ~0) {
+                /* Locks conflicting in requested extents and we can't satisfy
+                 * both locks, so ignore it.  Either we will ping-pong this
+                 * extent (we would regardless of what extent we granted) or
+                 * lock is unused and it shouldn't limit our extent growth. */
+                if (lock->l_req_extent.end >= req_start &&
+                    lock->l_req_extent.start <= req_end)
+                        continue;
+
+                /* We grow extents downwards only as far as they don't overlap
+                 * with already-granted locks, on the assumption that clients
+                 * will be writing beyond the initial requested end and would
+                 * then need to enqueue a new lock beyond the previous request.
+                 * We don't grow downwards if there are lots of lockers. */
+                if (l_extent->start < req_start) {
+                        if (atomic_read(&req->l_resource->lr_refcount) > 20)
                                 new_ex->start = req_start;
-                                new_ex->end = req_end;
-                                EXIT;
-                                return;
-                        }
-                        new_ex->start = min(lock->l_policy_data.l_extent.end+1,
-                                            req_start);
+                        else
+                                new_ex->start = min(l_extent->end+1, req_start);
                 }
 
-                if (lock->l_policy_data.l_extent.end > req_end) {
-                        if (lock->l_policy_data.l_extent.start == 0) {
-                                new_ex->start = req_start;
-                                new_ex->end = req_end;
-                                EXIT;
-                                return;
-                        }
-                        new_ex->end = MAX(lock->l_policy_data.l_extent.start-1,
-                                          req_end);
+                /* If we need to cancel this lock anyways because our request
+                 * overlaps the granted lock, we grow up to its requested
+                 * extent start instead of limiting this extent, assuming that
+                 * clients are writing forwards and the lock had over grown
+                 * its extent downwards before we enqueued our request. */
+                if (l_extent->end > req_end) {
+                        if (l_extent->start <= req_end)
+                                new_ex->end = max(lock->l_req_extent.start - 1,
+                                                  req_end);
+                        else
+                                new_ex->end = max(l_extent->start - 1, req_end);
                 }
         }
         EXIT;
 }
 
-/* Determine if the lock is compatible with all locks on the queue. */
+/* In order to determine the largest possible extent we can grant, we need
+ * to scan all of the queues. */
+static void ldlm_extent_policy(struct ldlm_resource *res,
+                               struct ldlm_lock *lock, int *flags)
+{
+        struct ldlm_extent new_ex = { .start = 0, .end = ~0};
+
+        ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
+        ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
+
+        if (new_ex.start != lock->l_policy_data.l_extent.start ||
+            new_ex.end != lock->l_policy_data.l_extent.end) {
+                *flags |= LDLM_FL_LOCK_CHANGED;
+                lock->l_policy_data.l_extent.start = new_ex.start;
+                lock->l_policy_data.l_extent.end = new_ex.end;
+        }
+}
+
+/* Determine if the lock is compatible with all locks on the queue.
+ * We stop walking the queue if we hit ourselves so we don't take
+ * conflicting locks enqueued after us into account, or we'd wait forever. */
 static int
 ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
                          int send_cbs)
@@ -104,11 +138,13 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
         struct list_head *tmp;
         struct ldlm_lock *lock;
         ldlm_mode_t req_mode = req->l_req_mode;
-        __u64 req_start = req->l_policy_data.l_extent.start;
-        __u64 req_end = req->l_policy_data.l_extent.end;
+        __u64 req_start = req->l_req_extent.start;
+        __u64 req_end = req->l_req_extent.end;
         int compat = 1;
         ENTRY;
 
+        lockmode_verify(req_mode);
+
         list_for_each(tmp, queue) {
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
@@ -148,7 +184,6 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                              ldlm_error_t *err)
 {
         struct ldlm_resource *res = lock->l_resource;
-        struct ldlm_extent new_ex = {0, ~0};
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
         int rc;
         ENTRY;
@@ -165,22 +200,12 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                         RETURN(LDLM_ITER_STOP);
 
                 ldlm_resource_unlink_lock(lock);
+
+                ldlm_extent_policy(res, lock, flags);
                 ldlm_grant_lock(lock, NULL, 0, 1);
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
-        /* In order to determine the largest possible extent we can
-         * grant, we need to scan all of the queues. */
-        ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex);
-        ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex);
-
-        if (new_ex.start != lock->l_policy_data.l_extent.start ||
-            new_ex.end != lock->l_policy_data.l_extent.end) {
-                *flags |= LDLM_FL_LOCK_CHANGED;
-                lock->l_policy_data.l_extent.start = new_ex.start;
-                lock->l_policy_data.l_extent.end = new_ex.end;
-        }
-
  restart:
         LASSERT(res->lr_tmp == NULL);
         res->lr_tmp = &rpc_list;
@@ -204,6 +229,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                         GOTO(restart, -ERESTART);
                 *flags |= LDLM_FL_BLOCK_GRANTED;
         } else {
+                ldlm_extent_policy(res, lock, flags);
                 ldlm_resource_unlink_lock(lock);
                 ldlm_grant_lock(lock, NULL, 0, 0);
         }
index 181c72e..148be59 100644 (file)
@@ -150,6 +150,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         }
                 }
         } else {
+                lockmode_verify(mode);
+
                 /* This loop determines if there are existing locks
                  * that conflict with the new lock request. */
                 list_for_each(tmp, &res->lr_granted) {
@@ -164,7 +166,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                         /* locks are compatible, overlap doesn't matter */
                         if (lockmode_compat(lock->l_granted_mode, mode))
                                 continue;
-                        
+
                         if (!ldlm_flocks_overlap(lock, req))
                                 continue;
 
index 4186f5c..4111cbe 100644 (file)
@@ -19,6 +19,10 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue);
 int ldlm_run_ast_work(struct ldlm_namespace *, struct list_head *rpc_list);
 
+/* ldlm_lockd.c */
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                      struct ldlm_lock *lock);
+
 /* ldlm_plain.c */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
                             ldlm_error_t *err);
index 2787619..bcaed00 100644 (file)
@@ -255,6 +255,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
                 GOTO(out_ldlm, rc);
         }
 
+        ptlrpc_pinger_add_import(imp);
         EXIT;
 
         if (rc) {
@@ -312,7 +313,7 @@ int client_disconnect_export(struct obd_export *exp, int failover)
 
         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
         if (obd->obd_no_recov)
-                ptlrpc_set_import_active(imp, 0);
+                ptlrpc_invalidate_import(imp);
         else
                 rc = ptlrpc_disconnect_import(imp);
 
index bb0c0c1..0e7f0b0 100644 (file)
@@ -430,9 +430,9 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode)
 {
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
         ldlm_lock_remove_from_lru(lock);
-        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+        if (mode & (LCK_NL | LCK_CR | LCK_PR))
                 lock->l_readers++;
-        else
+        if (mode & (LCK_EX | LCK_CW | LCK_PW))
                 lock->l_writers++;
         lock->l_last_used = jiffies;
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
@@ -448,10 +448,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         ns = lock->l_resource->lr_namespace;
         l_lock(&ns->ns_lock);
-        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
+        if (mode & (LCK_NL | LCK_CR | LCK_PR)) {
                 LASSERT(lock->l_readers > 0);
                 lock->l_readers--;
-        } else {
+        }
+        if (mode & (LCK_EX | LCK_CW | LCK_PW)) {
                 LASSERT(lock->l_writers > 0);
                 lock->l_writers--;
         }
@@ -473,13 +474,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                                "warning\n");
 
                 LDLM_DEBUG(lock, "final decref done on cbpending lock");
-                l_unlock(&ns->ns_lock);
 
-                l_check_no_ns_lock(ns);
-                /* FIXME: need a real 'desc' here */
-                if (lock->l_blocking_ast != NULL)
-                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
-                                             LDLM_CB_BLOCKING);
+                LDLM_LOCK_GET(lock); /* dropped by bl thread */
+                ldlm_lock_remove_from_lru(lock);
+                ldlm_bl_to_thread(ns, NULL, lock);
+                l_unlock(&ns->ns_lock);
         } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
                    !lock->l_readers && !lock->l_writers) {
                 /* If this is a client-side namespace and this was the last
@@ -577,7 +576,7 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     lock->l_readers == 0 && lock->l_writers == 0)
                         continue;
 
-                if (lock->l_req_mode != mode)
+                if (!(lock->l_req_mode & mode))
                         continue;
 
                 if (lock->l_resource->lr_type == LDLM_EXTENT &&
@@ -593,7 +592,10 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     !(lock->l_flags & LDLM_FL_LOCAL))
                         continue;
 
-                ldlm_lock_addref_internal(lock, mode);
+                if (flags & LDLM_FL_TEST_LOCK)
+                        LDLM_LOCK_GET(lock);
+                else
+                        ldlm_lock_addref_internal(lock, mode);
                 return lock;
         }
 
@@ -622,6 +624,8 @@ void ldlm_lock_allow_match(struct ldlm_lock *lock)
  * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
  *     to be canceled can still be matched as long as they still have reader
  *     or writer references
+ * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
+ *     just tell us if we would have matched.
  *
  * Returns 1 if it finds an already-existing lock that is compatible; in this
  * case, lockh is filled in with an addref()ed lock
@@ -672,25 +676,40 @@ int ldlm_lock_match(struct ldlm_namespace *ns, int flags,
         l_unlock(&ns->ns_lock);
 
         if (lock) {
-                struct l_wait_info lwi;
                 ldlm_lock2handle(lock, lockh);
-                if (lock->l_completion_ast)
-                        lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
-                                               NULL);
+                if (!(lock->l_flags & LDLM_FL_CAN_MATCH)) {
+                        struct l_wait_info lwi;
+                        if (lock->l_completion_ast)
+                                lock->l_completion_ast(lock,
+                                                       LDLM_FL_WAIT_NOREPROC,
+                                                       NULL);
 
-                lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, NULL, NULL, NULL);
+                        lwi = LWI_TIMEOUT_INTR(obd_timeout*HZ, NULL,NULL,NULL);
 
-                /* XXX FIXME see comment about CAN_MATCH in lustre_dlm.h */
-                l_wait_event(lock->l_waitq,
-                             (lock->l_flags & LDLM_FL_CAN_MATCH), &lwi);
+                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
+                        l_wait_event(lock->l_waitq,
+                                     (lock->l_flags & LDLM_FL_CAN_MATCH), &lwi);
+                }
         }
         if (rc)
-                LDLM_DEBUG(lock, "matched");
-        else
-                LDLM_DEBUG_NOLOCK("not matched");
+                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
+                           type == LDLM_PLAIN ? res_id->name[2] :
+                                policy->l_extent.start,
+                           type == LDLM_PLAIN ? res_id->name[3] :
+                                policy->l_extent.end);
+        else if (!(flags & LDLM_FL_TEST_LOCK)) /* less verbose for test-only */
+                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
+                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
+                                  type, mode, res_id->name[0], res_id->name[1],
+                                  type == LDLM_PLAIN ? res_id->name[2] :
+                                        policy->l_extent.start,
+                                  type == LDLM_PLAIN ? res_id->name[3] :
+                                        policy->l_extent.end);
 
         if (old_lock)
                 LDLM_LOCK_PUT(old_lock);
+        if (flags & LDLM_FL_TEST_LOCK && rc)
+                LDLM_LOCK_PUT(lock);
 
         return rc;
 }
@@ -1041,7 +1060,7 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp)
         struct ldlm_resource *res;
 
         l_lock(&ns->ns_lock);
-        while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
+        while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) { 
                 lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
                                   struct ldlm_lock, l_export_chain);
                 res = ldlm_resource_getref(lock->l_resource);
@@ -1148,9 +1167,11 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
                "write: %d\n", (int)lock->l_req_mode, (int)lock->l_granted_mode,
                atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers);
         if (lock->l_resource->lr_type == LDLM_EXTENT)
-                CDEBUG(level, "  Extent: "LPU64" -> "LPU64"\n",
+                CDEBUG(level, "  Extent: "LPU64" -> "LPU64
+                       " (req "LPU64"-"LPU64")\n",
                        lock->l_policy_data.l_extent.start,
-                       lock->l_policy_data.l_extent.end);
+                       lock->l_policy_data.l_extent.end,
+                       lock->l_req_extent.start, lock->l_req_extent.end);
         else if (lock->l_resource->lr_type == LDLM_FLOCK)
                 CDEBUG(level, "  Pid: %d Extent: "LPU64" -> "LPU64"\n",
                        lock->l_policy_data.l_flock.pid,
index c28bbe2..5765d8c 100644 (file)
@@ -51,7 +51,7 @@ static int ldlm_refcount = 0;
 
 /* LDLM state */
 
-static struct ldlm_state *ldlm ;
+static struct ldlm_state *ldlm_state;
 
 inline unsigned long round_timeout(unsigned long timeout)
 {
@@ -498,9 +498,12 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "completion");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
         } else if (rc) {
                 LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
-                           "completion AST\n", rc, req->rq_status);
+                           "completion AST", rc, req->rq_status);
                 ldlm_lock_cancel(lock);
                 /* Server-side AST functions are called from ldlm_reprocess_all,
                  * which needs to be told to please restart its reprocessing. */
@@ -541,10 +544,12 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                 ldlm_del_waiting_lock(lock);
                 ldlm_failed_ast(lock, rc, "glimpse");
+        } else if (rc == -EINVAL) {
+                LDLM_DEBUG(lock, "lost the race -- client no longer has this "
+                           "lock");
         } else if (rc) {
                 LDLM_ERROR(lock, "client sent rc %d rq_status %d from "
-                           "completion AST\n", rc, req->rq_status);
-                ldlm_lock_cancel(lock);
+                           "glimpse AST", rc, req->rq_status);
         } else {
                 rc = res->lr_namespace->ns_lvbo->lvbo_update(res,
                                                              req->rq_repmsg, 0);
@@ -561,9 +566,9 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
         struct obd_device *obddev = req->rq_export->exp_obd;
         struct ldlm_reply *dlm_rep;
         struct ldlm_request *dlm_req;
-        int rc, size[2] = {sizeof(*dlm_rep)};
+        int rc = 0, size[2] = {sizeof(*dlm_rep)};
         __u32 flags;
-        ldlm_error_t err;
+        ldlm_error_t err = ELDLM_OK;
         struct ldlm_lock *lock = NULL;
         void *cookie = NULL;
         ENTRY;
@@ -574,7 +579,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                       lustre_swab_ldlm_request);
         if (dlm_req == NULL) {
                 CERROR ("Can't unpack dlm_req\n");
-                RETURN (-EFAULT);
+                GOTO(out, rc = -EFAULT);
         }
 
         flags = dlm_req->lock_flags;
@@ -587,7 +592,7 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
                                 blocking_callback, completion_callback,
                                 glimpse_callback, NULL, 0);
         if (!lock)
-                GOTO(out, err = -ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         do_gettimeofday(&lock->l_enqueued_time);
         memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
@@ -614,12 +619,15 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
                 rc = lustre_pack_reply(req, buffers, size, NULL);
                 if (rc)
-                        RETURN(rc);
+                        GOTO(out, rc);
         }
 
         if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                 memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
                        sizeof(ldlm_policy_data_t));
+        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+                memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
+                       sizeof(lock->l_req_extent));
 
         err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
         if (err)
@@ -643,26 +651,34 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req,
 
         EXIT;
  out:
-        if (lock != NULL && lock->l_resource->lr_lvb_len > 0) {
-                void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
-                                           lock->l_resource->lr_lvb_len);
-                memcpy(lvb, lock->l_resource->lr_lvb_data,
-                       lock->l_resource->lr_lvb_len);
-        }
         req->rq_status = err;
+        if (req->rq_reply_state == NULL) {
+                err = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc == 0)
+                        rc = err;
+        }
 
         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
         if (lock) {
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
-                           "(err=%d)", err);
+                           "(err=%d, rc=%d)", err, rc);
+
+                if (lock->l_resource->lr_lvb_len > 0) {
+                        void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
+                                                  lock->l_resource->lr_lvb_len);
+                        memcpy(lvb, lock->l_resource->lr_lvb_data,
+                               lock->l_resource->lr_lvb_len);
+                }
+
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
                 LDLM_LOCK_PUT(lock);
         }
-        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
+        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
+                          lock, rc);
 
-        return 0;
+        return rc;
 }
 
 int ldlm_handle_convert(struct ptlrpc_request *req)
@@ -754,7 +770,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                                 (res, NULL, 0);
                                 //(res, req->rq_reqmsg, 1);
                 }
-                        
+
                 ldlm_lock_cancel(lock);
                 if (ldlm_del_waiting_lock(lock))
                         CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
@@ -902,17 +918,19 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
 {
         req->rq_status = rc;
-        rc = lustre_pack_reply(req, 0, NULL, NULL);
-        if (rc)
-                return rc;
+        if (req->rq_reply_state == NULL) {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc)
+                        return rc;
+        }
         return ptlrpc_reply(req);
 }
 
 #ifdef __KERNEL__
-static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
-                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
+                      struct ldlm_lock *lock)
 {
-        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
         struct ldlm_bl_work_item *blwi;
         ENTRY;
 
@@ -921,7 +939,8 @@ static int ldlm_bl_to_thread(struct ldlm_state *ldlm, struct ldlm_namespace *ns,
                 RETURN(-ENOMEM);
 
         blwi->blwi_ns = ns;
-        blwi->blwi_ld = *ld;
+        if (ld != NULL)
+                blwi->blwi_ld = *ld;
         blwi->blwi_lock = lock;
 
         spin_lock(&blp->blp_lock);
@@ -969,58 +988,48 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        if (req->rq_reqmsg->opc == LDLM_BL_CALLBACK) {
+        LASSERT(req->rq_export != NULL);
+        LASSERT(req->rq_export->exp_obd != NULL);
+
+        switch(req->rq_reqmsg->opc) {
+        case LDLM_BL_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK) {
+                break;
+        case LDLM_CP_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == LDLM_GL_CALLBACK) {
+                break;
+        case LDLM_GL_CALLBACK:
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
-        } else if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
+                break;
+        case OBD_LOG_CANCEL:
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
-        } else {
-                ldlm_callback_reply(req, -EPROTO);
-                RETURN(0);
-        }
-
-        LASSERT(req->rq_export != NULL);
-        LASSERT(req->rq_export->exp_obd != NULL);
-
-        /* FIXME - how to send reply */
-        if (req->rq_reqmsg->opc == OBD_LOG_CANCEL) {
-                int rc = llog_origin_handle_cancel(req);
+                rc = llog_origin_handle_cancel(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CREATE) {
-                int rc = llog_origin_handle_create(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_CREATE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_create(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_NEXT_BLOCK) {
-                int rc = llog_origin_handle_next_block(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_next_block(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_READ_HEADER) {
-                int rc = llog_origin_handle_read_header(req);
-                req->rq_status = rc;
-                ptlrpc_reply(req);
+        case LLOG_ORIGIN_HANDLE_READ_HEADER:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_read_header(req);
+                ldlm_callback_reply(req, rc);
                 RETURN(0);
-        }
-        if (req->rq_reqmsg->opc == LLOG_ORIGIN_HANDLE_CLOSE) {
-                int rc = llog_origin_handle_close(req);
+        case LLOG_ORIGIN_HANDLE_CLOSE:
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                rc = llog_origin_handle_close(req);
                 ldlm_callback_reply(req, rc);
                 RETURN(0);
+        default:
+                CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
+                ldlm_callback_reply(req, -EPROTO);
+                RETURN(0);
         }
 
         ns = req->rq_export->exp_obd->obd_namespace;
@@ -1053,14 +1062,12 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
          * cancelling right now, because it's unused, or have an intent result
          * in the reply, so we might have to push the responsibility for sending
          * the reply down into the AST handlers, alas. */
-        if (req->rq_reqmsg->opc == LDLM_CP_CALLBACK)
-                ldlm_callback_reply(req, 0);
 
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
 #ifdef __KERNEL__
-                rc = ldlm_bl_to_thread(ldlm, ns, &dlm_req->lock_desc, lock);
+                rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
                 ldlm_callback_reply(req, rc);
 #else
                 rc = 0;
@@ -1070,6 +1077,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
+                ldlm_callback_reply(req, 0);
                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                 break;
         case LDLM_GL_CALLBACK:
@@ -1235,11 +1243,11 @@ static int ldlm_setup(void)
 #endif
         ENTRY;
 
-        if (ldlm != NULL)
+        if (ldlm_state != NULL)
                 RETURN(-EALREADY);
 
-        OBD_ALLOC(ldlm, sizeof(*ldlm));
-        if (ldlm == NULL)
+        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
+        if (ldlm_state == NULL)
                 RETURN(-ENOMEM);
 
 #ifdef __KERNEL__
@@ -1248,25 +1256,25 @@ static int ldlm_setup(void)
                 GOTO(out_free, rc);
 #endif
 
-        ldlm->ldlm_cb_service =
+        ldlm_state->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                 LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                                 ldlm_callback_handler, "ldlm_cbd",
                                 ldlm_svc_proc_dir);
 
-        if (!ldlm->ldlm_cb_service) {
+        if (!ldlm_state->ldlm_cb_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
 
-        ldlm->ldlm_cancel_service =
-                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE, 
+        ldlm_state->ldlm_cancel_service =
+                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                 LDLM_CANCEL_REQUEST_PORTAL,
                                 LDLM_CANCEL_REPLY_PORTAL,
                                 ldlm_cancel_handler, "ldlm_canceld",
                                 ldlm_svc_proc_dir);
 
-        if (!ldlm->ldlm_cancel_service) {
+        if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");
                 GOTO(out_proc, rc = -ENOMEM);
         }
@@ -1274,7 +1282,7 @@ static int ldlm_setup(void)
         OBD_ALLOC(blp, sizeof(*blp));
         if (blp == NULL)
                 GOTO(out_proc, rc = -ENOMEM);
-        ldlm->ldlm_bl_pool = blp;
+        ldlm_state->ldlm_bl_pool = blp;
 
         atomic_set(&blp->blp_num_threads, 0);
         init_waitqueue_head(&blp->blp_waitq);
@@ -1298,14 +1306,14 @@ static int ldlm_setup(void)
                 wait_for_completion(&blp->blp_comp);
         }
 
-        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cancel_service,
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
                                     LDLM_NUM_THREADS, "ldlm_cn");
         if (rc) {
                 LBUG();
                 GOTO(out_thread, rc);
         }
 
-        rc = ptlrpc_start_n_threads(NULL, ldlm->ldlm_cb_service,
+        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
                                     LDLM_NUM_THREADS, "ldlm_cb");
         if (rc) {
                 LBUG();
@@ -1337,8 +1345,8 @@ static int ldlm_setup(void)
 
 #ifdef __KERNEL__
  out_thread:
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
 #endif
 
  out_proc:
@@ -1346,15 +1354,15 @@ static int ldlm_setup(void)
         ldlm_proc_cleanup();
  out_free:
 #endif
-        OBD_FREE(ldlm, sizeof(*ldlm));
-        ldlm = NULL;
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
         return rc;
 }
 
 static int ldlm_cleanup(int force)
 {
 #ifdef __KERNEL__
-        struct ldlm_bl_pool *blp = ldlm->ldlm_bl_pool;
+        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
 #endif
         ENTRY;
 
@@ -1379,10 +1387,10 @@ static int ldlm_cleanup(int force)
         }
         OBD_FREE(blp, sizeof(*blp));
 
-        ptlrpc_stop_all_threads(ldlm->ldlm_cb_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cb_service);
-        ptlrpc_stop_all_threads(ldlm->ldlm_cancel_service);
-        ptlrpc_unregister_service(ldlm->ldlm_cancel_service);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
+        ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
+        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
         ldlm_proc_cleanup();
 
         expired_lock_thread.elt_state = ELT_TERMINATE;
@@ -1392,8 +1400,8 @@ static int ldlm_cleanup(int force)
 
 #endif
 
-        OBD_FREE(ldlm, sizeof(*ldlm));
-        ldlm = NULL;
+        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
+        ldlm_state = NULL;
 
         RETURN(0);
 }
@@ -1494,6 +1502,8 @@ EXPORT_SYMBOL(ldlm_regression_stop);
 EXPORT_SYMBOL(ldlm_namespace_new);
 EXPORT_SYMBOL(ldlm_namespace_cleanup);
 EXPORT_SYMBOL(ldlm_namespace_free);
+EXPORT_SYMBOL(ldlm_namespace_dump);
+EXPORT_SYMBOL(ldlm_dump_all_namespaces);
 EXPORT_SYMBOL(ldlm_resource_get);
 EXPORT_SYMBOL(ldlm_resource_putref);
 
index 9b2af34..9a693e3 100644 (file)
@@ -43,6 +43,8 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
         int compat = 1;
         ENTRY;
 
+        lockmode_verify(req_mode);
+
         list_for_each(tmp, queue) {
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
index f6045f8..01e4562 100644 (file)
@@ -168,6 +168,9 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         lock->l_lvb_swabber = lvb_swabber;
         if (policy != NULL)
                 memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+        if (type == LDLM_EXTENT)
+                memcpy(&lock->l_req_extent, &policy->l_extent,
+                       sizeof(policy->l_extent));
 
         err = ldlm_lock_enqueue(ns, &lock, policy, flags);
         if (err != ELDLM_OK)
@@ -255,6 +258,9 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 lock->l_lvb_swabber = lvb_swabber;
                 if (policy != NULL)
                         memcpy(&lock->l_policy_data, policy, sizeof(*policy));
+                if (type == LDLM_EXTENT)
+                        memcpy(&lock->l_req_extent, &policy->l_extent,
+                               sizeof(policy->l_extent));
                 LDLM_DEBUG(lock, "client-side enqueue START");
         }
 
@@ -374,7 +380,9 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
         }
 
-        if (lvb_len) {
+        /* If the lock has already been granted by a completion AST, don't
+         * clobber the LVB with an older one. */
+        if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                 void *tmplvb;
                 tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
                 if (tmplvb == NULL)
@@ -581,9 +589,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 
 int ldlm_cancel_lru(struct ldlm_namespace *ns)
 {
-        struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
+        struct list_head *tmp, *next;
         int count, rc = 0;
-        struct ldlm_ast_work *w;
         ENTRY;
 
         l_lock(&ns->ns_lock);
@@ -607,33 +614,14 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns)
                  * won't see this flag and call l_blocking_ast */
                 lock->l_flags |= LDLM_FL_CBPENDING;
 
-                OBD_ALLOC(w, sizeof(*w));
-                LASSERT(w);
-
-                w->w_lock = LDLM_LOCK_GET(lock);
-                list_add(&w->w_list, &list);
+                LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 ldlm_lock_remove_from_lru(lock);
+                ldlm_bl_to_thread(ns, NULL, lock);
 
                 if (--count == 0)
                         break;
         }
         l_unlock(&ns->ns_lock);
-
-        list_for_each_safe(tmp, next, &list) {
-                struct lustre_handle lockh;
-                int rc;
-                w = list_entry(tmp, struct ldlm_ast_work, w_list);
-
-                ldlm_lock2handle(w->w_lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc != ELDLM_OK)
-                        CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc);
-
-                list_del(&w->w_list);
-                LDLM_LOCK_PUT(w->w_lock);
-                OBD_FREE(w, sizeof(*w));
-        }
-
         RETURN(rc);
 }
 
@@ -913,7 +901,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         struct ptlrpc_request *req;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
-        int size;
+        int buffers = 1;
+        int size[2];
         int flags;
 
         /*
@@ -939,8 +928,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         else
                 flags = LDLM_FL_REPLAY;
 
-        size = sizeof(*body);
-        req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
+        size[0] = sizeof(*body);
+        req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
@@ -952,8 +941,12 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         body->lock_flags = flags;
 
         ldlm_lock2handle(lock, &body->lock_handle1);
-        size = sizeof(*reply);
-        req->rq_replen = lustre_msg_size(1, &size);
+        size[0] = sizeof(*reply);
+        if (lock->l_lvb_len != 0) {
+                buffers = 2;
+                size[1] = lock->l_lvb_len;
+        }
+        req->rq_replen = lustre_msg_size(buffers, size);
 
         LDLM_DEBUG(lock, "replaying lock:");
 
index 52cebf1..80545d0 100644 (file)
@@ -495,7 +495,6 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
 {
         struct list_head *bucket, *tmp;
         struct ldlm_resource *res = NULL;
-        int rc;
         ENTRY;
 
         LASSERT(ns != NULL);
@@ -523,12 +522,10 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
         l_unlock(&ns->ns_lock);
 
         if (create && ns->ns_lvbo && ns->ns_lvbo->lvbo_init) {
-                rc = ns->ns_lvbo->lvbo_init(res);
-                if (rc) {
-                        CERROR("lvbo_init failure %d\n", rc);
-                        LASSERT(ldlm_resource_putref(res) == 1);
-                        res = NULL;
-                }
+                int rc = ns->ns_lvbo->lvbo_init(res);
+                if (rc)
+                        CERROR("lvbo_init failed for resource "LPU64": rc %d\n",
+                               name.name[0], rc);
         }
 
         RETURN(res);
index d36389b..6fe7431 100644 (file)
@@ -313,17 +313,18 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
         size_lock.end = OBD_OBJECT_EOF;
 
         /* XXX I bet we should be checking the lock ignore flags.. */
+        /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
         matched = obd_match(exp, lsm, LDLM_EXTENT, &size_lock,
                             sizeof(size_lock), LCK_PR, &flags, inode,
                             &match_lockh);
 
-        /* hey, alright, we hold a size lock that covers the size we 
+        /* hey, alright, we hold a size lock that covers the size we
          * just found, its not going to change for a while.. */
         if (matched == 1) {
                 set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &lli->lli_flags);
                 obd_cancel(exp, lsm, LCK_PR, &match_lockh);
-        } 
+        }
 
         RETURN(0);
 }
index 392e22a..8321956 100644 (file)
@@ -355,6 +355,7 @@ static int llu_have_md_lock(struct inode *inode)
 
         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
 
+        /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
                             NULL, LCK_PR, &lockh)) {
index 7733155..3f945a7 100644 (file)
@@ -59,62 +59,20 @@ typedef struct ext2_dir_entry_2 ext2_dirent;
 static int ll_dir_readpage(struct file *file, struct page *page)
 {
         struct inode *inode = page->mapping->host;
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_fid mdc_fid;
         __u64 offset;
-        int rc = 0;
         struct ptlrpc_request *request;
-        struct lustre_handle lockh;
         struct mds_body *body;
-        struct lookup_intent it = { .it_op = IT_READDIR };
-        struct mdc_op_data data;
-        struct obd_device *obddev = class_exp2obd(sbi->ll_mdc_exp);
-        struct ldlm_res_id res_id =
-                { .name = {inode->i_ino, (__u64)inode->i_generation} };
+        int rc = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
                inode->i_generation, inode);
-        if ((inode->i_size + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT <= page->index){
-                /* XXX why do we need this exactly, and why do we think that
-                 *     an all-zero directory page is useful?
-                 */
-                CERROR("memsetting dir page %lu to zero (size %lld)\n",
-                       page->index, inode->i_size);
-                memset(kmap(page), 0, PAGE_CACHE_SIZE);
-                kunmap(page);
-                GOTO(readpage_out, rc);
-        }
-
-        rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
-                             &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
-        if (!rc) {
-                ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
-
-                rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR,
-                                 &data, &lockh, NULL, 0,
-                                 ldlm_completion_ast, ll_mdc_blocking_ast,
-                                 inode);
-                request = (struct ptlrpc_request *)it.d.lustre.it_data;
-                if (request)
-                        ptlrpc_req_finished(request);
-                if (rc < 0) {
-                        CERROR("lock enqueue: err: %d\n", rc);
-                        unlock_page(page);
-                        RETURN(rc);
-                }
-        }
-        ldlm_lock_dump_handle(D_OTHER, &lockh);
-
-        if (PageUptodate(page)) {
-                CERROR("Explain this please?\n");
-                GOTO(readpage_out, rc);
-        }
 
         mdc_pack_fid(&mdc_fid, inode->i_ino, inode->i_generation, S_IFDIR);
 
         offset = page->index << PAGE_SHIFT;
-        rc = mdc_readpage(sbi->ll_mdc_exp, &mdc_fid,
+        rc = mdc_readpage(ll_i2sbi(inode)->ll_mdc_exp, &mdc_fid,
                           offset, page, &request);
         if (!rc) {
                 body = lustre_msg_buf(request->rq_repmsg, 0, sizeof (*body));
@@ -122,16 +80,12 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                 LASSERT_REPSWABBED (request, 0); /* swabbed by mdc_readpage() */
 
                 inode->i_size = body->size;
+                SetPageUptodate(page);
         }
         ptlrpc_req_finished(request);
-        EXIT;
-
- readpage_out:
-        if (!rc)
-                SetPageUptodate(page);
 
         unlock_page(page);
-        ldlm_lock_decref(&lockh, LCK_PR);
+        EXIT;
         return rc;
 }
 
@@ -252,9 +206,39 @@ fail:
 
 static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
 {
+        struct ldlm_res_id res_id =
+                { .name = { dir->i_ino, (__u64)dir->i_generation} };
+        struct lustre_handle lockh;
+        struct obd_device *obddev = class_exp2obd(ll_i2sbi(dir)->ll_mdc_exp);
         struct address_space *mapping = dir->i_mapping;
-        struct page *page = read_cache_page(mapping, n,
-                                (filler_t*)mapping->a_ops->readpage, NULL);
+        struct page *page;
+        int rc;
+
+        rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
+                             &res_id, LDLM_PLAIN, NULL, LCK_PR, &lockh);
+        if (!rc) {
+                struct lookup_intent it = { .it_op = IT_READDIR };
+                struct ptlrpc_request *request;
+                struct mdc_op_data data;
+
+                ll_prepare_mdc_op_data(&data, dir, NULL, NULL, 0, 0);
+
+                rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it,
+                                 LCK_PR, &data, &lockh, NULL, 0,
+                                 ldlm_completion_ast, ll_mdc_blocking_ast, dir);
+
+                request = (struct ptlrpc_request *)it.d.lustre.it_data;
+                if (request)
+                        ptlrpc_req_finished(request);
+                if (rc < 0) {
+                        CERROR("lock enqueue: rc: %d\n", rc);
+                        return ERR_PTR(rc);
+                }
+        }
+        ldlm_lock_dump_handle(D_OTHER, &lockh);
+
+        page = read_cache_page(mapping, n,
+                               (filler_t*)mapping->a_ops->readpage, NULL);
         if (!IS_ERR(page)) {
                 wait_on_page(page);
                 (void)kmap(page);
@@ -265,14 +249,17 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n)
                 if (PageError(page))
                         goto fail;
         }
+
+out_unlock:
+        ldlm_lock_decref(&lockh, LCK_PR);
         return page;
 
 fail:
         ext2_put_page(page);
-        return ERR_PTR(-EIO);
+        page = ERR_PTR(-EIO);
+        goto out_unlock;
 }
 
-
 /*
  * p is at least 6 bytes before the end of page
  */
@@ -305,8 +292,8 @@ static unsigned char ext2_filetype_table[EXT2_FT_MAX] = {
 
 int ll_readdir(struct file * filp, void * dirent, filldir_t filldir)
 {
-        loff_t pos = filp->f_pos;
         struct inode *inode = filp->f_dentry->d_inode;
+        loff_t pos = filp->f_pos;
         // XXX struct super_block *sb = inode->i_sb;
         unsigned offset = pos & ~PAGE_CACHE_MASK;
         unsigned long n = pos >> PAGE_CACHE_SHIFT;
@@ -314,12 +301,14 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir)
         unsigned chunk_mask = ~(ext2_chunk_size(inode)-1);
         unsigned char *types = NULL;
         int need_revalidate = (filp->f_version != inode->i_version);
+        int rc = 0;
         ENTRY;
 
-        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
-               inode->i_generation, inode);
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p) pos %llu/%llu\n",
+               inode->i_ino, inode->i_generation, inode, pos, inode->i_size);
+
         if (pos > inode->i_size - EXT2_DIR_REC_LEN(1))
-                GOTO(done, 0);
+                RETURN(0);
 
         types = ext2_filetype_table;
 
@@ -328,15 +317,21 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir)
                 ext2_dirent *de;
                 struct page *page;
 
-                CDEBUG(D_EXT2, "reading %lu of dir %lu page %lu, size %llu\n",
-                       PAGE_CACHE_SIZE, inode->i_ino, n, inode->i_size);
+                CDEBUG(D_EXT2,"read %lu of dir %lu/%u page %lu/%lu size %llu\n",
+                       PAGE_CACHE_SIZE, inode->i_ino, inode->i_generation,
+                       n, npages, inode->i_size);
                 page = ll_get_dir_page(inode, n);
 
                 /* size might have been updated by mdc_readpage */
                 npages = dir_pages(inode);
 
-                if (IS_ERR(page))
+                if (IS_ERR(page)) {
+                        rc = PTR_ERR(page);
+                        CERROR("error reading dir %lu/%u page %lu: rc %d\n",
+                               inode->i_ino, inode->i_generation, n, rc);
                         continue;
+                }
+
                 kaddr = page_address(page);
                 if (need_revalidate) {
                         offset = ext2_validate_entry(kaddr, offset, chunk_mask);
@@ -349,6 +344,7 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir)
                                 int over;
                                 unsigned char d_type = DT_UNKNOWN;
 
+                                rc = 0; /* no error if we return something */
                                 if (types && de->file_type < EXT2_FT_MAX)
                                         d_type = types[de->file_type];
 
@@ -358,7 +354,7 @@ int ll_readdir(struct file * filp, void * dirent, filldir_t filldir)
                                                le32_to_cpu(de->inode), d_type);
                                 if (over) {
                                         ext2_put_page(page);
-                                        GOTO(done,0);
+                                        GOTO(done, rc);
                                 }
                         }
                 }
@@ -369,7 +365,7 @@ done:
         filp->f_pos = (n << PAGE_CACHE_SHIFT) | offset;
         filp->f_version = inode->i_version;
         update_atime(inode);
-        RETURN(0);
+        RETURN(rc);
 }
 
 static int ll_dir_ioctl(struct inode *inode, struct file *file,
index bac31cb..2cbc22e 100644 (file)
@@ -318,64 +318,68 @@ static int ll_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
  *
  * No one can dirty the extent until we've finished our work and they can
  * enqueue another lock.  The DLM protects us from ll_file_read/write here,
- * but other kernel actors could have pages locked. */
+ * but other kernel actors could have pages locked.
+ *
+ * Called with the DLM lock held. */
 void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                               struct ldlm_lock *lock, __u32 stripe)
 {
-        struct ldlm_extent *extent = &lock->l_policy_data.l_extent;
+        ldlm_policy_data_t tmpex;
         unsigned long start, end, count, skip, i, j;
         struct page *page;
-        int rc, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
+        int rc, rc2, discard = lock->l_flags & LDLM_FL_DISCARD_DATA;
+        struct lustre_handle lockh;
         ENTRY;
 
-        CDEBUG(D_INODE, "obdo %lu inode %p ["LPU64"->"LPU64"] size: %llu\n",
-               inode->i_ino, inode, extent->start, extent->end, inode->i_size);
+        memcpy(&tmpex, &lock->l_policy_data, sizeof(tmpex));
+        CDEBUG(D_INODE|D_PAGE, "inode %lu(%p) ["LPU64"->"LPU64"] size: %llu\n",
+               inode->i_ino, inode, tmpex.l_extent.start, tmpex.l_extent.end,
+               inode->i_size);
 
         /* our locks are page granular thanks to osc_enqueue, we invalidate the
          * whole page. */
-        LASSERT((extent->start & ~PAGE_CACHE_MASK) == 0);
-        LASSERT(((extent->end+1) & ~PAGE_CACHE_MASK) == 0);
+        LASSERT((tmpex.l_extent.start & ~PAGE_CACHE_MASK) == 0);
+        LASSERT(((tmpex.l_extent.end + 1) & ~PAGE_CACHE_MASK) == 0);
 
-        start = extent->start >> PAGE_CACHE_SHIFT;
         count = ~0;
         skip = 0;
-        end = (extent->end >> PAGE_CACHE_SHIFT) + 1;
-        if ((end << PAGE_CACHE_SHIFT) < extent->end)
-                end = ~0;
+        start = tmpex.l_extent.start >> PAGE_CACHE_SHIFT;
+        end = tmpex.l_extent.end >> PAGE_CACHE_SHIFT;
         if (lsm->lsm_stripe_count > 1) {
                 count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
                 skip = (lsm->lsm_stripe_count - 1) * count;
-                start += (start/count * skip) + (stripe * count);
+                start += start/count * skip + stripe * count;
                 if (end != ~0)
-                        end += (end/count * skip) + (stripe * count);
-        } 
+                        end += end/count * skip + stripe * count;
+        }
+        if (end < tmpex.l_extent.end >> PAGE_CACHE_SHIFT)
+                end = ~0;
 
         i = (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT;
         if (i < end)
                 end = i;
 
-        CDEBUG(D_INODE, "walking page indices start: %lu j: %lu count: %lu "
-               "skip: %lu end: %lu%s\n", start, start % count, count, skip, end,
-               discard ? " (DISCARDING)" : "");
+        CDEBUG(D_INODE|D_PAGE, "walking page indices start: %lu j: %lu "
+               "count: %lu skip: %lu end: %lu%s\n", start, start % count,
+               count, skip, end, discard ? " (DISCARDING)" : "");
 
         /* this is the simplistic implementation of page eviction at
          * cancelation.  It is careful to get races with other page
          * lockers handled correctly.  fixes from bug 20 will make it
          * more efficient by associating locks with pages and with
          * batching writeback under the lock explicitly. */
-        for (i = start, j = start % count ; ; j++, i++) {
-                if (j == count) {
-                        i += skip;
-                        j = 0;
-                }
-                if (i >= end)
-                        break;
+        for (i = start, j = start % count ; i <= end;
+             j++, i++, tmpex.l_extent.start += PAGE_CACHE_SIZE) {
+                LASSERTF(tmpex.l_extent.start< lock->l_policy_data.l_extent.end,
+                         LPU64" >= "LPU64" start %lu i %lu end %lu\n",
+                         tmpex.l_extent.start, lock->l_policy_data.l_extent.end,
+                         start, i, end);
 
                 ll_pgcache_lock(inode->i_mapping);
                 if (list_empty(&inode->i_mapping->dirty_pages) &&
                     list_empty(&inode->i_mapping->clean_pages) &&
                     list_empty(&inode->i_mapping->locked_pages)) {
-                        CDEBUG(D_INODE, "nothing left\n");
+                        CDEBUG(D_INODE|D_PAGE, "nothing left\n");
                         ll_pgcache_unlock(inode->i_mapping);
                         break;
                 }
@@ -385,14 +389,14 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
 
                 page = find_get_page(inode->i_mapping, i);
                 if (page == NULL)
-                        continue;
-                LL_CDEBUG_PAGE(page, "locking page\n");
+                        goto next_index;
+                LL_CDEBUG_PAGE(D_PAGE, page, "locking page\n");
                 lock_page(page);
 
                 /* page->mapping to check with racing against teardown */
                 if (page->mapping && PageDirty(page) && !discard) {
                         ClearPageDirty(page);
-                        LL_CDEBUG_PAGE(page, "found dirty\n");
+                        LL_CDEBUG_PAGE(D_PAGE, page, "found dirty\n");
                         ll_pgcache_lock(inode->i_mapping);
                         list_del(&page->list);
                         list_add(&page->list, &inode->i_mapping->locked_pages);
@@ -407,13 +411,26 @@ void ll_pgcache_remove_extent(struct inode *inode, struct lov_stripe_md *lsm,
                         lock_page(page);
                 }
 
-                /* checking again to account for writeback's lock_page() */
-                if (page->mapping != NULL) {
-                        LL_CDEBUG_PAGE(page, "truncating\n");
+                tmpex.l_extent.end = tmpex.l_extent.start + PAGE_CACHE_SIZE - 1;
+                /* check to see if another DLM lock covers this page */
+                rc2 = ldlm_lock_match(lock->l_resource->lr_namespace,
+                                      LDLM_FL_BLOCK_GRANTED|LDLM_FL_CBPENDING |
+                                      LDLM_FL_TEST_LOCK,
+                                      &lock->l_resource->lr_name, LDLM_EXTENT,
+                                      &tmpex, LCK_PR | LCK_PW, &lockh);
+                if (rc2 == 0 && page->mapping != NULL) {
+                        // checking again to account for writeback's lock_page()
+                        LL_CDEBUG_PAGE(D_PAGE, page, "truncating\n");
                         ll_truncate_complete_page(page);
                 }
                 unlock_page(page);
                 page_cache_release(page);
+
+        next_index:
+                if (j == count) {
+                        i += skip;
+                        j = 0;
+                }
         }
         EXIT;
 }
@@ -439,12 +456,17 @@ static int ll_extent_lock_callback(struct ldlm_lock *lock,
                         CERROR("ldlm_cli_cancel failed: %d\n", rc);
                 break;
         case LDLM_CB_CANCELING: {
-                struct inode *inode = ll_inode_from_lock(lock);
+                struct inode *inode;
                 struct ll_inode_info *lli;
                 struct lov_stripe_md *lsm;
                 __u32 stripe;
                 __u64 kms;
 
+                /* This lock wasn't granted, don't try to evict pages */
+                if (lock->l_req_mode != lock->l_granted_mode)
+                        RETURN(0);
+
+                inode = ll_inode_from_lock(lock);
                 if (inode == NULL)
                         RETURN(0);
                 lli = ll_i2info(inode);
@@ -569,7 +591,7 @@ static int ll_glimpse_callback(struct ldlm_lock *lock, void *reqp)
                 LBUG();
         }
 
-        LDLM_DEBUG(lock, "i_size: %Lu -> stripe number %d -> size %Lu",
+        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> size "LPU64,
                    inode->i_size, data.stripe_number, data.size);
 
         rc = lustre_pack_reply(req, 1, &size, NULL);
@@ -607,8 +629,10 @@ int ll_glimpse_size(struct inode *inode, struct ost_lvb *lvb)
                          LCK_PR, &flags, ll_extent_lock_callback,
                          ldlm_completion_ast, ll_glimpse_callback, inode,
                          sizeof(*lvb), lustre_swab_ost_lvb, &lockh);
-        if (rc > 0)
+        if (rc > 0) {
+                CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
                 RETURN(-EIO);
+        }
 
         lvb->lvb_size = lov_merge_size(lli->lli_smd, 0);
         //inode->i_mtime = lov_merge_mtime(lli->lli_smd, inode->i_mtime);
@@ -720,8 +744,8 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
                 inode->i_size = kms;
         }
 
-        CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld, i_size "
-               LPU64"\n", inode->i_ino, count, *ppos, inode->i_size);
+        CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
+               inode->i_ino, count, *ppos, inode->i_size);
 
         /* turn off the kernel's read-ahead */
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -1224,6 +1248,7 @@ static int ll_have_md_lock(struct dentry *de)
 
         CDEBUG(D_INFO, "trying to match res "LPU64"\n", res_id.name[0]);
 
+        /* FIXME use LDLM_FL_TEST_LOCK instead */
         flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
         if (ldlm_lock_match(obddev->obd_namespace, flags, &res_id, LDLM_PLAIN,
                             NULL, LCK_PR, &lockh)) {
index c0ca902..80cac34 100644 (file)
@@ -110,14 +110,14 @@ struct ll_async_page {
         struct page     *llap_page;
         struct list_head llap_pending_write;
          /* only trust these if the page lock is providing exclusion */
-         int             llap_write_queued:1,
+        int              llap_write_queued:1,
                          llap_defer_uptodate:1;
         struct list_head llap_proc_item;
 };
 
-#define LL_CDEBUG_PAGE(page, STR)                                       \
-        CDEBUG(D_PAGE, "page %p map %p ind %lu priv %0lx: " STR,        \
-               page, page->mapping, page->index, page->private)
+#define LL_CDEBUG_PAGE(mask, page, fmt, arg...)                         \
+        CDEBUG(mask, "page %p map %p ind %lu priv %0lx: " fmt,          \
+               page, page->mapping, page->index, page->private, ## arg)
 
 /* llite/lproc_llite.c */
 int lprocfs_register_mountpoint(struct proc_dir_entry *parent,
@@ -249,6 +249,8 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret);
 #define LL_SBI_NOLCK            0x1
 #define LL_SBI_READAHEAD        0x2
 
+#define LL_MAX_BLKSIZE          (4UL * 1024 * 1024)
+
 #if  (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
 #define    ll_s2sbi(sb)     ((struct ll_sb_info *)((sb)->s_fs_info))
 void __d_rehash(struct dentry * entry, int lock);
index 36b0250..c17ad63 100644 (file)
@@ -192,6 +192,8 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc)
                 GOTO(out_root, err);
         }
 
+        /* bug 2805 - set VM readahead to zero */
+        vm_max_readahead = vm_min_readahead = 0;
         sb->s_root = d_alloc_root(root);
         RETURN(err);
 
@@ -385,6 +387,9 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile,
         int err;
         ENTRY;
 
+        if (lmd_bad_magic(lmd))
+                RETURN(-EINVAL);
+
         generate_random_uuid(uuid);
         class_uuid_unparse(uuid, &mdc_uuid);
 
@@ -510,10 +515,9 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent)
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb);
-        if (lmd == NULL) {
-                CERROR("lustre_mount_data is NULL: check that /sbin/mount.lustre exists?\n");
+        if (lmd_bad_magic(lmd))
                 RETURN(-EINVAL);
-        }
+
         sbi = lustre_init_sbi(sb);
         if (!sbi)
                 RETURN(-ENOMEM);
@@ -1051,6 +1055,9 @@ void ll_update_inode(struct inode *inode, struct mds_body *body,
                                 LBUG();
                         }
                 }
+                /* bug 2844 - limit i_blksize for broken user-space apps */
+                LASSERTF(lsm->lsm_xfersize != 0, "%lu\n", lsm->lsm_xfersize);
+                inode->i_blksize = min(lsm->lsm_xfersize, LL_MAX_BLKSIZE);
                 if (lli->lli_smd != lsm)
                         obd_free_memmd(ll_i2obdexp(inode), &lsm);
         }
index 58c9ed9..162f568 100644 (file)
@@ -25,6 +25,7 @@
 #include <linux/lustre_lite.h>
 #include <linux/lprocfs_status.h>
 #include <linux/seq_file.h>
+#include <linux/obd_support.h>
 
 #include "llite_internal.h"
 
@@ -562,16 +563,16 @@ static int llite_dump_pgcache_seq_open(struct inode *inode, struct file *file)
         struct ll_sb_info *sbi = dp->data;
         int rc;
 
-        llap = kmalloc(sizeof(*llap), GFP_KERNEL);
+        OBD_ALLOC_GFP(llap, sizeof(*llap), GFP_KERNEL);
         if (llap == NULL)
                 return -ENOMEM;
         llap->llap_page = NULL;
         llap->llap_cookie = sbi;
         llap->llap_magic = 0;
+
         rc = seq_open(file, &llite_dump_pgcache_seq_sops);
         if (rc) {
-                kfree(llap);
+                OBD_FREE(llap, sizeof(*llap));
                 return rc;
         }
         seq = file->private_data;
@@ -584,7 +585,7 @@ static int llite_dump_pgcache_seq_open(struct inode *inode, struct file *file)
         return 0;
 }
 
-static int llite_dump_pgcache_seq_release(struct inode *inode, 
+static int llite_dump_pgcache_seq_release(struct inode *inode,
                                           struct file *file)
 {
         struct seq_file *seq = file->private_data;
@@ -595,7 +596,7 @@ static int llite_dump_pgcache_seq_release(struct inode *inode,
         if (!list_empty(&llap->llap_proc_item))
                 list_del_init(&llap->llap_proc_item);
         spin_unlock(&sbi->ll_pglist_lock);
-        kfree(llap);
+        OBD_FREE(llap, sizeof(*llap));
 
         return seq_release(inode, file);
 }
@@ -603,7 +604,7 @@ static int llite_dump_pgcache_seq_release(struct inode *inode,
 struct file_operations llite_dump_pgcache_fops = {
         .open    = llite_dump_pgcache_seq_open,
         .read    = seq_read,
-        .release    = llite_dump_pgcache_seq_release,
+        .release = llite_dump_pgcache_seq_release,
 };
 
 #endif /* LPROCFS */
index c9ee1db..4d6df97 100644 (file)
@@ -250,7 +250,7 @@ static int ll_ap_make_ready(void *data, int cmd)
         if (TryLockPage(page))
                 RETURN(-EAGAIN);
 
-        LL_CDEBUG_PAGE(page, "made ready\n");
+        LL_CDEBUG_PAGE(D_PAGE, page, "made ready\n");
         page_cache_get(page);
 
         /* if we left PageDirty we might get another writepage call
@@ -439,7 +439,7 @@ free_oig:
                         oig_release(oig);
                         GOTO(out, rc);
                 }
-                LL_CDEBUG_PAGE(page, "write queued\n");
+                LL_CDEBUG_PAGE(D_PAGE, page, "write queued\n");
                 //llap_write_pending(inode, llap);
         } else {
                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
@@ -483,7 +483,7 @@ void ll_removepage(struct page *page)
                 return;
         }
 
-        LL_CDEBUG_PAGE(page, "being evicted\n");
+        LL_CDEBUG_PAGE(D_PAGE, page, "being evicted\n");
 
         exp = ll_i2obdexp(inode);
         if (exp == NULL) {
@@ -530,18 +530,14 @@ static int ll_page_matches(struct page *page)
         page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT;
         page_extent.l_extent.end =
                 page_extent.l_extent.start + PAGE_CACHE_SIZE - 1;
-        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED;
+        flags = LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
         matches = obd_match(ll_i2sbi(inode)->ll_osc_exp,
                             ll_i2info(inode)->lli_smd, LDLM_EXTENT,
-                            &page_extent, LCK_PR, &flags, inode, &match_lockh);
-        if (matches < 0) {
-                LL_CDEBUG_PAGE(page, "lock match failed\n");
-                RETURN(matches);
-        }
-        if (matches) {
-                obd_cancel(ll_i2sbi(inode)->ll_osc_exp,
-                           ll_i2info(inode)->lli_smd, LCK_PR, &match_lockh);
-        }
+                            &page_extent, LCK_PR | LCK_PW, &flags, inode,
+                            &match_lockh);
+        if (matches < 0)
+                LL_CDEBUG_PAGE(D_ERROR, page, "lock match failed: rc %d\n",
+                               matches);
         RETURN(matches);
 }
 
@@ -558,14 +554,15 @@ static int ll_issue_page_read(struct obd_export *exp,
                                 NULL, oig, llap->llap_cookie, OBD_BRW_READ, 0,
                                 PAGE_SIZE, 0, ASYNC_COUNT_STABLE);
         if (rc) {
-                LL_CDEBUG_PAGE(page, "read queueing failed\n");
+                LL_CDEBUG_PAGE(D_ERROR, page, "read queue failed: rc %d\n", rc);
                 page_cache_release(page);
         }
         RETURN(rc);
 }
 
 #define LL_RA_MIN(inode) ((unsigned long)PTL_MD_MAX_PAGES / 2)
-#define LL_RA_MAX(inode) (inode->i_blksize * 3)
+#define LL_RA_MAX(inode) ((ll_i2info(inode)->lli_smd->lsm_xfersize * 3) >> \
+                          PAGE_CACHE_SHIFT)
 
 static void ll_readahead(struct ll_readahead_state *ras,
                          struct obd_export *exp, struct address_space *mapping,
@@ -624,10 +621,10 @@ static void ll_readahead(struct ll_readahead_state *ras,
 
                 rc = ll_issue_page_read(exp, llap, oig, 1);
                 if (rc == 0)
-                        LL_CDEBUG_PAGE(page, "started read-ahead\n");
+                        LL_CDEBUG_PAGE(D_PAGE, page, "started read-ahead\n");
                 if (rc) {
         next_page:
-                        LL_CDEBUG_PAGE(page, "skipping read-ahead\n");
+                        LL_CDEBUG_PAGE(D_PAGE, page, "skipping read-ahead\n");
 
                         unlock_page(page);
                 }
@@ -767,7 +764,7 @@ int ll_readpage(struct file *filp, struct page *page)
                 ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
                 obd_trigger_group_io(exp, ll_i2info(inode)->lli_smd, NULL,
                                      oig);
-                LL_CDEBUG_PAGE(page, "marking uptodate from defer\n");
+                LL_CDEBUG_PAGE(D_PAGE, page, "marking uptodate from defer\n");
                 SetPageUptodate(page);
                 unlock_page(page);
                 GOTO(out_oig, rc = 0);
@@ -781,11 +778,20 @@ int ll_readpage(struct file *filp, struct page *page)
 
         if (rc == 0) {
                 static unsigned long next_print;
-                CDEBUG(D_INODE, "didn't match a lock\n");
+                CDEBUG(D_INODE, "ino %lu page %lu (%llu) didn't match a lock\n",
+                       inode->i_ino, page->index,
+                       (long long)page->index << PAGE_CACHE_SHIFT);
                 if (time_after(jiffies, next_print)) {
+                        CERROR("ino %lu page %lu (%llu) not covered by "
+                               "a lock (mmap?).  check debug logs.\n",
+                               inode->i_ino, page->index,
+                               (long long)page->index << PAGE_CACHE_SHIFT);
+                        ldlm_dump_all_namespaces();
+                        if (next_print == 0) {
+                                CERROR("%s\n", portals_debug_dumpstack());
+                                portals_debug_dumplog();
+                        }
                         next_print = jiffies + 30 * HZ;
-                        CERROR("not covered by a lock (mmap?).  check debug "
-                               "logs.\n");
                 }
         }
 
@@ -793,7 +799,7 @@ int ll_readpage(struct file *filp, struct page *page)
         if (rc)
                 GOTO(out, rc);
 
-        LL_CDEBUG_PAGE(page, "queued readpage\n");
+        LL_CDEBUG_PAGE(D_PAGE, page, "queued readpage\n");
         if ((ll_i2sbi(inode)->ll_flags & LL_SBI_READAHEAD))
                 ll_readahead(&fd->fd_ras, exp, page->mapping, oig);
 
@@ -834,7 +840,7 @@ int ll_sync_page(struct page *page)
         if (IS_ERR(llap))
                 RETURN(PTR_ERR(llap));
 
-        LL_CDEBUG_PAGE(page, "setting ready|urgent\n");
+        LL_CDEBUG_PAGE(D_PAGE, page, "setting ready|urgent\n");
 
         rc = obd_set_async_flags(exp, ll_i2info(page->mapping->host)->lli_smd,
                                  NULL, llap->llap_cookie,
index 23be231..c645abd 100644 (file)
@@ -71,11 +71,11 @@ void ll_ap_completion_24(void *data, int cmd, int rc)
                 } else {
                         llap->llap_write_queued = 0;
                 }
-        } else { 
+        } else {
                 SetPageError(page);
         }
 
-        LL_CDEBUG_PAGE(page, "io complete, unlocking\n");
+        LL_CDEBUG_PAGE(D_PAGE, page, "io complete, unlocking\n");
 
         unlock_page(page);
 
@@ -108,7 +108,7 @@ static int ll_writepage_24(struct page *page)
 
         page_cache_get(page);
         if (llap->llap_write_queued) {
-                LL_CDEBUG_PAGE(page, "marking urgent\n");
+                LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
                 rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL,
                                          llap->llap_cookie,
                                          ASYNC_READY | ASYNC_URGENT);
@@ -118,7 +118,7 @@ static int ll_writepage_24(struct page *page)
                                         llap->llap_cookie, OBD_BRW_WRITE, 0, 0,
                                         0, ASYNC_READY | ASYNC_URGENT);
                 if (rc == 0)
-                        LL_CDEBUG_PAGE(page, "mmap write queued\n");
+                        LL_CDEBUG_PAGE(D_PAGE, page, "mmap write queued\n");
                 else
                         llap->llap_write_queued = 0;
         }
index 21e884f..640cf05 100644 (file)
@@ -73,11 +73,11 @@ void ll_ap_completion_26(void *data, int cmd, int rc)
                 } else {
                         llap->llap_write_queued = 0;
                 }
-        } else { 
+        } else {
                 SetPageError(page);
         }
 
-        LL_CDEBUG_PAGE(page, "io complete, unlocking\n");
+        LL_CDEBUG_PAGE(D_PAGE, page, "io complete, unlocking\n");
 
         unlock_page(page);
 
@@ -110,7 +110,7 @@ static int ll_writepage_26(struct page *page, struct writeback_control *wbc)
 
         page_cache_get(page);
         if (llap->llap_write_queued) {
-                LL_CDEBUG_PAGE(page, "marking urgent\n");
+                LL_CDEBUG_PAGE(D_PAGE, page, "marking urgent\n");
                 rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL,
                                          llap->llap_cookie,
                                          ASYNC_READY | ASYNC_URGENT);
@@ -120,7 +120,7 @@ static int ll_writepage_26(struct page *page, struct writeback_control *wbc)
                                         llap->llap_cookie, OBD_BRW_WRITE, 0, 0,
                                         0, ASYNC_READY | ASYNC_URGENT);
                 if (rc == 0)
-                        LL_CDEBUG_PAGE(page, "mmap write queued\n");
+                        LL_CDEBUG_PAGE(D_PAGE, page, "mmap write queued\n");
                 else
                         llap->llap_write_queued = 0;
         }
index a565f51..aa04e4e 100644 (file)
@@ -31,7 +31,7 @@ void lov_free_memmd(struct lov_stripe_md **lsmp);
 
 /* lov_log.c */
 int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                  int count, struct llog_logid *logid);
+                  int count, struct llog_catid *logid);
 int lov_llog_finish(struct obd_device *obd, int count);
 
 /* lov_pack.c */
@@ -41,7 +41,7 @@ int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                  struct lov_mds_md *lmm, int lmm_bytes);
 int lov_setstripe(struct obd_export *exp,
                   struct lov_stripe_md **lsmp, struct lov_user_md *lump);
-int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp, 
+int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp,
               struct lov_user_md *lump);
 int lov_getstripe(struct obd_export *exp,
                   struct lov_stripe_md *lsm, struct lov_user_md *lump);
index 59dc29e..7809366 100644 (file)
@@ -92,7 +92,8 @@ static int lov_llog_origin_add(struct llog_ctxt *ctxt,
 
 static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
                                    struct llog_logid *logid, 
-                                   struct llog_gen *gen)
+                                   struct llog_gen *gen,
+                                   struct obd_uuid *uuid)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct lov_obd *lov = &obd->u.lov;
@@ -103,7 +104,11 @@ static int lov_llog_origin_connect(struct llog_ctxt *ctxt, int count,
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
                 struct llog_ctxt *cctxt = llog_get_context(child, ctxt->loc_idx);
-                rc = llog_connect(cctxt, 1, logid, gen);
+
+                if (uuid && !obd_uuid_equals(uuid, &lov->tgts[i].uuid))
+                        continue;
+
+                rc = llog_connect(cctxt, 1, logid, gen, uuid);
                 if (rc) {
                         CERROR("error osc_llog_connect %d\n", i);
                         break;
@@ -156,18 +161,18 @@ static struct llog_operations lov_size_repl_logops = {
 
 
 int lov_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                  int count, struct llog_logid *logid)
+                  int count, struct llog_catid *logid)
 {
         struct lov_obd *lov = &obd->u.lov;
         int i, rc = 0;
         ENTRY;
-        
+
         rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, 0, NULL,
                         &lov_unlink_orig_logops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL, 
+        rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, 0, NULL,
                         &lov_size_repl_logops);
         if (rc)
                 RETURN(rc);
index 7d657f2..92d862f 100644 (file)
@@ -2201,8 +2201,11 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm,
                         break;
         }
         if (rc == 1) {
-                if (lsm->lsm_stripe_count > 1)
+                if (lsm->lsm_stripe_count > 1) {
+                        if (*flags & LDLM_FL_TEST_LOCK)
+                                lov_llh_destroy(lov_lockh);
                         lov_llh_put(lov_lockh);
+                }
                 RETURN(1);
         }
 
@@ -2640,7 +2643,10 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 int er;
 
-                if (!lov->tgts[i].active)
+                if (val && !obd_uuid_equals(val, &lov->tgts[i].uuid)) 
+                        continue;
+
+                if (!val && !lov->tgts[i].active)
                         continue;
 
                 er = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, vallen,
index 6a4ac6b..1b40327 100644 (file)
@@ -295,6 +295,7 @@ int lov_alloc_memmd(struct lov_stripe_md **lsmp, int stripe_count, int pattern)
         (*lsmp)->lsm_magic = LOV_MAGIC;
         (*lsmp)->lsm_stripe_count = stripe_count;
         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES * stripe_count;
+        (*lsmp)->lsm_xfersize = PTL_MTU * stripe_count;
         (*lsmp)->lsm_pattern = pattern;
         (*lsmp)->lsm_oinfo[0].loi_ost_idx = ~0;
 
@@ -319,6 +320,7 @@ int lov_unpackmd_v0(struct lov_obd *lov, struct lov_stripe_md *lsm,
         lsm->lsm_object_id = le64_to_cpu(lmm->lmm_object_id);
         /* lsm->lsm_object_gr = 0; implicit */
         lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
+        lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count;
         lsm->lsm_pattern = LOV_PATTERN_RAID0;
         ost_offset = le32_to_cpu(lmm->lmm_stripe_offset);
         ost_count = le16_to_cpu(lmm->lmm_ost_count);
@@ -356,6 +358,7 @@ int lov_unpackmd_v1(struct lov_obd *lov, struct lov_stripe_md *lsm,
         lsm->lsm_object_gr = le64_to_cpu(lmm->lmm_object_gr);
         lsm->lsm_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
         lsm->lsm_pattern = le32_to_cpu(lmm->lmm_pattern);
+        lsm->lsm_xfersize = lsm->lsm_stripe_size * lsm->lsm_stripe_count;
 
         for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++) {
                 /* XXX LOV STACKING call down to osc_unpackmd() */
@@ -496,6 +499,7 @@ int lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp,
 
         (*lsmp)->lsm_oinfo[0].loi_ost_idx = lum.lmm_stripe_offset;
         (*lsmp)->lsm_stripe_size = lum.lmm_stripe_size;
+        (*lsmp)->lsm_xfersize = lum.lmm_stripe_size * stripe_count;
 
         RETURN(0);
 }
index 6d18d0d..c1a6640 100644 (file)
@@ -27,8 +27,8 @@
 
 #include <linux/lvfs.h>
 
-struct dentry *lvfs_fid2dentry(struct obd_run_ctxt *ctxt, __u64 id, __u32 gen, __u64 gr,
-                               void *data)
+struct dentry *lvfs_fid2dentry(struct obd_run_ctxt *ctxt, __u64 id,
+                               __u32 gen, __u64 gr, void *data) /* thin wrapper: passes id/gen/gr/data to ctxt->cb_ops.l_fid2dentry and returns its result unchanged */
 {
         return ctxt->cb_ops.l_fid2dentry(id, gen, gr, data);
 }
index 7e34fce..7f381d3 100644 (file)
@@ -179,7 +179,7 @@ void pop_ctxt(struct obd_run_ctxt *saved, struct obd_run_ctxt *new_ctx,
 EXPORT_SYMBOL(pop_ctxt);
 
 /* utility to make a file */
-struct dentry *simple_mknod(struct dentry *dir, char *name, int mode)
+struct dentry *simple_mknod(struct dentry *dir, char *name, int mode, int fix)
 {
         struct dentry *dchild;
         int err = 0;
@@ -198,7 +198,7 @@ struct dentry *simple_mknod(struct dentry *dir, char *name, int mode)
                         GOTO(out_err, err = -EEXIST);
 
                 /* Fixup file permissions if necessary */
-                if ((old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
+                if (fix && (old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
                         CWARN("fixing permissions on %s from %o to %o\n",
                               name, old_mode, mode);
                         dchild->d_inode->i_mode = (mode & S_IALLUGO) |
@@ -224,7 +224,7 @@ out_up:
 EXPORT_SYMBOL(simple_mknod);
 
 /* utility to make a directory */
-struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode)
+struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode, int fix)
 {
         struct dentry *dchild;
         int err = 0;
@@ -242,7 +242,7 @@ struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode)
                         GOTO(out_err, err = -ENOTDIR);
 
                 /* Fixup directory permissions if necessary */
-                if ((old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
+                if (fix && (old_mode & S_IALLUGO) != (mode & S_IALLUGO)) {
                         CWARN("fixing permissions on %s from %o to %o\n",
                               name, old_mode, mode);
                         dchild->d_inode->i_mode = (mode & S_IALLUGO) |
index 51de280..c692def 100644 (file)
@@ -405,13 +405,23 @@ static int mdc_close_interpret(struct ptlrpc_request *req, void *data, int rc)
 {
         union ptlrpc_async_args *aa = data;
         struct mdc_rpc_lock *rpc_lock = aa->pointer_arg[0];
-        
-        mdc_put_rpc_lock(rpc_lock, NULL);
+        struct obd_device *obd = aa->pointer_arg[1]; /* Interpret callback that mdc_close() installs in rq_interpret_reply: releases the rpc lock kept in async-arg slot 0 and wakes the waiter. Slot 1 holds the obd; it is read only by the sanity check below. */
+
+        if (rpc_lock == NULL) { /* slot 0 is empty, e.g. already cleared by the NULL store below on an earlier invocation */
+                CERROR("called with NULL rpc_lock\n");
+        } else {
+                mdc_put_rpc_lock(rpc_lock, NULL);
+                LASSERTF(req->rq_async_args.pointer_arg[0] == /* sanity check: the lock stored on the request must be this obd's cl_rpc_lock */
+                         obd->u.cli.cl_rpc_lock, "%p != %p\n",
+                         req->rq_async_args.pointer_arg[0],
+                         obd->u.cli.cl_rpc_lock);
+                aa->pointer_arg[0] = NULL; /* mark the lock as released; mdc_close() tests this slot and calls this function itself if it is still non-NULL (presumably aa aliases req->rq_async_args -- confirm) */
+        }
         wake_up(&req->rq_reply_waitq);
         RETURN(rc);
 }
 
-/* We can't use ptlrpc_check_reply, because we don't want to wake up for 
+/* We can't use ptlrpc_check_reply, because we don't want to wake up for
  * anything but a reply or an error. */
 static int mdc_close_check_reply(struct ptlrpc_request *req)
 {
@@ -443,7 +453,6 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo,
         struct ptlrpc_request *req;
         struct mdc_open_data *mod;
         struct l_wait_info lwi;
-        struct mdc_rpc_lock *rpc_lock = obd->u.cli.cl_rpc_lock;
         ENTRY;
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
@@ -478,9 +487,10 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo,
         /* We hand a ref to the rpcd here, so we need another one of our own. */
         ptlrpc_request_addref(req);
 
-        mdc_get_rpc_lock(rpc_lock, NULL);
+        mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL);
         req->rq_interpret_reply = mdc_close_interpret;
-        req->rq_async_args.pointer_arg[0] = rpc_lock;
+        req->rq_async_args.pointer_arg[0] = obd->u.cli.cl_rpc_lock;
+        req->rq_async_args.pointer_arg[1] = obd;
         ptlrpcd_add_req(req);
         lwi = LWI_TIMEOUT_INTR(MAX(req->rq_timeout * HZ, 1), go_back_to_sleep,
                                NULL, NULL);
@@ -498,6 +508,11 @@ int mdc_close(struct obd_export *exp, struct obdo *obdo,
                                "close succeeded.  Please tell CFS.\n");
                 }
         }
+        if (req->rq_async_args.pointer_arg[0] != NULL) {
+                CERROR("returned without dropping rpc_lock: rc %d\n", rc);
+                mdc_close_interpret(req, &req->rq_async_args, rc);
+                portals_debug_dumplog();
+        }
 
         EXIT;
  out:
@@ -812,6 +827,39 @@ static int mdc_detach(struct obd_device *dev)
         return lprocfs_obd_detach(dev);
 }
 
+static int mdc_import_event(struct obd_device *obd,
+                            struct obd_import *imp, 
+                            enum obd_import_event event)
+{ /* Handle an import event for this MDC device; returns 0, or the obd_notify() result when an observer was notified. */
+        int rc = 0; /* stays 0 unless one of the obd_notify() calls below overwrites it */
+
+        LASSERT(imp->imp_obd == obd); /* the import must belong to the device being notified */
+
+        switch (event) {
+        case IMP_EVENT_DISCON: { /* nothing to do on disconnect */
+                break;
+        }
+        case IMP_EVENT_INVALIDATE: {
+                struct ldlm_namespace *ns = obd->obd_namespace;
+                
+                ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); /* NOTE(review): presumably drops this namespace's locks locally only -- confirm LDLM_FL_LOCAL_ONLY semantics */
+
+                if (obd->obd_observer)
+                        rc = obd_notify(obd->obd_observer, obd, 0); /* last argument 0 here vs 1 for IMP_EVENT_ACTIVE; presumably the "active" flag seen by the observer -- confirm */
+                break;
+        }
+        case IMP_EVENT_ACTIVE: {
+                if (obd->obd_observer)
+                        rc = obd_notify(obd->obd_observer, obd, 1);
+                break;
+        }
+        default: /* any other event value is treated as a programming error */
+                CERROR("Unknown import event %d\n", event);
+                LBUG();
+        }
+        RETURN(rc); /* NOTE(review): RETURN() is used although this function has no ENTRY -- confirm whether the trace macros must be paired */
+}
+
 static int mdc_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -906,7 +954,7 @@ static int mdc_cleanup(struct obd_device *obd, int flags)
 
 
 static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                         int count, struct llog_logid *logid)
+                         int count, struct llog_catid *logid)
 {
         struct llog_ctxt *ctxt;
         int rc;
@@ -916,7 +964,7 @@ static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt,
                         &llog_client_ops);
         if (rc == 0) {
                 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
-                ctxt->loc_imp = obd->u.cli.cl_import; 
+                ctxt->loc_imp = obd->u.cli.cl_import;
         }
 
         RETURN(rc);
@@ -945,6 +993,7 @@ struct obd_ops mdc_obd_ops = {
         o_statfs:      mdc_statfs,
         o_pin:         mdc_pin,
         o_unpin:       mdc_unpin,
+        o_import_event: mdc_import_event,
         o_llog_init:   mdc_llog_init,
         o_llog_finish: mdc_llog_finish,
 };
index fe28761..5a50482 100644 (file)
@@ -355,34 +355,39 @@ static int mds_destroy_export(struct obd_export *export)
         RETURN(rc);
 }
 
-static int mds_disconnect(struct obd_export *export, int flags)
+static int mds_disconnect(struct obd_export *exp, int flags) /* Disconnect an MDS export: record flags, disconnect, cancel its locks, then schedule its outstanding replies. Returns the class_disconnect() result. */
 {
         unsigned long irqflags;
         int rc;
         ENTRY;
 
-        ldlm_cancel_locks_for_export(export);
+        LASSERT(exp);
+        class_export_get(exp); /* hold a reference for the duration of this function; released by class_export_put() before returning */
+
+        spin_lock_irqsave(&exp->exp_lock, irqflags);
+        exp->exp_flags = flags; /* store the disconnect flags on the export under exp_lock, now done before class_disconnect() */
+        spin_unlock_irqrestore(&exp->exp_lock, irqflags);
+
+        /* Disconnect early so that clients can't keep using export */
+        rc = class_disconnect(exp, flags); /* rc is not examined below; the remaining teardown runs regardless and rc is returned at the end */
+        ldlm_cancel_locks_for_export(exp); /* lock cancellation now happens after the disconnect instead of first */
 
         /* complete all outstanding replies */
-        spin_lock_irqsave (&export->exp_lock, irqflags);
-        while (!list_empty (&export->exp_outstanding_replies)) {
+        spin_lock_irqsave(&exp->exp_lock, irqflags);
+        while (!list_empty(&exp->exp_outstanding_replies)) { /* always takes the list head; each pass unlinks it, so the list shrinks */
                 struct ptlrpc_reply_state *rs =
-                        list_entry (export->exp_outstanding_replies.next, 
-                                    struct ptlrpc_reply_state, rs_exp_list);
+                        list_entry(exp->exp_outstanding_replies.next,
+                                   struct ptlrpc_reply_state, rs_exp_list);
                 struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
 
-                spin_lock (&svc->srv_lock);
-                list_del_init (&rs->rs_exp_list);
-                ptlrpc_schedule_difficult_reply (rs);
-                spin_unlock (&svc->srv_lock);
+                spin_lock(&svc->srv_lock); /* srv_lock is taken nested inside exp_lock */
+                list_del_init(&rs->rs_exp_list); /* unlink the reply state from the export's list */
+                ptlrpc_schedule_difficult_reply(rs); /* called with both exp_lock and srv_lock held */
+                spin_unlock(&svc->srv_lock);
         }
-        spin_unlock_irqrestore (&export->exp_lock, irqflags);
+        spin_unlock_irqrestore(&exp->exp_lock, irqflags);
 
-        spin_lock_irqsave(&export->exp_lock, irqflags);
-        export->exp_flags = flags;
-        spin_unlock_irqrestore(&export->exp_lock, irqflags);
-
-        rc = class_disconnect(export, flags);
+        class_export_put(exp); /* drop the reference taken by class_export_get() above */
         RETURN(rc);
 }
 
@@ -1480,7 +1485,8 @@ static int mds_postrecov(struct obd_device *obd)
         LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
 
         rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
-                          obd->u.mds.mds_lov_desc.ld_tgt_count, NULL, NULL);
+                          obd->u.mds.mds_lov_desc.ld_tgt_count,
+                          NULL, NULL, NULL);
         if (rc != 0) {
                 CERROR("faild at llog_origin_connect: %d\n", rc);
         }
index 1dfc246..d3e235a 100644 (file)
@@ -388,7 +388,7 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
 
         /* setup the directory tree */
         push_ctxt(&saved, &obd->obd_ctxt, NULL);
-        dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755);
+        dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755, 0);
         if (IS_ERR(dentry)) {
                 rc = PTR_ERR(dentry);
                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
@@ -410,7 +410,7 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
         }
         mds->mds_fid_de = dentry;
 
-        dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777);
+        dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777, 1);
         if (IS_ERR(dentry)) {
                 rc = PTR_ERR(dentry);
                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
@@ -418,7 +418,7 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
         }
         mds->mds_pending_dir = dentry;
 
-        dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777);
+        dentry = simple_mkdir(current->fs->pwd, "LOGS", 0777, 1);
         if (IS_ERR(dentry)) {
                 rc = PTR_ERR(dentry);
                 CERROR("cannot create LOGS directory: rc = %d\n", rc);
@@ -426,7 +426,7 @@ int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
         }
         mds->mds_logs_dir = dentry;
 
-        dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777);
+        dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
         if (IS_ERR(dentry)) {
                 rc = PTR_ERR(dentry);
                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
index a6bba27..d8eb150 100644 (file)
@@ -51,11 +51,11 @@ int mds_cleanup_orphans(struct obd_device *obd);
 
 
 /* mds/mds_log.c */
-int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, 
+int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
                       struct lov_mds_md *lmm, int lmm_size,
                       struct llog_cookie *logcookies, int cookies_size);
-int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count, 
-                  struct llog_logid *logid);
+int mds_llog_init(struct obd_device *obd, struct obd_device *tgt, int count,
+                  struct llog_catid *logid);
 int mds_llog_finish(struct obd_device *obd, int count);
 
 /* mds/mds_lov.c */
index c4d5690..b8ce8b5 100644 (file)
@@ -54,7 +54,8 @@ static int mds_llog_origin_add(struct llog_ctxt *ctxt,
 
 static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
                                    struct llog_logid *logid,
-                                   struct llog_gen *gen)
+                                   struct llog_gen *gen,
+                                   struct obd_uuid *uuid)
 {
         struct obd_device *obd = ctxt->loc_obd;
         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
@@ -63,7 +64,7 @@ static int mds_llog_origin_connect(struct llog_ctxt *ctxt, int count,
         ENTRY;
 
         lctxt = llog_get_context(lov_obd, ctxt->loc_idx);
-        rc = llog_connect(lctxt, count, logid, gen);
+        rc = llog_connect(lctxt, count, logid, gen, uuid);
         RETURN(rc);
 }
 
@@ -118,7 +119,7 @@ static struct llog_operations mds_size_repl_logops = {
 };
 
 int mds_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                  int count, struct llog_logid *logid)
+                  int count, struct llog_catid *logid)
 {
         struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
         int rc;
index 97deb7d..0e9d2f0 100644 (file)
@@ -280,7 +280,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (!obd->obd_recovering) {
                 rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
                                   obd->u.mds.mds_lov_desc.ld_tgt_count, NULL,
-                                  NULL);
+                                  NULL, NULL);
                 if (rc != 0)
                         CERROR("faild at llog_origin_connect: %d\n", rc);
 
@@ -497,6 +497,7 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
 {
         struct obd_uuid *uuid;
         int rc = 0;
+        ENTRY;
 
         if (!active)
                 RETURN(0);
@@ -512,6 +513,21 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, int active)
                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
                       obd->obd_name, uuid->uuid);
         } else {
+                LASSERT(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT) != NULL);
+
+                rc = obd_set_info(obd->u.mds.mds_osc_exp, strlen("mds_conn"), "mds_conn",
+                                  0, uuid);
+                if (rc != 0)
+                        RETURN(rc);
+
+                rc = llog_connect(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT),
+                                  obd->u.mds.mds_lov_desc.ld_tgt_count,
+                                  NULL, NULL, uuid);
+                if (rc != 0) {
+                        CERROR("faild at llog_origin_connect: %d\n", rc);
+                        RETURN(rc);
+                }
+
                 CWARN("MDS %s: %s now active, resetting orphans\n",
                       obd->obd_name, uuid->uuid);
                 rc = mds_lov_clearorphans(&obd->u.mds, uuid);
index fdbfb91..5e2c305 100644 (file)
@@ -85,7 +85,7 @@ int proc_version;
 /* The following are visible and mutable through /proc/sys/lustre/. */
 unsigned int obd_fail_loc;
 unsigned int obd_timeout = 100;
-char obd_lustre_upcall[128] = "/usr/lib/lustre/lustre_upcall";
+char obd_lustre_upcall[128] = "DEFAULT"; /* or NONE or /full/path/to/upcall  */
 unsigned int obd_sync_filter; /* = 0, don't sync by default */
 
 #ifdef __KERNEL__
index 9ee9c4d..5088abb 100644 (file)
@@ -393,7 +393,6 @@ struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
         return obd->u.cli.cl_import;
 }
 
-
 /* Export management functions */
 static void export_handle_addref(void *export)
 {
@@ -588,8 +587,7 @@ int class_disconnect(struct obd_export *export, int flags)
 
         if (export == NULL) {
                 fixme();
-                CDEBUG(D_IOCTL, "disconnect: attempting to free "
-                       "null export %p\n", export);
+                CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
                 RETURN(-EINVAL);
         }
 
index e4146dc..0ad595f 100644 (file)
@@ -66,9 +66,9 @@ void llog_free_handle(struct llog_handle *loghandle)
 
         if (!loghandle->lgh_hdr)
                 goto out;
-        if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)
+        if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
                 list_del_init(&loghandle->u.phd.phd_entry);
-        if (le32_to_cpu(loghandle->lgh_hdr->llh_flags) & LLOG_F_IS_CAT)
+        if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
         OBD_FREE(loghandle->lgh_hdr, LLOG_CHUNK_SIZE);
 
@@ -97,10 +97,10 @@ int llog_cancel_rec(struct llog_handle *loghandle, int index)
                 RETURN(-EINVAL);
         }
 
-        llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) - 1);
+        llh->llh_count--;
 
-        if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
-            (le32_to_cpu(llh->llh_count) == 1) &&
+        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+            (llh->llh_count == 1) &&
             (loghandle->lgh_last_idx == (LLOG_BITMAP_BYTES * 8) - 1)) {
                 rc = llog_destroy(loghandle);
                 if (rc)
@@ -131,10 +131,10 @@ int llog_init_handle(struct llog_handle *handle, int flags,
                 RETURN(-ENOMEM);
         handle->lgh_hdr = llh;
         /* first assign flags to use llog_client_ops */
-        llh->llh_flags = cpu_to_le32(flags);
+        llh->llh_flags = flags;
         rc = llog_read_header(handle);
         if (rc == 0) {
-                flags = le32_to_cpu(llh->llh_flags);
+                flags = llh->llh_flags;
                 if (uuid)
                         LASSERT(obd_uuid_equals(uuid, &llh->llh_tgtuuid));
                 GOTO(out, rc);
@@ -146,21 +146,20 @@ int llog_init_handle(struct llog_handle *handle, int flags,
         rc = 0;
 
         handle->lgh_last_idx = 0; /* header is record with index 0 */
-        llh->llh_count = cpu_to_le32(1);         /* for the header record */
-        llh->llh_hdr.lrh_type = cpu_to_le32(LLOG_HDR_MAGIC);
-        llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len =
-                cpu_to_le32(LLOG_CHUNK_SIZE);
+        llh->llh_count = 1;         /* for the header record */
+        llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
+        llh->llh_hdr.lrh_len = llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
         llh->llh_hdr.lrh_index = llh->llh_tail.lrt_index = 0;
-        llh->llh_timestamp = cpu_to_le64(LTIME_S(CURRENT_TIME));
+        llh->llh_timestamp = LTIME_S(CURRENT_TIME);
         if (uuid)
                 memcpy(&llh->llh_tgtuuid, uuid, sizeof(llh->llh_tgtuuid));
-        llh->llh_bitmap_offset = cpu_to_le32(offsetof(typeof(*llh),llh_bitmap));
+        llh->llh_bitmap_offset = offsetof(typeof(*llh),llh_bitmap);
         ext2_set_bit(0, llh->llh_bitmap);
 
 out:
         if (flags & LLOG_F_IS_CAT) {
                 INIT_LIST_HEAD(&handle->u.chd.chd_head);
-                llh->llh_size = cpu_to_le32(sizeof(struct llog_logid_rec));
+                llh->llh_size = sizeof(struct llog_logid_rec);
         }
         else if (flags & LLOG_F_IS_PLAIN)
                 INIT_LIST_HEAD(&handle->u.phd.phd_entry);
@@ -235,11 +234,12 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
                         GOTO(out, rc);
 
                 rec = buf;
-                idx = le32_to_cpu(rec->lrh_index);
+                idx = rec->lrh_index;
                 if (idx < index)
                         CDEBUG(D_HA, "index %u : idx %u\n", index, idx);
                 while (idx < index) {
-                        rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+                        rec = (struct llog_rec_hdr *)
+                                ((char *)rec + rec->lrh_len);
                         idx ++;
                 }
 
@@ -266,7 +266,8 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb,
                         ++index;
                         if (index > last_index)
                                 GOTO(out, rc = 0);
-                        rec = ((void *)rec + le32_to_cpu(rec->lrh_len));
+                        rec = (struct llog_rec_hdr *)
+                                ((char *)rec + rec->lrh_len);
                 }
         }
 
index b0e82fe..d4fa370 100644 (file)
@@ -51,7 +51,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
 {
         struct llog_handle *loghandle;
         struct llog_log_hdr *llh;
-        struct llog_logid_rec rec;
+        struct llog_logid_rec rec = { { 0 }, };
         int rc, index, bitmap_size;
         ENTRY;
 
@@ -61,7 +61,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
         index = (cathandle->lgh_last_idx + 1) % bitmap_size;
 
         /* maximum number of available slots in catlog is bitmap_size - 2 */
-        if (llh->llh_cat_idx == cpu_to_le32(index)) {
+        if (llh->llh_cat_idx == index) {
                 CERROR("no free catalog slots for log...\n");
                 RETURN(ERR_PTR(-ENOSPC));
         } else {
@@ -73,8 +73,8 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
                         LBUG(); /* should never happen */
                 }
                 cathandle->lgh_last_idx = index;
-                llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
-                llh->llh_tail.lrt_index = cpu_to_le32(index);
+                llh->llh_count++;
+                llh->llh_tail.lrt_index = index;
         }
 
         rc = llog_create(cathandle->lgh_ctxt, &loghandle, NULL, NULL);
@@ -91,12 +91,12 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
                LPX64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen,
                index, cathandle->lgh_id.lgl_oid);
         /* build the record for this log in the catalog */
-        rec.lid_hdr.lrh_len = cpu_to_le32(sizeof(rec));
-        rec.lid_hdr.lrh_index = cpu_to_le32(index);
-        rec.lid_hdr.lrh_type = cpu_to_le32(LLOG_LOGID_MAGIC);
+        rec.lid_hdr.lrh_len = sizeof(rec);
+        rec.lid_hdr.lrh_index = index;
+        rec.lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
         rec.lid_id = loghandle->lgh_id;
-        rec.lid_tail.lrt_len = cpu_to_le32(sizeof(rec));
-        rec.lid_tail.lrt_index = cpu_to_le32(index);
+        rec.lid_tail.lrt_len = sizeof(rec);
+        rec.lid_tail.lrt_index = index;
 
         /* update the catalog: header and record */
         rc = llog_write_rec(cathandle, &rec.lid_hdr,
@@ -105,7 +105,7 @@ static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
                 GOTO(out_destroy, rc);
         }
 
-        loghandle->lgh_hdr->llh_cat_idx = cpu_to_le32(index);
+        loghandle->lgh_hdr->llh_cat_idx = index;
         cathandle->u.chd.chd_current_log = loghandle;
         LASSERT(list_empty(&loghandle->u.phd.phd_entry));
         list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
@@ -163,8 +163,8 @@ int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
         if (!rc) {
                 loghandle->u.phd.phd_cat_handle = cathandle;
                 loghandle->u.phd.phd_cookie.lgc_lgl = cathandle->lgh_id;
-                loghandle->u.phd.phd_cookie.lgc_index =
-                        le32_to_cpu(loghandle->lgh_hdr->llh_cat_idx);
+                loghandle->u.phd.phd_cookie.lgc_index = 
+                        loghandle->lgh_hdr->llh_cat_idx;
         }
 
 out:
@@ -257,13 +257,21 @@ int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
         int rc;
         ENTRY;
 
-        LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
+        LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
         loghandle = llog_cat_current_log(cathandle, 1);
         if (IS_ERR(loghandle))
                 RETURN(PTR_ERR(loghandle));
         /* loghandle is already locked by llog_cat_current_log() for us */
         rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
         up_write(&loghandle->lgh_lock);
+        if (rc == -ENOSPC) {
+                /* to create a new plain log */
+                loghandle = llog_cat_current_log(cathandle, 1);
+                if (IS_ERR(loghandle))
+                        RETURN(PTR_ERR(loghandle));
+                rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
+                up_write(&loghandle->lgh_lock);
+        }
 
         RETURN(rc);
 }
@@ -328,13 +336,13 @@ int llog_cat_process_cb(struct llog_handle *cat_llh, struct llog_rec_hdr *rec,
         struct llog_handle *llh;
         int rc;
 
-        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+        if (rec->lrh_type != LLOG_LOGID_MAGIC) {
                 CERROR("invalid record in catalog\n");
                 RETURN(-EINVAL);
         }
         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
-               le32_to_cpu(rec->lrh_index), cat_llh->lgh_id.lgl_oid);
+               rec->lrh_index, cat_llh->lgh_id.lgl_oid);
 
         rc = llog_cat_id2handle(cat_llh, &llh, &lir->lid_id);
         if (rc) {
@@ -355,7 +363,7 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
         int rc;
         ENTRY;
 
-        LASSERT(llh->llh_flags &cpu_to_le32(LLOG_F_IS_CAT));
+        LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
         d.lpd_data = data;
         d.lpd_cb = cb;
 
@@ -363,7 +371,7 @@ int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data)
                 CWARN("catlog "LPX64" crosses index zero\n",
                       cat_llh->lgh_id.lgl_oid);
 
-                cd.first_idx = le32_to_cpu(llh->llh_cat_idx);
+                cd.first_idx = llh->llh_cat_idx;
                 cd.last_idx = 0;
                 rc = llog_process(cat_llh, llog_cat_process_cb, &d, &cd);
                 if (rc != 0)
@@ -387,17 +395,17 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
         ENTRY;
 
         bitmap_size = sizeof(llh->llh_bitmap) * 8;
-        if (llh->llh_cat_idx == cpu_to_le32(index - 1)) {
-                idx = le32_to_cpu(llh->llh_cat_idx) + 1;
-                llh->llh_cat_idx = cpu_to_le32(idx);
+        if (llh->llh_cat_idx == (index - 1)) {
+                idx = llh->llh_cat_idx + 1;
+                llh->llh_cat_idx = idx;
                 if (idx == cathandle->lgh_last_idx)
                         goto out;
                 for (i = (index + 1) % bitmap_size;
                      i != cathandle->lgh_last_idx;
                      i = (i + 1) % bitmap_size) {
                         if (!ext2_test_bit(i, llh->llh_bitmap)) {
-                                idx = le32_to_cpu(llh->llh_cat_idx) + 1;
-                                llh->llh_cat_idx = cpu_to_le32(idx);
+                                idx = llh->llh_cat_idx + 1;
+                                llh->llh_cat_idx = idx;
                         } else if (i == 0) {
                                 llh->llh_cat_idx = 0;
                         } else {
@@ -406,7 +414,7 @@ int llog_cat_set_first_idx(struct llog_handle *cathandle, int index)
                 }
 out:
                 CDEBUG(D_HA, "set catlog "LPX64" first idx %u\n",
-                       cathandle->lgh_id.lgl_oid,le32_to_cpu(llh->llh_cat_idx));
+                       cathandle->lgh_id.lgl_oid, llh->llh_cat_idx);
         }
 
         RETURN(0);
index 8674351..0066087 100644 (file)
@@ -1,10 +1,8 @@
 #ifndef __LLOG_INTERNAL_H__
 #define __LLOG_INTERNAL_H__
 
-int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd, 
-                      char *name, int count, struct llog_logid *idarray);
-int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd, 
-                      char *name, int count, struct llog_logid *);
+int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
+                      char *name, int count, struct llog_catid *idarray);
 int llog_cat_id2handle(struct llog_handle *cathandle, struct llog_handle **res,
                        struct llog_logid *logid);
 #endif
index 310f122..14d20f2 100644 (file)
@@ -69,7 +69,7 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
         char *endp;
         int cur_index, rc = 0;
 
-        cur_index = le32_to_cpu(rec->lrh_index);
+        cur_index = rec->lrh_index;
 
         if (ioc_data && (ioc_data->ioc_inllen1)) {
                 l = 0;
@@ -90,15 +90,15 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
                 if (to > 0 && cur_index > to)
                         RETURN(-LLOG_EEMPTY);
         }
-        if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
+        if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) {
                 struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
                 struct llog_handle *log_handle;
 
-                if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+                if (rec->lrh_type != LLOG_LOGID_MAGIC) {
                         l = snprintf(out, remains, "[index]: %05d  [type]: "
                                      "%02x  [len]: %04d failed\n",
-                                     cur_index, le32_to_cpu(rec->lrh_type),
-                                     le32_to_cpu(rec->lrh_len));
+                                     cur_index, rec->lrh_type,
+                                     rec->lrh_len);
                 }
                 if (handle->lgh_ctxt == NULL)
                         RETURN(-EOPNOTSUPP);
@@ -106,7 +106,7 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
                 rc = llog_process(log_handle, llog_check_cb, NULL, NULL);
                 llog_close(log_handle);
         } else {
-                switch (le32_to_cpu(rec->lrh_type)) {
+                switch (rec->lrh_type) {
                 case OST_SZ_REC:
                 case OST_RAID1_REC:
                 case MDS_UNLINK_REC:
@@ -115,8 +115,8 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
                 case LLOG_HDR_MAGIC: {
                          l = snprintf(out, remains, "[index]: %05d  [type]: "
                                       "%02x  [len]: %04d ok\n",
-                                      cur_index, le32_to_cpu(rec->lrh_type),
-                                      le32_to_cpu(rec->lrh_len));
+                                      cur_index, rec->lrh_type,
+                                      rec->lrh_len);
                          out += l;
                          remains -= l;
                          if (remains <= 0) {
@@ -128,8 +128,8 @@ static int llog_check_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
                 default: {
                          l = snprintf(out, remains, "[index]: %05d  [type]: "
                                       "%02x  [len]: %04d failed\n",
-                                      cur_index, le32_to_cpu(rec->lrh_type),
-                                      le32_to_cpu(rec->lrh_len));
+                                      cur_index, rec->lrh_type,
+                                      rec->lrh_len);
                          out += l;
                          remains -= l;
                          if (remains <= 0) {
@@ -168,15 +168,15 @@ static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
                 ioc_data->ioc_inllen1 = 0;
         }
 
-        cur_index = le32_to_cpu(rec->lrh_index);
+        cur_index = rec->lrh_index;
         if (cur_index < from)
                 RETURN(0);
         if (to > 0 && cur_index > to)
                 RETURN(-LLOG_EEMPTY);
 
-        if (handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)) {
+        if (handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) {
                 struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
-                if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC)) {
+                if (rec->lrh_type != LLOG_LOGID_MAGIC) {
                         CERROR("invalid record in catalog\n");
                         RETURN(-EINVAL);
                 }
@@ -188,8 +188,8 @@ static int llog_print_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
         } else {
                 l = snprintf(out, remains,
                              "[index]: %05d  [type]: %02x  [len]: %04d\n",
-                             cur_index, le32_to_cpu(rec->lrh_type),
-                             le32_to_cpu(rec->lrh_len));
+                             cur_index, rec->lrh_type,
+                             rec->lrh_len);
         }
         out += l;
         remains -= l;
@@ -235,7 +235,7 @@ static int llog_delete_cb(struct llog_handle *handle, struct llog_rec_hdr *rec,
         struct  llog_logid_rec *lir = (struct llog_logid_rec*)rec;
         int     rc;
 
-        if (rec->lrh_type != cpu_to_le32(LLOG_LOGID_MAGIC))
+        if (rec->lrh_type != LLOG_LOGID_MAGIC)
               return (-EINVAL);
         rc = llog_remove_log(handle, &lir->lid_id);
 
@@ -283,10 +283,10 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                              "last index:       %d\n",
                              handle->lgh_id.lgl_oid, handle->lgh_id.lgl_ogr,
                              handle->lgh_id.lgl_ogen,
-                             le32_to_cpu(handle->lgh_hdr->llh_flags),
-                             le32_to_cpu(handle->lgh_hdr->llh_flags) &
+                             handle->lgh_hdr->llh_flags,
+                             handle->lgh_hdr->llh_flags &
                              LLOG_F_IS_CAT ? "cat" : "plain",
-                             le32_to_cpu(handle->lgh_hdr->llh_count),
+                             handle->lgh_hdr->llh_count,
                              handle->lgh_last_idx);
                 out += l;
                 remains -= l;
@@ -316,7 +316,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
                 struct llog_logid plain;
                 char *endp;
 
-                if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
+                if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
                         GOTO(out_close, err = -EINVAL);
 
                 err = str2logid(&plain, data->ioc_inlbuf2, data->ioc_inllen2);
@@ -333,7 +333,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
         case OBD_IOC_LLOG_REMOVE: {
                 struct llog_logid plain;
 
-                if (!(handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT)))
+                if (!(handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT))
                         GOTO(out_close, err = -EINVAL);
 
                 if (data->ioc_inlbuf2) {
@@ -353,7 +353,7 @@ int llog_ioctl(struct llog_ctxt *ctxt, int cmd, struct obd_ioctl_data *data)
 
 out_close:
         if (handle->lgh_hdr &&
-            handle->lgh_hdr->llh_flags & cpu_to_le32(LLOG_F_IS_CAT))
+            handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
                 llog_cat_put(handle);
         else
                 llog_close(handle);
@@ -366,8 +366,9 @@ int llog_catlog_list(struct obd_device *obd, int count,
                      struct obd_ioctl_data *data)
 {
         int size, i;
-        struct llog_logid *idarray, *id;
-        char name[32] = "CATLIST";
+        struct llog_catid *idarray;
+        struct llog_logid *id;
+        char name[32] = CATLIST;
         char *out;
         int l, remains, rc = 0;
 
@@ -386,12 +387,11 @@ int llog_catlog_list(struct obd_device *obd, int count,
 
         out = data->ioc_bulk;
         remains = data->ioc_inllen1;
-        id = idarray;
         for (i = 0; i < count; i++) {
+                id = &idarray[i].lci_logid;
                 l = snprintf(out, remains,
                              "catalog log: #"LPX64"#"LPX64"#%08x\n",
                              id->lgl_oid, id->lgl_ogr, id->lgl_ogen);
-                id++;
                 out += l;
                 remains -= l;
                 if (remains <= 0) {
index ec32b11..ad0b562 100644 (file)
 static int llog_lvfs_pad(struct obd_device *obd, struct l_file *file,
                                 int len, int index)
 {
-        struct llog_rec_hdr rec;
+        struct llog_rec_hdr rec = { 0 };
         struct llog_rec_tail tail;
         int rc;
         ENTRY;
 
         LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0);
 
-        tail.lrt_len = rec.lrh_len = cpu_to_le32(len);
-        tail.lrt_index = rec.lrh_index = cpu_to_le32(index);
+        tail.lrt_len = rec.lrh_len = len;
+        tail.lrt_index = rec.lrh_index = index;
         rec.lrh_type = 0;
 
         rc = fsfilt_write_record(obd, file, &rec, sizeof(rec), &file->f_pos, 0);
@@ -86,7 +86,7 @@ static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
         int rc;
         struct llog_rec_tail end;
         loff_t saved_off = file->f_pos;
-        int buflen = le32_to_cpu(rec->lrh_len);
+        int buflen = rec->lrh_len;
 
         ENTRY;
         file->f_pos = off;
@@ -101,7 +101,7 @@ static int llog_lvfs_write_blob(struct obd_device *obd, struct l_file *file,
         }
 
         /* the buf case */
-        rec->lrh_len = cpu_to_le32(sizeof(*rec) + buflen + sizeof(end));
+        rec->lrh_len = sizeof(*rec) + buflen + sizeof(end);
         rc = fsfilt_write_record(obd, file, rec, sizeof(*rec), &file->f_pos, 0);
         if (rc) {
                 CERROR("error writing log hdr: rc %d\n", rc);
@@ -165,7 +165,7 @@ static int llog_lvfs_read_header(struct llog_handle *handle)
         if (rc)
                 CERROR("error reading log header\n");
 
-        handle->lgh_last_idx = le32_to_cpu(handle->lgh_hdr->llh_tail.lrt_index);
+        handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
         handle->lgh_file->f_pos = handle->lgh_file->f_dentry->d_inode->i_size;
 
         RETURN(rc);
@@ -179,11 +179,10 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                                void *buf, int idx)
 {
         struct llog_log_hdr *llh;
-        int reclen = le32_to_cpu(rec->lrh_len), index, rc;
+        int reclen = rec->lrh_len, index, rc;
         struct llog_rec_tail *lrt;
         struct obd_device *obd;
         struct file *file;
-        loff_t offset;
         size_t left;
         ENTRY;
 
@@ -193,8 +192,8 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
 
         /* record length should not bigger than LLOG_CHUNK_SIZE */
         if (buf)
-                rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr)
-                      sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
+                rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) -
+                      sizeof(struct llog_rec_tail)) ? -E2BIG : 0;
         else
                 rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0;
         if (rc)
@@ -217,7 +216,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 if (rc || idx == 0)
                         RETURN(rc);
 
-                saved_offset = sizeof(*llh) + (idx-1)*le32_to_cpu(rec->lrh_len);
+                saved_offset = sizeof(*llh) + (idx-1)*rec->lrh_len;
                 rc = llog_lvfs_write_blob(obd, file, rec, buf, saved_offset);
                 if (rc == 0 && reccookie) {
                         reccookie->lgc_lgl = loghandle->lgh_id;
@@ -236,23 +235,28 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
          */
         left = LLOG_CHUNK_SIZE - (file->f_pos & (LLOG_CHUNK_SIZE - 1));
         if (buf)
-                reclen = sizeof(*rec) + le32_to_cpu(rec->lrh_len) +
+                reclen = sizeof(*rec) + rec->lrh_len + 
                         sizeof(struct llog_rec_tail);
 
         /* NOTE: padding is a record, but no bit is set */
         if (left != 0 && left != reclen &&
             left < (reclen + LLOG_MIN_REC_SIZE)) {
+                int bitmap_size = sizeof(llh->llh_bitmap) * 8;
                 loghandle->lgh_last_idx++;
                 rc = llog_lvfs_pad(obd, file, left, loghandle->lgh_last_idx);
                 if (rc)
                         RETURN(rc);
+                /* if it's the last idx in log file, then return -ENOSPC */
+                if (loghandle->lgh_last_idx == bitmap_size - 1)
+                        RETURN(-ENOSPC);
         }
 
         loghandle->lgh_last_idx++;
         index = loghandle->lgh_last_idx;
-        rec->lrh_index = cpu_to_le32(index);
+        rec->lrh_index = index;
         if (buf == NULL) {
-                lrt = (void *)rec + le32_to_cpu(rec->lrh_len) - sizeof(*lrt);
+                lrt = (struct llog_rec_tail *)
+                        ((char *)rec + rec->lrh_len - sizeof(*lrt));
                 lrt->lrt_len = rec->lrh_len;
                 lrt->lrt_index = rec->lrh_index;
         }
@@ -260,10 +264,9 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 CERROR("argh, index %u already set in log bitmap?\n", index);
                 LBUG(); /* should never happen */
         }
-        llh->llh_count = cpu_to_le32(le32_to_cpu(llh->llh_count) + 1);
-        llh->llh_tail.lrt_index = cpu_to_le32(index);
+        llh->llh_count++;
+        llh->llh_tail.lrt_index = index;
 
-        offset = 0;
         rc = llog_lvfs_write_blob(obd, file, &llh->llh_hdr, NULL, 0);
         if (rc)
                 RETURN(rc);
@@ -273,21 +276,21 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 RETURN(rc);
 
         CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
-               loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
+               loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
         if (rc == 0 && reccookie) {
                 reccookie->lgc_lgl = loghandle->lgh_id;
                 reccookie->lgc_index = index;
-                if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
+                if (rec->lrh_type == MDS_UNLINK_REC)
                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
-                else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
+                else if (rec->lrh_type == OST_SZ_REC)
                         reccookie->lgc_subsys = LLOG_SIZE_ORIG_CTXT;
-                else if (le32_to_cpu(rec->lrh_type) == OST_RAID1_REC)
+                else if (rec->lrh_type == OST_RAID1_REC)
                         reccookie->lgc_subsys = LLOG_RD1_ORIG_CTXT;
                 else
                         reccookie->lgc_subsys = -1;
                 rc = 1;
         }
-        if (rc == 0 && le32_to_cpu(rec->lrh_type) == LLOG_GEN_REC)
+        if (rc == 0 && rec->lrh_type == LLOG_GEN_REC)
                 rc = 1;
 
         RETURN(rc);
@@ -362,7 +365,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
                 }
 
                 tail = buf + rc - sizeof(struct llog_rec_tail);
-                *cur_idx = le32_to_cpu(tail->lrt_index);
+                *cur_idx = tail->lrt_index;
 
                 /* this shouldn't happen */
                 if (tail->lrt_index == 0) {
@@ -371,15 +374,15 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *cur_idx,
                                loghandle->lgh_id.lgl_ogen, *cur_offset);
                         RETURN(-EINVAL);
                 }
-                if (le32_to_cpu(tail->lrt_index) < next_idx)
+                if (tail->lrt_index < next_idx)
                         continue;
 
                 /* sanity check that the start of the new buffer is no farther
                  * than the record that we wanted.  This shouldn't happen. */
                 rec = buf;
-                if (le32_to_cpu(rec->lrh_index) > next_idx) {
+                if (rec->lrh_index > next_idx) {
                         CERROR("missed desired record? %u > %u\n",
-                               le32_to_cpu(rec->lrh_index), next_idx);
+                               rec->lrh_index, next_idx);
                         RETURN(-ENOENT);
                 }
                 RETURN(0);
@@ -554,7 +557,7 @@ static int llog_lvfs_destroy(struct llog_handle *handle)
 
 /* reads the catalog list */
 int llog_get_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
-                      char *name, int count, struct llog_logid *idarray)
+                      char *name, int count, struct llog_catid *idarray)
 {
         struct obd_run_ctxt saved;
         struct l_file *file;
@@ -596,7 +599,7 @@ EXPORT_SYMBOL(llog_get_cat_list);
 
 /* writes the cat list */
 int llog_put_cat_list(struct obd_device *obd, struct obd_device *disk_obd,
-                      char *name, int count, struct llog_logid *idarray)
+                      char *name, int count, struct llog_catid *idarray)
 {
         struct obd_run_ctxt saved;
         struct l_file *file;
index d01441a..e9a9856 100644 (file)
@@ -128,13 +128,13 @@ static int cat_cancel_cb(struct llog_handle *cathandle,
         int rc, index;
         ENTRY;
 
-        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+        if (rec->lrh_type != LLOG_LOGID_MAGIC) {
                 CERROR("invalid record in catalog\n");
                 RETURN(-EINVAL);
         }
         CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
                lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
-               le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
+               rec->lrh_index, cathandle->lgh_id.lgl_oid);
 
         rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
         if (rc) {
@@ -144,8 +144,8 @@ static int cat_cancel_cb(struct llog_handle *cathandle,
         }
 
         llh = loghandle->lgh_hdr;
-        if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
-            (le32_to_cpu(llh->llh_count) == 1)) {
+        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+            (llh->llh_count == 1)) {
                 rc = llog_destroy(loghandle);
                 if (rc)
                         CERROR("failure destroying log in postsetup: %d\n", rc);
@@ -160,7 +160,7 @@ static int cat_cancel_cb(struct llog_handle *cathandle,
                 if (rc == 0)
                         CWARN("cancel log "LPX64":%x at index %u of catalog "
                               LPX64"\n", lir->lid_id.lgl_oid,
-                              lir->lid_id.lgl_ogen, le32_to_cpu(rec->lrh_index),
+                              lir->lid_id.lgl_ogen, rec->lrh_index,
                               cathandle->lgh_id.lgl_oid);
         }
 
@@ -233,9 +233,9 @@ int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
                                          &cathandle->u.chd.chd_head,
                                          u.phd.phd_entry) {
                         llh = loghandle->lgh_hdr;
-                        if ((le32_to_cpu(llh->llh_flags) &
+                        if ((llh->llh_flags &
                                 LLOG_F_ZAP_WHEN_EMPTY) &&
-                            (le32_to_cpu(llh->llh_count) == 1)) {
+                            (llh->llh_count == 1)) {
                                 rc = llog_destroy(loghandle);
                                 if (rc)
                                         CERROR("failure destroying log during "
@@ -280,9 +280,9 @@ EXPORT_SYMBOL(llog_obd_origin_add);
 
 int llog_cat_initialize(struct obd_device *obd, int count)
 {
-        struct llog_logid *idarray;
+        struct llog_catid *idarray;
         int size = sizeof(*idarray) * count;
-        char name[32] = "CATLIST";
+        char name[32] = CATLIST;
         int rc;
         ENTRY;
 
@@ -290,8 +290,6 @@ int llog_cat_initialize(struct obd_device *obd, int count)
         if (!idarray)
                 RETURN(-ENOMEM);
 
-        memset(idarray, 0, size);
-
         rc = llog_get_cat_list(obd, obd, name, count, idarray);
         if (rc) {
                 CERROR("rc: %d\n", rc);
@@ -317,7 +315,7 @@ int llog_cat_initialize(struct obd_device *obd, int count)
 EXPORT_SYMBOL(llog_cat_initialize);
 
 int obd_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
-                  int count, struct llog_logid *logid)
+                  int count, struct llog_catid *logid)
 {
         int rc;
         ENTRY;
index 0607d12..f8e6de1 100644 (file)
@@ -62,9 +62,9 @@ static int verify_handle(char *test, struct llog_handle *llh, int num_recs)
                 RETURN(-ERANGE);
         }
 
-        if (le32_to_cpu(llh->lgh_hdr->llh_count) != num_recs) {
+        if (llh->lgh_hdr->llh_count != num_recs) {
                 CERROR("%s: handle->count is %d, expected %d after write\n",
-                       test, le32_to_cpu(llh->lgh_hdr->llh_count), num_recs);
+                       test, llh->lgh_hdr->llh_count, num_recs);
                 RETURN(-ERANGE);
         }
 
@@ -168,8 +168,8 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh)
         int num_recs = 1;       /* 1 for the header */
         ENTRY;
 
-        lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = cpu_to_le32(sizeof(lcr));
-        lcr.lcr_hdr.lrh_type = cpu_to_le32(OST_SZ_REC);
+        lcr.lcr_hdr.lrh_len = lcr.lcr_tail.lrt_len = sizeof(lcr);
+        lcr.lcr_hdr.lrh_type = OST_SZ_REC;
 
         CWARN("3a: write one create_rec\n");
         rc = llog_write_rec(llh,  &lcr.lcr_hdr, NULL, 0, NULL, -1);
@@ -186,8 +186,8 @@ static int llog_test_3(struct obd_device *obd, struct llog_handle *llh)
         for (i = 0; i < 10; i++) {
                 struct llog_rec_hdr hdr;
                 char buf[8];
-                hdr.lrh_len = cpu_to_le32(8);
-                hdr.lrh_type = cpu_to_le32(OBD_CFG_REC);
+                hdr.lrh_len = 8;
+                hdr.lrh_type = OBD_CFG_REC;
                 memset(buf, 0, sizeof buf);
                 rc = llog_write_rec(llh, &hdr, NULL, 0, buf, -1);
                 if (rc) {
@@ -237,9 +237,8 @@ static int llog_test_4(struct obd_device *obd)
 
         ENTRY;
 
-        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len =
-                cpu_to_le32(LLOG_MIN_REC_SIZE);
-        lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
+        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
+        lmr.lmr_hdr.lrh_type = 0xf00f00;
 
         sprintf(name, "%x", llog_test_rand+1);
         CWARN("4a: create a catalog log with name: %s\n", name);
@@ -294,8 +293,8 @@ static int llog_test_4(struct obd_device *obd)
         if (buf == NULL)
                 GOTO(out, rc = -ENOMEM);
         for (i = 0; i < 5; i++) {
-                rec.lrh_len = cpu_to_le32(buflen);
-                rec.lrh_type = cpu_to_le32(OBD_CFG_REC);
+                rec.lrh_len = buflen;
+                rec.lrh_type = OBD_CFG_REC;
                 rc = llog_cat_add_rec(cath, &rec, NULL, buf);
                 if (rc) {
                         CERROR("4e: write 5 records failed at #%d: %d\n",
@@ -320,13 +319,13 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
 {
         struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
 
-        if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+        if (rec->lrh_type != LLOG_LOGID_MAGIC) {
                 CERROR("invalid record in catalog\n");
                 RETURN(-EINVAL);
         }
 
         CWARN("seeing record at index %d - "LPX64":%x in log "LPX64"\n",
-               le32_to_cpu(rec->lrh_index), lir->lid_id.lgl_oid,
+               rec->lrh_index, lir->lid_id.lgl_oid,
                lir->lid_id.lgl_ogen, llh->lgh_id.lgl_oid);
         RETURN(0);
 }
@@ -334,13 +333,13 @@ static int cat_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
 static int plain_print_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
                           void *data)
 {
-        if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
+        if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
                 CERROR("log is not plain\n");
                 RETURN(-EINVAL);
         }
 
         CWARN("seeing record at index %d in log "LPX64"\n",
-               le32_to_cpu(rec->lrh_index), llh->lgh_id.lgl_oid);
+               rec->lrh_index, llh->lgh_id.lgl_oid);
         RETURN(0);
 }
 
@@ -350,13 +349,13 @@ static int llog_cancel_rec_cb(struct llog_handle *llh, struct llog_rec_hdr *rec,
         struct llog_cookie cookie;
         static int i = 0;
 
-        if (!(le32_to_cpu(llh->lgh_hdr->llh_flags) & LLOG_F_IS_PLAIN)) {
+        if (!(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)) {
                 CERROR("log is not plain\n");
                 RETURN(-EINVAL);
         }
 
         cookie.lgc_lgl = llh->lgh_id;
-        cookie.lgc_index = le32_to_cpu(rec->lrh_index);
+        cookie.lgc_index = rec->lrh_index;
 
         llog_cat_cancel_records(llh->u.phd.phd_cat_handle, 1, &cookie);
         i++;
@@ -378,9 +377,8 @@ static int llog_test_5(struct obd_device *obd)
 
         ENTRY;
 
-        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len =
-                cpu_to_le32(LLOG_MIN_REC_SIZE);
-        lmr.lmr_hdr.lrh_type = cpu_to_le32(0xf00f00);
+        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
+        lmr.lmr_hdr.lrh_type = 0xf00f00;
 
         CWARN("5a: re-open catalog by id\n");
         rc = llog_create(ctxt, &llh, &cat_logid, NULL);
@@ -548,7 +546,7 @@ static int llog_run_tests(struct obd_device *obd)
 
 
 static int llog_test_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                               int count, struct llog_logid *logid)
+                               int count, struct llog_catid *logid)
 {
         int rc;
         ENTRY;
index 54a1d7b..119ca99 100644 (file)
@@ -646,7 +646,7 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_finish); 
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, pin); 
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, unpin);
-        LPROCFS_OBD_OP_INIT(num_private_stats, stats, invalidate_import);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, import_event);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, notify);
         
         for (i = num_private_stats; i < num_stats; i++) {
index 2339f28..0a6acfe 100644 (file)
@@ -144,8 +144,8 @@ static void cleanup_all_handles(void)
                         struct portals_handle *h;
                         h = list_entry(tmp, struct portals_handle, h_link);
 
-                        CERROR("forcing cleanup for handle "LPX64"\n",
-                               h->h_cookie);
+                        CERROR("force clean handle "LPX64" addr %p addref %p\n",
+                               h->h_cookie, h, h->h_addref);
 
                         class_handle_unhash_nolock(h);
                 }
index 8ac9b5a..093f3ac 100644 (file)
@@ -362,17 +362,20 @@ preprw_cleanup:
 
 int echo_commitrw(int cmd, struct obd_export *export, struct obdo *oa,
                   int objcount, struct obd_ioobj *obj, int niocount,
-                  struct niobuf_local *res, struct obd_trans_info *oti)
+                  struct niobuf_local *res, struct obd_trans_info *oti, int rc)
 {
         struct obd_device *obd;
         struct niobuf_local *r = res;
-        int i, vrc = 0, rc = 0;
+        int i, vrc = 0;
         ENTRY;
 
         obd = export->exp_obd;
         if (obd == NULL)
                 RETURN(-EINVAL);
 
+        if (rc)
+                GOTO(commitrw_cleanup, rc);
+
         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
                        objcount, niocount);
index f5bcf79..136d357 100644 (file)
@@ -905,7 +905,7 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
         struct niobuf_remote *rnb;
         obd_off off;
         obd_size npages, tot_pages;
-        int i, ret = 0, err = 0;
+        int i, ret = 0;
         ENTRY;
 
         if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
@@ -946,29 +946,27 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                         struct page *page = lnb[i].page;
 
                         /* read past eof? */
-                        if (page == NULL && lnb[i].rc == 0) 
+                        if (page == NULL && lnb[i].rc == 0)
                                 continue;
 
                         if (oa->o_id == ECHO_PERSISTENT_OBJID)
                                 continue;
 
-                        if (rw == OBD_BRW_WRITE) 
-                                echo_client_page_debug_setup(lsm, page, rw, 
-                                                             oa->o_id, 
-                                                             rnb[i].offset, 
+                        if (rw == OBD_BRW_WRITE)
+                                echo_client_page_debug_setup(lsm, page, rw,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
                                                              rnb[i].len);
                         else
-                                echo_client_page_debug_check(lsm, page, 
-                                                             oa->o_id, 
-                                                             rnb[i].offset, 
+                                echo_client_page_debug_check(lsm, page,
+                                                             oa->o_id,
+                                                             rnb[i].offset,
                                                              rnb[i].len);
                 }
 
-                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti);
+                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
                 if (ret != 0)
                         GOTO(out, ret);
-                if (err)
-                        GOTO(out, ret = err);
         }
 
 out:
@@ -979,7 +977,7 @@ out:
         RETURN(ret);
 }
 
-int echo_client_brw_ioctl(int rw, struct obd_export *exp, 
+int echo_client_brw_ioctl(int rw, struct obd_export *exp,
                           struct obd_ioctl_data *data)
 {
         struct obd_device *obd = class_exp2obd(exp);
index d3785d4..0e8e458 100644 (file)
@@ -570,7 +570,7 @@ static int filter_prep_groups(struct obd_device *obd)
         int i, rc = 0, cleanup_phase = 0;
         ENTRY;
 
-        O_dentry = simple_mkdir(current->fs->pwd, "O", 0700);
+        O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
         CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
         if (IS_ERR(O_dentry)) {
                 rc = PTR_ERR(O_dentry);
@@ -645,7 +645,7 @@ static int filter_prep_groups(struct obd_device *obd)
                 loff_t off = 0;
 
                 sprintf(name, "%d", i);
-                dentry = simple_mkdir(O_dentry, name, 0700);
+                dentry = simple_mkdir(O_dentry, name, 0700, 1);
                 CDEBUG(D_INODE, "got/created O/%s: %p\n", name, dentry);
                 if (IS_ERR(dentry)) {
                         rc = PTR_ERR(dentry);
@@ -704,7 +704,7 @@ static int filter_prep_groups(struct obd_device *obd)
                         char dir[20];
                         snprintf(dir, sizeof(dir), "d%u", i);
 
-                        dentry = simple_mkdir(O_dentry, dir, 0700);
+                        dentry = simple_mkdir(O_dentry, dir, 0700, 1);
                         CDEBUG(D_INODE, "got/created O/0/%s: %p\n", dir,dentry);
                         if (IS_ERR(dentry)) {
                                 rc = PTR_ERR(dentry);
@@ -1411,6 +1411,93 @@ static int filter_precleanup(struct obd_device *obd, int flags)
         RETURN(rc);
 }
 
+/* Do extra sanity checks for grant accounting.  We do this at connect,
+ * disconnect, and statfs RPC time, so it shouldn't be too bad.  We can
+ * always get rid of it or turn it off when we know accounting is good.
+ *
+ * Sums fed_dirty, fed_pending and fed_grant over every export linked on
+ * obd->obd_exports and asserts that the sums equal the device-wide
+ * fo_tot_dirty/fo_tot_pending/fo_tot_granted counters and stay within the
+ * filesystem size taken from obd->obd_osfs.  "func" is only used to tag
+ * the assertion messages with the caller's name. */
+static void filter_grant_sanity_check(struct obd_device *obd, char *func)
+{
+        struct filter_export_data *fed;
+        struct obd_export *exp;
+        obd_size maxsize;
+        obd_size tot_dirty = 0, tot_pending = 0, tot_granted = 0;
+        obd_size fo_tot_dirty, fo_tot_pending, fo_tot_granted;
+
+        /* Unlocked peek: with no exports linked there is nothing to sum */
+        if (list_empty(&obd->obd_exports))
+                return;
+
+        spin_lock(&obd->obd_osfs_lock);
+        /* obd_osfs is accessed under obd_osfs_lock elsewhere in this file
+         * (filter_statfs, filter_grant_check), so sample the filesystem
+         * size with the lock held as well; otherwise os_blocks and
+         * os_bsize could be read while the structure is being updated. */
+        maxsize = obd->obd_osfs.os_blocks * obd->obd_osfs.os_bsize;
+        spin_lock(&obd->obd_dev_lock);
+        list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
+                fed = &exp->exp_filter_data;
+                /* NOTE: these two per-export assertions fire with both
+                 * spinlocks held, unlike the totals checked at the end */
+                LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
+                         "cli %s/%p %lu+%lu > "LPU64"\n",
+                         exp->exp_client_uuid.uuid, exp,
+                         fed->fed_grant, fed->fed_pending, maxsize);
+                LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
+                         exp->exp_client_uuid.uuid, exp,fed->fed_dirty,maxsize);
+                CDEBUG(D_CACHE,"%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
+                       fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+                /* the granted total counts pending bytes too */
+                tot_granted += fed->fed_grant + fed->fed_pending;
+                tot_pending += fed->fed_pending;
+                tot_dirty += fed->fed_dirty;
+        }
+        /* snapshot the device-wide counters while still locked so they are
+         * compared against sums taken from the same state */
+        fo_tot_granted = obd->u.filter.fo_tot_granted;
+        fo_tot_pending = obd->u.filter.fo_tot_pending;
+        fo_tot_dirty = obd->u.filter.fo_tot_dirty;
+        spin_unlock(&obd->obd_dev_lock);
+        spin_unlock(&obd->obd_osfs_lock);
+
+        /* Do these assertions outside the spinlocks so we don't kill system */
+        LASSERTF(tot_granted == fo_tot_granted, "%s "LPU64" != "LPU64"\n",
+                 func, tot_granted, fo_tot_granted);
+        LASSERTF(tot_pending == fo_tot_pending, "%s "LPU64" != "LPU64"\n",
+                 func, tot_pending, fo_tot_pending);
+        LASSERTF(tot_dirty == fo_tot_dirty, "%s "LPU64" != "LPU64"\n",
+                 func, tot_dirty, fo_tot_dirty);
+        LASSERTF(tot_pending <= tot_granted, "%s "LPU64" > "LPU64"\n",
+                 func, tot_pending, tot_granted);
+        LASSERTF(tot_granted <= maxsize, "%s "LPU64" > "LPU64"\n",
+                 func, tot_granted, maxsize);
+        LASSERTF(tot_dirty <= maxsize, "%s "LPU64" > "LPU64"\n",
+                 func, tot_dirty, maxsize);
+}
+
+/* Remove this client from the grant accounting totals.  This is done at
+ * disconnect time and also at export destroy time in case there was a race
+ * between removing the export and an incoming BRW updating the client grant.
+ * The client should do something similar when it invalidates its import.
+ *
+ * Runs entirely under obd_osfs_lock.  The export's fed_grant and fed_dirty
+ * are subtracted from fo_tot_granted and fo_tot_dirty and then zeroed, so
+ * calling this again for the same export subtracts nothing further. */
+static void filter_grant_discard(struct obd_export *exp)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *filter = &obd->u.filter;
+        struct filter_export_data *fed = &exp->exp_filter_data;
+
+        spin_lock(&obd->obd_osfs_lock);
+        CDEBUG(D_CACHE, "%s: cli %s/%p dirty %lu pend %lu grant %lu\n",
+               obd->obd_name, exp->exp_client_uuid.uuid, exp,
+               fed->fed_dirty, fed->fed_pending, fed->fed_grant);
+
+        /* each total must cover this client's share before it is removed */
+        LASSERTF(filter->fo_tot_granted >= fed->fed_grant,
+                 "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
+                 obd->obd_name, filter->fo_tot_granted,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_grant);
+        filter->fo_tot_granted -= fed->fed_grant;
+        /* fed_pending is only checked against the total here; neither
+         * fo_tot_pending nor fed_pending is modified by this function */
+        LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
+                 "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
+                 obd->obd_name, filter->fo_tot_pending,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_pending);
+        LASSERTF(filter->fo_tot_dirty >= fed->fed_dirty,
+                 "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
+                 obd->obd_name, filter->fo_tot_dirty,
+                 exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
+        filter->fo_tot_dirty -= fed->fed_dirty;
+        fed->fed_dirty = 0;
+        fed->fed_grant = 0;
+
+        spin_unlock(&obd->obd_osfs_lock);
+}
+
 static int filter_destroy_export(struct obd_export *exp)
 {
         ENTRY;
@@ -1424,62 +1511,46 @@ static int filter_destroy_export(struct obd_export *exp)
 
         if (exp->exp_obd->obd_replayable)
                 filter_client_free(exp, exp->exp_flags);
+
+        filter_grant_sanity_check(exp->exp_obd, __FUNCTION__);
+
         RETURN(0);
 }
 
 /* also incredibly similar to mds_disconnect */
 static int filter_disconnect(struct obd_export *exp, int flags)
 {
-        struct filter_obd *filter = &exp->exp_obd->u.filter;
-        struct filter_export_data *fed = &exp->exp_filter_data;
+        struct obd_device *obd = exp->exp_obd;
         unsigned long irqflags;
         struct llog_ctxt *ctxt;
         int rc;
         ENTRY;
 
         LASSERT(exp);
-
-        /* This would imply RPCs still in flight or preprw/commitrw imbalance */
-        if (fed->fed_pending)
-                CWARN("%s: cli %s has %lu pending at disconnect time\n",
-                       exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
-                       fed->fed_pending);
-
-        /* Forget what this client had cached.  This is also done on the
-         * client when it invalidates its import.  Do this before unlinking
-         * from the export list so filter_grant_sanity_check totals are OK. */
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
-        LASSERTF(exp->exp_obd->u.filter.fo_tot_dirty >= fed->fed_dirty,
-                 "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n",
-                 exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_dirty,
-                 exp->exp_client_uuid.uuid, exp, fed->fed_dirty);
-        exp->exp_obd->u.filter.fo_tot_dirty -= fed->fed_dirty;
-        LASSERTF(exp->exp_obd->u.filter.fo_tot_granted >= fed->fed_grant,
-                 "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n",
-                 exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_granted,
-                 exp->exp_client_uuid.uuid, exp, fed->fed_grant);
-        exp->exp_obd->u.filter.fo_tot_granted -= fed->fed_grant;
-        LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending,
-                 "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n",
-                 exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_pending,
-                 exp->exp_client_uuid.uuid, exp, fed->fed_pending);
-        fed->fed_dirty = 0;
-        fed->fed_grant = 0;
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
-
-        ldlm_cancel_locks_for_export(exp);
+        class_export_get(exp);
 
         spin_lock_irqsave(&exp->exp_lock, irqflags);
         exp->exp_flags = flags;
         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
 
-        fsfilt_sync(exp->exp_obd, filter->fo_sb);
+        filter_grant_discard(exp);
+
+        /* Disconnect early so that clients can't keep using export */
+        rc = class_disconnect(exp, flags);
+
+        /* Do this twice in case a BRW arrived between the first call and
+         * the class_export_unlink() call (bug 2663) */
+        filter_grant_discard(exp);
+
+        ldlm_cancel_locks_for_export(exp);
+
+        fsfilt_sync(obd, obd->u.filter.fo_sb);
 
         /* flush any remaining cancel messages out to the target */
-        ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT);
+        ctxt = llog_get_context(obd, LLOG_UNLINK_REPL_CTXT);
         llog_sync(ctxt, exp);
 
-        rc = class_disconnect(exp, flags);
+        class_export_put(exp);
         RETURN(rc);
 }
 
@@ -2091,59 +2162,10 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-/* debugging to make sure that nothing bad happens, can be turned off soon.
- * caller must hold osfs lock */
-static void filter_grant_total_exports(struct obd_device *obd,
-                                       obd_size *tot_dirty,
-                                       obd_size *tot_pending,
-                                       obd_size *tot_granted,
-                                       obd_size maxsize)
-{
-        struct filter_export_data *fed;
-        struct obd_export *exp_pos;
-
-        spin_lock(&obd->obd_dev_lock);
-        list_for_each_entry(exp_pos, &obd->obd_exports, exp_obd_chain) {
-                fed = &exp_pos->exp_filter_data;
-                LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64"\n",
-                         exp_pos->exp_client_uuid.uuid, exp_pos,
-                         fed->fed_dirty, maxsize);
-                LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize,
-                         "cli %s/%p %lu+%lu > "LPU64"\n",
-                         exp_pos->exp_client_uuid.uuid, exp_pos,
-                         fed->fed_grant, fed->fed_pending, maxsize);
-                *tot_dirty += fed->fed_dirty;
-                *tot_pending += fed->fed_pending;
-                *tot_granted += fed->fed_grant + fed->fed_pending;
-        }
-        spin_unlock(&obd->obd_dev_lock);
-}
-
-static void filter_grant_sanity_check(obd_size tot_dirty, obd_size tot_pending,
-                                      obd_size tot_granted,
-                                      obd_size fo_tot_dirty,
-                                      obd_size fo_tot_pending,
-                                      obd_size fo_tot_granted, obd_size maxsize)
-{
-        LASSERTF(tot_dirty == fo_tot_dirty, LPU64" != "LPU64"\n",
-                 tot_dirty, fo_tot_dirty);
-        LASSERTF(tot_pending == fo_tot_pending, LPU64" != "LPU64"\n",
-                 tot_pending, fo_tot_pending);
-        LASSERTF(tot_granted == fo_tot_granted, LPU64" != "LPU64"\n",
-                 tot_granted, fo_tot_granted);
-        LASSERTF(tot_dirty <= maxsize, LPU64" > "LPU64"\n", tot_dirty, maxsize);
-        LASSERTF(tot_pending <= tot_granted, LPU64" > "LPU64"\n", tot_pending,
-                 tot_granted);
-        LASSERTF(tot_granted <= maxsize, LPU64" > "LPU64"\n",
-                 tot_granted, maxsize);
-}
-
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                          unsigned long max_age)
 {
         struct filter_obd *filter = &obd->u.filter;
-        obd_size tot_cached = 0, tot_pending = 0, tot_granted = 0;
-        obd_size fo_tot_cached, fo_tot_pending, fo_tot_granted;
         int blockbits = filter->fo_sb->s_blocksize_bits;
         int rc;
         ENTRY;
@@ -2154,26 +2176,19 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         spin_lock(&obd->obd_osfs_lock);
         rc = fsfilt_statfs(obd, filter->fo_sb, max_age);
         memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
-        filter_grant_total_exports(obd, &tot_cached, &tot_pending, &tot_granted,
-                                   osfs->os_blocks << blockbits);
-        fo_tot_cached = filter->fo_tot_dirty;
-        fo_tot_pending = filter->fo_tot_pending;
-        fo_tot_granted = filter->fo_tot_granted;
         spin_unlock(&obd->obd_osfs_lock);
 
-        /* Do check outside spinlock, to avoid wedging system on failure */
-        filter_grant_sanity_check(tot_cached, tot_pending, tot_granted,
-                                  fo_tot_cached, fo_tot_pending,
-                                  fo_tot_granted, osfs->os_blocks << blockbits);
-
         CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64
-               "pending "LPU64" free "LPU64" avail "LPU64"\n",
-               tot_cached >> blockbits, tot_granted >> blockbits,
-               tot_pending >> blockbits, osfs->os_bfree, osfs->os_bavail);
+               " pending "LPU64" free "LPU64" avail "LPU64"\n",
+               filter->fo_tot_dirty, filter->fo_tot_granted,
+               filter->fo_tot_pending,
+               osfs->os_bfree << blockbits, osfs->os_bavail << blockbits);
+
+        filter_grant_sanity_check(obd, __FUNCTION__);
 
         osfs->os_bavail -= min(osfs->os_bavail,
-                               (tot_cached +tot_pending +osfs->os_bsize -1) >>
-                                        blockbits);
+                               (filter->fo_tot_dirty + filter->fo_tot_pending +
+                                osfs->os_bsize -1) >> blockbits);
 
         RETURN(rc);
 }
@@ -2312,7 +2327,7 @@ static struct llog_operations filter_size_orig_logops = {
 };
 
 static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt,
-                            int count, struct llog_logid *logid)
+                            int count, struct llog_catid *logid)
 {
         struct llog_ctxt *ctxt;
         int rc;
index 93379d8..6203418 100644 (file)
@@ -121,7 +121,7 @@ int filter_preprw(int cmd, struct obd_export *, struct obdo *, int objcount,
                   struct niobuf_local *, struct obd_trans_info *);
 int filter_commitrw(int cmd, struct obd_export *, struct obdo *, int objcount,
                     struct obd_ioobj *, int niocount, struct niobuf_local *,
-                    struct obd_trans_info *);
+                    struct obd_trans_info *, int rc);
 int filter_brw(int cmd, struct obd_export *, struct obdo *,
               struct lov_stripe_md *, obd_count oa_bufs, struct brw_page *,
               struct obd_trans_info *);
@@ -130,7 +130,8 @@ void flip_into_page_cache(struct inode *inode, struct page *new_page);
 /* filter_io_*.c */
 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                           struct obd_ioobj *obj, int niocount,
-                          struct niobuf_local *res, struct obd_trans_info *oti);
+                          struct niobuf_local *res, struct obd_trans_info *oti,
+                          int rc);
 obd_size filter_grant_space_left(struct obd_export *exp);
 long filter_grant(struct obd_export *exp, obd_size current_grant,
                   obd_size want, obd_size fs_space_left);
index ad4298c..7e305f5 100644 (file)
@@ -108,6 +108,8 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
         struct obd_device *obd = exp->exp_obd;
         ENTRY;
 
+        LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
         if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) !=
                                         (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) {
                 oa->o_valid &= ~OBD_MD_FLGRANT;
@@ -122,8 +124,8 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
          * leave this here in case there is a large error in accounting. */
         CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ?
                D_ERROR : D_CACHE,
-               "%s: cli %s reports granted: "LPU64" dropped: %u, local: %lu\n",
-               obd->obd_name, exp->exp_client_uuid.uuid, oa->o_grant,
+               "%s: cli %s/%p reports grant: "LPU64" dropped: %u, local: %lu\n",
+               obd->obd_name, exp->exp_client_uuid.uuid, exp, oa->o_grant,
                oa->o_dropped, fed->fed_grant);
 
         /* Update our accounting now so that statfs takes it into account.
@@ -132,14 +134,14 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
          * on fed_dirty however. */
         obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty;
         if (fed->fed_grant < oa->o_dropped) {
-                CERROR("%s: cli %s reports %u dropped > fed_grant %lu\n",
-                       obd->obd_name, exp->exp_client_uuid.uuid,
+                CERROR("%s: cli %s/%p reports %u dropped > fed_grant %lu\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
                        oa->o_dropped, fed->fed_grant);
                 oa->o_dropped = 0;
         }
         if (obd->u.filter.fo_tot_granted < oa->o_dropped) {
-                CERROR("%s: cli %s reports %u dropped > tot_granted "LPU64"\n",
-                       obd->obd_name, exp->exp_client_uuid.uuid,
+                CERROR("%s: cli %s/%p reports %u dropped > tot_grant "LPU64"\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid, exp,
                        oa->o_dropped, obd->u.filter.fo_tot_granted);
                 oa->o_dropped = 0;
         }
@@ -163,6 +165,8 @@ obd_size filter_grant_space_left(struct obd_export *exp)
         obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0;
         int rc, statfs_done = 0;
 
+        LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
         if (time_before(obd->obd_osfs_age, jiffies - HZ)) {
 restat:
                 rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1);
@@ -191,10 +195,10 @@ restat:
                 if (left < tot_granted - obd->u.filter.fo_tot_pending &&
                     time_after(jiffies, next)) {
                         spin_unlock(&obd->obd_osfs_lock);
-                        CERROR("%s: cli %s granted "LPU64" more than available "
+                        CERROR("%s: cli %s/%p grant "LPU64" > available "
                                LPU64" and pending "LPU64"\n", obd->obd_name,
-                               exp->exp_client_uuid.uuid, tot_granted, left,
-                               obd->u.filter.fo_tot_pending);
+                               exp->exp_client_uuid.uuid, exp, tot_granted,
+                               left, obd->u.filter.fo_tot_pending);
                         if (next == 0)
                                 portals_debug_dumplog();
                         next = jiffies + 20 * HZ;
@@ -203,11 +207,11 @@ restat:
                 left = 0;
         }
 
-        CDEBUG(D_CACHE, "%s: cli %s free: "LPU64" avail: "LPU64" grant "LPU64
+        CDEBUG(D_CACHE, "%s: cli %s/%p free: "LPU64" avail: "LPU64" grant "LPU64
                " left: "LPU64" pending: "LPU64"\n", obd->obd_name,
-               exp->exp_client_uuid.uuid, obd->obd_osfs.os_bfree << blockbits,
-               avail << blockbits, tot_granted, left,
-               obd->u.filter.fo_tot_pending);
+               exp->exp_client_uuid.uuid, exp,
+               obd->obd_osfs.os_bfree << blockbits, avail << blockbits,
+               tot_granted, left, obd->u.filter.fo_tot_pending);
 
         return left;
 }
@@ -224,6 +228,8 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
         int blockbits = obd->u.filter.fo_sb->s_blocksize_bits;
         __u64 grant = 0;
 
+        LASSERT_SPIN_LOCKED(&obd->obd_osfs_lock);
+
         /* Grant some fraction of the client's requested grant space so that
          * they are not always waiting for write credits (not all of it to
          * avoid overgranting in face of multiple RPCs in flight).  This
@@ -246,12 +252,12 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
                 }
         }
 
-        CDEBUG(D_CACHE,"%s: cli %s wants: "LPU64" granting: "LPU64"\n",
-               obd->obd_name, exp->exp_client_uuid.uuid, want, grant);
+        CDEBUG(D_CACHE,"%s: cli %s/%p wants: "LPU64" granting: "LPU64"\n",
+               obd->obd_name, exp->exp_client_uuid.uuid, exp, want, grant);
         CDEBUG(D_CACHE,
-               "%s: cli %s tot cached:"LPU64" granted:"LPU64
+               "%s: cli %s/%p tot cached:"LPU64" granted:"LPU64
                " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid,
-               obd->u.filter.fo_tot_dirty,
+               exp, obd->u.filter.fo_tot_dirty,
                obd->u.filter.fo_tot_granted, obd->obd_num_exports);
 
         return grant;
@@ -426,6 +432,8 @@ static int filter_grant_check(struct obd_export *exp, int objcount,
         unsigned long used = 0, ungranted = 0, using;
         int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE;
 
+        LASSERT_SPIN_LOCKED(&exp->exp_obd->obd_osfs_lock);
+
         for (obj = 0; obj < objcount; obj++) {
                 for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) {
                         int tmp, bytes;
@@ -440,10 +448,10 @@ static int filter_grant_check(struct obd_export *exp, int objcount,
                         if (rnb[n].flags & OBD_BRW_FROM_GRANT) {
                                 if (fed->fed_grant < used + bytes) {
                                         CDEBUG(D_CACHE,
-                                               "%s: cli %s claims %ld+%d GRANT,"
-                                               " no such grant %lu, idx %d\n",
+                                               "%s: cli %s/%p claims %ld+%d "
+                                               "GRANT, real grant %lu idx %d\n",
                                                exp->exp_obd->obd_name,
-                                               exp->exp_client_uuid.uuid,
+                                               exp->exp_client_uuid.uuid, exp,
                                                used, bytes, fed->fed_grant, n);
                                         mask = D_ERROR;
                                 } else {
@@ -472,9 +480,9 @@ static int filter_grant_check(struct obd_export *exp, int objcount,
                          * ignore this error. */
                         lnb[n].rc = -ENOSPC;
                         rnb[n].flags &= OBD_BRW_GRANTED;
-                        CDEBUG(D_CACHE, "%s: cli %s idx %d no space for %d\n",
+                        CDEBUG(D_CACHE,"%s: cli %s/%p idx %d no space for %d\n",
                                exp->exp_obd->obd_name,
-                               exp->exp_client_uuid.uuid, n, bytes);
+                               exp->exp_client_uuid.uuid, exp, n, bytes);
                 }
         }
 
@@ -488,8 +496,8 @@ static int filter_grant_check(struct obd_export *exp, int objcount,
         exp->exp_obd->u.filter.fo_tot_pending += used;
 
         CDEBUG(mask,
-               "%s: cli %s used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
-               exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, used,
+               "%s: cli %s/%p used: %lu ungranted: %lu grant: %lu dirty: %lu\n",
+               exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, used,
                ungranted, fed->fed_grant, fed->fed_dirty);
 
         /* Rough calc in case we don't refresh cached statfs data */
@@ -501,8 +509,8 @@ static int filter_grant_check(struct obd_export *exp, int objcount,
                 exp->exp_obd->obd_osfs.os_bavail = 0;
 
         if (fed->fed_dirty < used) {
-                CERROR("%s: cli %s claims used %lu > fed_dirty %lu\n",
-                       exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+                CERROR("%s: cli %s/%p claims used %lu > fed_dirty %lu\n",
+                       exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,
                        used, fed->fed_dirty);
                 used = fed->fed_dirty;
         }
@@ -668,7 +676,7 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
 static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
                                 int objcount, struct obd_ioobj *obj,
                                 int niocount, struct niobuf_local *res,
-                                struct obd_trans_info *oti)
+                                struct obd_trans_info *oti, int rc)
 {
         struct obd_ioobj *o;
         struct niobuf_local *lnb;
@@ -692,9 +700,10 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
                         page_cache_release(lnb->page);
                 }
         }
+
         if (res->dentry != NULL)
                 f_dput(res->dentry);
-        RETURN(0);
+        RETURN(rc);
 }
 
 void flip_into_page_cache(struct inode *inode, struct page *new_page)
@@ -769,14 +778,14 @@ void filter_grant_commit(struct obd_export *exp, int niocount,
 
+/* Commit step of a bulk transfer: hands off to filter_commitrw_write() when
+ * cmd is OBD_BRW_WRITE or to filter_commitrw_read() when it is OBD_BRW_READ
+ * and returns that helper's result.  "rc" is forwarded unchanged to the
+ * helper; filter_brw() passes its current return code here.  Any other cmd
+ * trips LBUG(), with -EPROTO as the nominal return value. */
 int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                     int objcount, struct obd_ioobj *obj, int niocount,
-                    struct niobuf_local *res, struct obd_trans_info *oti)
+                    struct niobuf_local *res, struct obd_trans_info *oti,int rc)
 {
         if (cmd == OBD_BRW_WRITE)
                 return filter_commitrw_write(exp, oa, objcount, obj, niocount,
-                                             res, oti);
+                                             res, oti, rc);
         if (cmd == OBD_BRW_READ)
                 return filter_commitrw_read(exp, oa, objcount, obj, niocount,
-                                            res, oti);
+                                            res, oti, rc);
         LBUG();
         return -EPROTO;
 }
@@ -826,7 +835,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                 kunmap(pga[i].pg);
         }
 
-        ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
+        ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti, ret);
 
 out:
         if (lnb)
index 8c0ad36..6ab28db 100644 (file)
@@ -225,7 +225,8 @@ static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
 
 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                           struct obd_ioobj *obj, int niocount,
-                          struct niobuf_local *res, struct obd_trans_info *oti)
+                          struct niobuf_local *res, struct obd_trans_info *oti,
+                          int rc)
 {
         struct obd_device *obd = exp->exp_obd;
         struct obd_run_ctxt saved;
@@ -234,7 +235,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
         struct iattr iattr = { 0 };
         struct kiobuf *iobuf;
         struct inode *inode = NULL;
-        int rc = 0, i, n, cleanup_phase = 0, err;
+        int i, n, cleanup_phase = 0, err;
         unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
         void *wait_handle;
         ENTRY;
@@ -242,6 +243,9 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
         LASSERT(objcount == 1);
         LASSERT(current->journal_info == NULL);
 
+        if (rc != 0)
+                GOTO(cleanup, rc);
+
         rc = alloc_kiovec(1, &iobuf);
         if (rc)
                 GOTO(cleanup, rc);
index 254a3fb..852aeaf 100644 (file)
@@ -82,7 +82,9 @@ static int filter_lvbo_init(struct ldlm_resource *res)
         f_dput(dentry);
 
         lvb->lvb_size = dentry->d_inode->i_size;
-        lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime);
+        lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+        CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", mtime: "
+               LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_mtime);
 
  out:
         if (oa)
@@ -141,11 +143,11 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
                                lvb->lvb_size, new->lvb_size);
                         lvb->lvb_size = new->lvb_size;
                 }
-                if (new->lvb_time > lvb->lvb_time) {
-                        CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb time: "
+                if (new->lvb_mtime > lvb->lvb_mtime) {
+                        CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb mtime: "
                                LPU64" -> "LPU64"\n", res->lr_name.name[0],
-                               lvb->lvb_time, new->lvb_time);
-                        lvb->lvb_time = new->lvb_time;
+                               lvb->lvb_mtime, new->lvb_mtime);
+                        lvb->lvb_mtime = new->lvb_mtime;
                 }
                 GOTO(out, rc = 0);
         }
@@ -169,9 +171,9 @@ static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
         lvb->lvb_size = dentry->d_inode->i_size;
-        lvb->lvb_time = LTIME_S(dentry->d_inode->i_mtime);
-        CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", time: "
-               LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_time);
+        lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+        CDEBUG(D_DLMTRACE, "res: "LPU64" disk lvb size: "LPU64", mtime: "
+               LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_mtime);
         f_dput(dentry);
 
  out:
index 9216ec0..0492fc6 100644 (file)
@@ -170,28 +170,21 @@ int osc_rd_create_low_wm(char *page, char **start, off_t off, int count,
                          int *eof, void *data)
 {
         struct obd_device *obd = data;
-        struct obd_export *exp;
 
-        if (obd == NULL || list_empty(&obd->obd_exports))
+        if (obd == NULL)
                 return 0;
 
-        spin_lock(&obd->obd_dev_lock);
-        exp = list_entry(obd->obd_exports.next, struct obd_export,
-                         exp_obd_chain);
-        spin_unlock(&obd->obd_dev_lock);
-
         return snprintf(page, count, "%d\n",
-                        exp->exp_osc_data.oed_oscc.oscc_kick_barrier);
+                        obd->u.cli.cl_oscc.oscc_kick_barrier);
 }
 
 int osc_wr_create_low_wm(struct file *file, const char *buffer,
                          unsigned long count, void *data)
 {
         struct obd_device *obd = data;
-        struct obd_export *exp;
         int val, rc;
 
-        if (obd == NULL || list_empty(&obd->obd_exports))
+        if (obd == NULL)
                 return 0;
 
         rc = lprocfs_write_helper(buffer, count, &val);
@@ -202,9 +195,7 @@ int osc_wr_create_low_wm(struct file *file, const char *buffer,
                 return -ERANGE;
 
         spin_lock(&obd->obd_dev_lock);
-        exp = list_entry(obd->obd_exports.next, struct obd_export,
-                         exp_obd_chain);
-        exp->exp_osc_data.oed_oscc.oscc_kick_barrier = val;
+        obd->u.cli.cl_oscc.oscc_kick_barrier = val;
         spin_unlock(&obd->obd_dev_lock);
 
         return count;
@@ -214,28 +205,21 @@ int osc_rd_create_count(char *page, char **start, off_t off, int count,
                         int *eof, void *data)
 {
         struct obd_device *obd = data;
-        struct obd_export *exp;
 
-        if (obd == NULL || list_empty(&obd->obd_exports))
+        if (obd == NULL)
                 return 0;
 
-        spin_lock(&obd->obd_dev_lock);
-        exp = list_entry(obd->obd_exports.next, struct obd_export,
-                         exp_obd_chain);
-        spin_unlock(&obd->obd_dev_lock);
-
         return snprintf(page, count, "%d\n",
-                        exp->exp_osc_data.oed_oscc.oscc_grow_count);
+                        obd->u.cli.cl_oscc.oscc_grow_count);
 }
 
 int osc_wr_create_count(struct file *file, const char *buffer,
                         unsigned long count, void *data)
 {
         struct obd_device *obd = data;
-        struct obd_export *exp;
         int val, rc;
 
-        if (obd == NULL || list_empty(&obd->obd_exports))
+        if (obd == NULL)
                 return 0;
 
         rc = lprocfs_write_helper(buffer, count, &val);
@@ -245,11 +229,7 @@ int osc_wr_create_count(struct file *file, const char *buffer,
         if (val < 0)
                 return -ERANGE;
 
-        spin_lock(&obd->obd_dev_lock);
-        exp = list_entry(obd->obd_exports.next, struct obd_export,
-                         exp_obd_chain);
-        exp->exp_osc_data.oed_oscc.oscc_grow_count = val;
-        spin_unlock(&obd->obd_dev_lock);
+        obd->u.cli.cl_oscc.oscc_grow_count = val;
 
         return count;
 }
@@ -258,36 +238,24 @@ int osc_rd_prealloc_next_id(char *page, char **start, off_t off, int count,
                             int *eof, void *data)
 {
         struct obd_device *obd = data;
-        struct obd_export *exp;
 
-        if (obd == NULL || list_empty(&obd->obd_exports))
+        if (obd == NULL)
                 return 0;
 
-        spin_lock(&obd->obd_dev_lock);
-        exp = list_entry(obd->obd_exports.next, struct obd_export,
-                         exp_obd_chain);
-        spin_unlock(&obd->obd_dev_lock);
-
         return snprintf(page, count, LPU64"\n",
-                        exp->exp_osc_data.oed_oscc.oscc_next_id);
+                        obd->u.cli.cl_oscc.oscc_next_id);
 }
 
 int osc_rd_prealloc_last_id(char *page, char **start, off_t off, int count,
                             int *eof, void *data)
 {
         struct obd_device *obd = data;
-        struct obd_export *exp;
 
-        if (obd == NULL || list_empty(&obd->obd_exports))
+        if (obd == NULL)
                 return 0;
 
-        spin_lock(&obd->obd_dev_lock);
-        exp = list_entry(obd->obd_exports.next, struct obd_export,
-                         exp_obd_chain);
-        spin_unlock(&obd->obd_dev_lock);
-
         return snprintf(page, count, LPU64"\n",
-                        exp->exp_osc_data.oed_oscc.oscc_last_id);
+                        obd->u.cli.cl_oscc.oscc_last_id);
 }
 
 static struct lprocfs_vars lprocfs_obd_vars[] = {
index 845b306..6d3b80f 100644 (file)
@@ -110,7 +110,7 @@ static int oscc_internal_create(struct osc_creator *oscc)
         oscc->oscc_flags |= OSCC_FLAG_CREATING;
         spin_unlock(&oscc->oscc_lock);
 
-        request = ptlrpc_prep_req(class_exp2cliimp(oscc->oscc_exp), OST_CREATE,
+        request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import, OST_CREATE,
                                   1, &size, NULL);
         if (request == NULL) {
                 spin_lock(&oscc->oscc_lock);
@@ -163,8 +163,8 @@ static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
         ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
         spin_unlock(&oscc->oscc_lock);
 
-        osc_invalid = class_exp2cliimp(oscc->oscc_exp)->imp_invalid;
-
+        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
+                      
         return have_objs || ost_full || osc_invalid;
 }
 
@@ -186,7 +186,7 @@ static int oscc_precreate(struct osc_creator *oscc, int wait)
         if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
                 rc = -ENOSPC;
 
-        if (class_exp2cliimp(oscc->oscc_exp)->imp_invalid)
+        if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
                 rc = -EIO;
 
         RETURN(rc);
@@ -207,7 +207,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
         struct lov_stripe_md *lsm;
-        struct osc_creator *oscc = &exp->u.eu_osc_data.oed_oscc;
+        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         int try_again = 1, rc = 0;
         ENTRY;
         LASSERT(oa);
@@ -236,7 +236,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 oa->o_valid |= OBD_MD_FLID;
                 oa->o_id = oscc->oscc_next_id - 1;
 
-                rc = osc_real_create(oscc->oscc_exp, oa, ea, NULL);
+                rc = osc_real_create(exp, oa, ea, NULL);
 
                 spin_lock(&oscc->oscc_lock);
                 if (rc == -ENOSPC)
@@ -262,8 +262,10 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 rc = l_wait_event(oscc->oscc_waitq, !oscc_recovering(oscc),
                                   &lwi);
                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                if (rc == -ETIMEDOUT)
+                if (rc == -ETIMEDOUT) {
+                        CDEBUG(D_HA, "%p: timed out waiting for recovery\n", oscc);
                         RETURN(rc);
+                }
                 CDEBUG(D_HA, "%p: oscc recovery over, waking up\n", oscc);
         }
         
@@ -295,26 +297,27 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-void oscc_init(struct obd_export *exp)
+void oscc_init(struct obd_device *obd)
 {
-        struct osc_export_data *oed;
+        struct osc_creator *oscc;
 
-        if (exp == NULL)
+        if (obd == NULL)
                 return;
 
-        oed = &exp->exp_osc_data;
-        memset(oed, 0, sizeof(*oed));
-        INIT_LIST_HEAD(&oed->oed_oscc.oscc_list);
-        init_waitqueue_head(&oed->oed_oscc.oscc_waitq);
-        spin_lock_init(&oed->oed_oscc.oscc_lock);
-        oed->oed_oscc.oscc_exp = exp;
-        oed->oed_oscc.oscc_kick_barrier = 100;
-        oed->oed_oscc.oscc_grow_count = 2000;
-        oed->oed_oscc.oscc_initial_create_count = 2000;
-
-        oed->oed_oscc.oscc_next_id = 2;
-        oed->oed_oscc.oscc_last_id = 1;
-        oed->oed_oscc.oscc_flags |= OSCC_FLAG_RECOVERING;
+        oscc = &obd->u.cli.cl_oscc;
+
+        memset(oscc, 0, sizeof(*oscc));
+        INIT_LIST_HEAD(&oscc->oscc_list);
+        init_waitqueue_head(&oscc->oscc_waitq);
+        spin_lock_init(&oscc->oscc_lock);
+        oscc->oscc_obd = obd;
+        oscc->oscc_kick_barrier = 100;
+        oscc->oscc_grow_count = 2000;
+        oscc->oscc_initial_create_count = 2000;
+
+        oscc->oscc_next_id = 2;
+        oscc->oscc_last_id = 1;
+        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
         /* XXX the export handle should give the oscc the last object */
         /* oed->oed_oscc.oscc_last_id = exph->....; */
 }
diff --git