Whamcloud - gitweb
merge b_devel into HEAD (20030703)
authorpschwan <pschwan>
Thu, 3 Jul 2003 20:06:39 +0000 (20:06 +0000)
committerpschwan <pschwan>
Thu, 3 Jul 2003 20:06:39 +0000 (20:06 +0000)
83 files changed:
lnet/libcfs/module.c
lustre/cobd/cache_obd.c
lustre/doc/lconf.lyx
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_import.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_net.h
lustre/include/linux/lustre_otree.h [new file with mode: 0644]
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/include/linux/obd_support.h
lustre/kernel_patches/patches/netconsole_sysrq.patch [new file with mode: 0644]
lustre/kernel_patches/patches/tg3_netconsole.patch [new file with mode: 0644]
lustre/kernel_patches/pc/netconsole_sysrq.pc [new file with mode: 0644]
lustre/kernel_patches/pc/tg3_netconsole.pc [new file with mode: 0644]
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/liblustre/super.c
lustre/llite/dcache.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/iod.c
lustre/llite/llite_internal.h
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/llite/rw.c
lustre/llite/super.c
lustre/llite/super25.c
lustre/lov/lov_obd.c
lustre/lov/lov_pack.c
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_open.c
lustre/obdclass/Makefile.am
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/otree.c [new file with mode: 0644]
lustre/obdclass/statfs_pack.c
lustre/obdecho/echo.c
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/portals/libcfs/module.c
lustre/ptlbd/client.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_lib.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recover.c
lustre/scripts/lustre.spec.in
lustre/tests/.cvsignore
lustre/tests/Makefile.am
lustre/tests/getdents.c [new file with mode: 0644]
lustre/tests/llmount.sh
lustre/tests/llmountcleanup.sh
lustre/tests/local.sh
lustre/tests/lov.sh
lustre/tests/lstiming.sh [new file with mode: 0644]
lustre/tests/mkdirdeep.c
lustre/tests/o_directory.c [new file with mode: 0644]
lustre/tests/recovery-small.sh
lustre/tests/runas.c
lustre/tests/rundbench
lustre/tests/sanity.sh
lustre/tests/utime.c
lustre/utils/Makefile.am
lustre/utils/lactive
lustre/utils/lconf
lustre/utils/lctl.c
lustre/utils/lfind.c
lustre/utils/lmc
lustre/utils/load_ldap.sh
lustre/utils/obd.c

index 5e3fcb5..14cc325 100644 (file)
@@ -429,6 +429,7 @@ static int kportal_ioctl(struct inode *inode, struct file *file,
                         return (-EINVAL);
 
                 err = PtlFailNid (*nip, data->ioc_nid, data->ioc_count);
+                kportal_put_ni (data->ioc_nal);
                 break;
         }
 
index c96b2ad..5efb545 100644 (file)
@@ -144,19 +144,18 @@ cobd_get_info(struct lustre_handle *conn, obd_count keylen,
         return obd_get_info(&cobd->cobd_target, keylen, key, vallen, val);
 }
 
-static int
-cobd_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int cobd_statfs(struct obd_export *exp, struct obd_statfs *osfs)
 {
-        struct obd_device *obd = class_conn2obd(conn);
-        struct cache_obd  *cobd;
+        struct obd_export *cobd_exp;
+        int rc;
 
-        if (obd == NULL) {
-                CERROR("invalid client cookie "LPX64"\n", conn->cookie);
+        if (exp->exp_obd == NULL)
                 return -EINVAL;
-        }
 
-        cobd = &obd->u.cobd;
-        return (obd_statfs (&cobd->cobd_target, osfs));
+        cobd_exp = class_conn2export(&exp->exp_obd->u.cobd.cobd_target);
+        rc = obd_statfs(cobd_exp, osfs);
+        class_export_put(cobd_exp);
+        return rc;
 }
 
 static int
@@ -208,7 +207,7 @@ cobd_close(struct lustre_handle *conn, struct obdo *oa,
         return (obd_close (&cobd->cobd_target, oa, lsm, oti));
 }
 
-static int cobd_preprw(int cmd, struct obd_export *exp,
+static int cobd_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
                        int objcount, struct obd_ioobj *obj,
                        int niocount, struct niobuf_remote *nb,
                        struct niobuf_local *res, void **desc_private,
@@ -224,7 +223,7 @@ static int cobd_preprw(int cmd, struct obd_export *exp,
                 return -EOPNOTSUPP;
 
         cobd_exp = class_conn2export(&exp->exp_obd->u.cobd.cobd_target);
-        rc = obd_preprw(cmd, cobd_exp, objcount, obj, niocount, nb, res,
+        rc = obd_preprw(cmd, cobd_exp, obdo, objcount, obj, niocount, nb, res,
                         desc_private, oti);
         class_export_put(cobd_exp);
         return rc;
index 85c670b..0b1416f 100644 (file)
@@ -48,7 +48,7 @@ lconf\SpecialChar ~
 DESCRIPTION
 \layout Standard
 
-This program configures a node following directives in the <XML-config-file>..
+This program configures a node following directives in the <XML-config-file>.
  There will be single configuration file for all the nodes in a single cluster.
  This file should be distributed to all the nodes in the cluster or kept
  in a location accessible to all the nodes.
@@ -140,13 +140,14 @@ node_name
 \layout Description
 
 --group\SpecialChar ~
-<arg> The group of devices tol cleanup/configure.
+<arg> The group of devices to cleanup/configure.
 \layout Description
 
 --nomod Only setup devices and services, do not load modules.
 \layout Description
 
 --noexec,-n Don't do anything, but print what would happen.
+ Useful for debugging purposes.
 \layout Description
 
 --verbose,-v Be verbose and show actions while going along.
@@ -213,20 +214,14 @@ When used in conjunction with cleanup, services are torn down up to a certain
  will cause lconf to load the lustre modules from this soure tree.
 \layout Description
 
---portals=src_dir Portals source directory.
- If this is a relative path, it is assumed to be relative to Lustre source
- tree location.
-\layout Description
-
 --ptldebug\SpecialChar ~
 debug\SpecialChar ~
 level This options can be used to set the required debug
- level 
+ level.
 \layout Description
 
 --subsystem\SpecialChar ~
-<arg> Set the portals debug subsystem 
+<arg> Set the portals debug subsystem.
 \layout Subsection
 
 EXAMPLES
index fffbd60..8493f91 100644 (file)
@@ -12,8 +12,7 @@ struct ptlrpc_request;
 
 void ptlrpc_run_failed_import_upcall(struct obd_import *imp);
 void ptlrpc_run_recovery_over_upcall(struct obd_device *obd);
-int ptlrpc_reconnect_import(struct obd_import *imp,
-                            struct ptlrpc_request **reqptr);
+int ptlrpc_reconnect_import(struct obd_import *imp);
 int ptlrpc_replay(struct obd_import *imp);
 int ptlrpc_resend(struct obd_import *imp);
 void ptlrpc_free_committed(struct obd_import *imp);
index b49fd16..f4a5f2d 100644 (file)
@@ -124,7 +124,7 @@ extern struct obd_uuid lctl_fake_uuid;
 #define LUSTRE_CONN_NEW          1
 #define LUSTRE_CONN_CON          2
 #define LUSTRE_CONN_NOTCONN      3
-#define LUSTRE_CONN_RECOV      4
+#define LUSTRE_CONN_RECOVER      4
 #define LUSTRE_CONN_FULL         5
 
 /* packet types */
@@ -269,14 +269,14 @@ struct obdo {
         obd_time                o_mtime;
         obd_time                o_ctime;
         obd_size                o_size;
-        obd_blocks              o_blocks;
-        obd_rdev                o_rdev;
+        obd_blocks              o_blocks; /* brw: clients sent cached bytes */
+        obd_rdev                o_rdev; /* brw: clients/servers sent grant */
         obd_blksize             o_blksize;      /* optimal IO blocksize */
         obd_mode                o_mode;
         obd_uid                 o_uid;
         obd_gid                 o_gid;
         obd_flag                o_flags;
-        obd_count               o_nlink;
+        obd_count               o_nlink; /* brw: checksum */
         obd_count               o_generation;
         obd_flag                o_valid;        /* hot fields in this obdo */
         obd_flag                o_obdflags;
@@ -345,12 +345,13 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os);
 
 /* ost_body.data values for OST_BRW */
 
-#define OBD_BRW_READ    0x01
-#define OBD_BRW_WRITE   0x02
-#define OBD_BRW_RWMASK  (OBD_BRW_READ | OBD_BRW_WRITE)
-#define OBD_BRW_CREATE  0x04
-#define OBD_BRW_SYNC    0x08
-#define OBD_BRW_CHECK   0x10
+#define OBD_BRW_READ       0x01
+#define OBD_BRW_WRITE      0x02
+#define OBD_BRW_RWMASK     (OBD_BRW_READ | OBD_BRW_WRITE)
+#define OBD_BRW_CREATE     0x04
+#define OBD_BRW_SYNC       0x08
+#define OBD_BRW_CHECK      0x10
+#define OBD_BRW_FROM_GRANT 0x20
 
 #define OBD_OBJECT_EOF 0xffffffffffffffffULL
 
index c1af641..9dc0a92 100644 (file)
@@ -36,7 +36,8 @@ struct obd_import {
         __u64                     imp_peer_committed_transno;
         struct obd_uuid           imp_target_uuid; /* XXX -> lustre_name */
         struct lustre_handle      imp_remote_handle;
-
+        unsigned long             imp_next_ping;
+        
         /* Protects flags, level, generation, *_list */
         spinlock_t                imp_lock;
 
index 57e9620..b18e2d2 100644 (file)
@@ -538,10 +538,10 @@ static inline void obd_ioctl_freedata(char *buf, int len)
 #define CHECKSUM_BULK 0
 
 #if CHECKSUM_BULK
-static inline void ost_checksum(__u64 *cksum, void *addr, int len)
+static inline void ost_checksum(obd_count *cksum, void *addr, int len)
 {
         unsigned char *ptr = (unsigned char *)addr;
-        __u64          sum = 0;
+        obd_count          sum = 0;
 
         /* very stupid, but means I don't have to think about byte order */
         while (len-- > 0)
index 17ea754..81184e7 100644 (file)
@@ -52,19 +52,6 @@ struct ll_dentry_data {
 
 #define ll_d2d(dentry) ((struct ll_dentry_data*) dentry->d_fsdata)
 
-struct ll_dirty_offsets {
-        rb_root_t       do_root;
-        spinlock_t      do_lock;
-        unsigned long   do_num_dirty;
-};
-
-void ll_lldo_init(struct ll_dirty_offsets *lldo);
-void ll_record_dirty(struct inode *inode, unsigned long offset);
-void ll_remove_dirty(struct inode *inode, unsigned long start,
-                     unsigned long end);
-int ll_find_dirty(struct ll_dirty_offsets *lldo, unsigned long *start,
-                  unsigned long *end);
-int ll_farthest_dirty(struct ll_dirty_offsets *lldo, unsigned long *farthest);
 extern struct file_operations ll_pgcache_seq_fops;
 
 struct ll_inode_info {
@@ -74,7 +61,6 @@ struct ll_inode_info {
         struct list_head        lli_read_extents;
         loff_t                  lli_maxbytes;
         spinlock_t              lli_read_extent_lock;
-        struct ll_dirty_offsets lli_dirty;
         unsigned long           lli_flags;
 #define LLI_F_HAVE_SIZE_LOCK    0
 
@@ -258,8 +244,9 @@ do {                                                                           \
         down(&ll_d2d(de)->lld_it_sem);                                         \
         LASSERT(de->d_it == NULL);                                             \
         de->d_it = it;                                                         \
-        CDEBUG(D_DENTRY, "D_IT DOWN dentry %p fsdata %p intent: %s sem %d\n",  \
-               de, ll_d2d(de), ldlm_it2str(de->d_it->it_op),                   \
+        CDEBUG(D_DENTRY,                                                       \
+               "D_IT DOWN dentry %p fsdata %p intent: %p %s sem %d\n",         \
+               de, ll_d2d(de), de->d_it, ldlm_it2str(de->d_it->it_op),         \
                atomic_read(&(ll_d2d(de)->lld_it_sem.count)));                  \
 } while(0)
 
@@ -271,8 +258,8 @@ do {                                                                           \
         LASSERT(it);                                                           \
         LASSERT(it->it_op != IT_RELEASED_MAGIC);                               \
                                                                                \
-        CDEBUG(D_DENTRY, "D_IT UP dentry %p fsdata %p intent: %s\n",           \
-               de, ll_d2d(de), ldlm_it2str(de->d_it->it_op));                  \
+        CDEBUG(D_DENTRY, "D_IT UP dentry %p fsdata %p intent: %p %s\n",        \
+               de, ll_d2d(de), de->d_it, ldlm_it2str(de->d_it->it_op));        \
         de->d_it = NULL;                                                       \
         it->it_op = IT_RELEASED_MAGIC;                                         \
         up(&ll_d2d(de)->lld_it_sem);                                           \
@@ -281,8 +268,7 @@ do {                                                                           \
 #define LL_IT2STR(it) ((it) ? ldlm_it2str((it)->it_op) : "0")
 
 enum {
-         LPROC_LL_DIRTY_PAGES       = 0,
-         LPROC_LL_DIRTY_HITS,
+         LPROC_LL_DIRTY_HITS = 0,
          LPROC_LL_DIRTY_MISSES,
          LPROC_LL_WB_WRITEPAGE,
          LPROC_LL_WB_PRESSURE,
index f71802c..ac87d7f 100644 (file)
@@ -278,19 +278,20 @@ struct ptlrpc_request {
 /* Spare the preprocessor, spoil the bugs. */
 #define FLAG(field, str) (field ? str : "")
 
-#define DEBUG_REQ_FLAGS(req)                                                    \
-        ((req->rq_phase == RQ_PHASE_NEW) ? "New" :                              \
-         (req->rq_phase == RQ_PHASE_RPC) ? "Rpc" :                              \
-         (req->rq_phase == RQ_PHASE_INTERPRET) ? "Interpret" :                  \
-         (req->rq_phase == RQ_PHASE_COMPLETE) ? "Complete" : "?phase?"),        \
-        FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"),                    \
-        FLAG(req->rq_want_ack, "A"), FLAG(req->rq_err, "E"),                    \
-        FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"),   \
-        FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),                  \
-        FLAG(req->rq_no_resend, "N"), FLAG(req->rq_resent, "s"),                \
+#define DEBUG_REQ_FLAGS(req)                                                   \
+        ((req->rq_phase == RQ_PHASE_NEW) ? "New" :                             \
+         (req->rq_phase == RQ_PHASE_RPC) ? "RPC" :                             \
+         (req->rq_phase == RQ_PHASE_INTERPRET) ? "Interpret" :                 \
+         (req->rq_phase == RQ_PHASE_COMPLETE) ? "Complete" :                   \
+         (req->rq_phase == RQ_PHASE_BULK) ? "Bulk" : "?phase?"),               \
+        FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"),                   \
+        FLAG(req->rq_want_ack, "A"), FLAG(req->rq_err, "E"),                   \
+        FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"),  \
+        FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),                 \
+        FLAG(req->rq_no_resend, "N"), FLAG(req->rq_resent, "s"),               \
         FLAG(req->rq_no_recov, "n"), FLAG(req->rq_waiting, "W")
 
-#define REQ_FLAGS_FMT "%s%s%s%s%s%s%s%s%s%s%s%s%s"
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s"
 
 #define DEBUG_REQ(level, req, fmt, args...)                                    \
 do {                                                                           \
@@ -533,4 +534,8 @@ int client_import_connect(struct lustre_handle *conn, struct obd_device *obd,
                           struct obd_uuid *cluuid);
 int client_import_disconnect(struct lustre_handle *conn, int failover);
 
+/* ptlrpc/pinger.c */
+int ptlrpc_pinger_add_import(struct obd_import *imp);
+int ptlrpc_pinger_del_import(struct obd_import *imp);
+
 #endif
diff --git a/lustre/include/linux/lustre_otree.h b/lustre/include/linux/lustre_otree.h
new file mode 100644 (file)
index 0000000..3d8d510
--- /dev/null
@@ -0,0 +1,31 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+#ifndef _LUSTRE_OTREE_H
+#define _LUSTRE_OTREE_H
+
+/* XXX ok, I can't make sense of our header nest right now.. */
+#ifdef __KERNEL__
+#include <linux/rbtree.h>
+#include <linux/spinlock.h>
+
+struct otree {
+        rb_root_t       ot_root;
+        spinlock_t      ot_lock;
+        unsigned long   ot_num_marked;
+};
+#else
+struct otree {
+        unsigned long   lalala;
+};
+#endif
+
+int ot_mark_offset(struct otree *ot, unsigned long offset);
+int ot_clear_extent(struct otree *ot, unsigned long start, unsigned long end);
+int ot_find_marked_extent(struct otree *ot, unsigned long *start,
+                          unsigned long *end);
+int ot_last_marked(struct otree *ot, unsigned long *last);
+unsigned long ot_num_marked(struct otree *ot);
+void ot_init(struct otree *ot);
+
+#endif
index 2fb2c5c..ba848a9 100644 (file)
 #ifndef __OBD_H
 #define __OBD_H
 
+#include <linux/lustre_otree.h>
+
 struct lov_oinfo { /* per-child structure */
         __u64 loi_id;              /* object ID on the target OST */
         struct lustre_handle *loi_handle; /* open file handle for obj on OST */
         int loi_ost_idx;           /* OST stripe index in lmd_objects array */
+        /* tracking offsets per file, per stripe.. */
+        struct otree *loi_dirty_ot; /* lets lov stack on osc */
+        struct otree loi_dirty_ot_inline;
 };
 
 struct lov_stripe_md {
@@ -91,9 +96,13 @@ struct filter_obd {
         struct file_operations *fo_fop;
         struct inode_operations *fo_iop;
         struct address_space_operations *fo_aops;
+
         struct list_head     fo_export_list;
         spinlock_t           fo_fddlock; /* protect setting dentry->d_fsdata */
         int                  fo_subdir_count;
+        spinlock_t           fo_grant_lock;       /* protects tot_granted */
+        obd_size             fo_tot_granted;
+        obd_size             fo_tot_cached;
 };
 
 struct mds_server_data;
@@ -107,6 +116,13 @@ struct client_obd {
         int                  cl_max_mds_easize;
         struct obd_device   *cl_containing_lov;
         kdev_t               cl_sandev;
+        struct semaphore     cl_dirty_sem;
+        obd_size             cl_dirty;  /* both in bytes */
+        obd_size             cl_dirty_granted;
+        /* this is just to keep existing infinitely caching behaviour between 
+         * clients and OSTs that don't have the grant code in yet.. it can 
+         * be yanked once everything speaks grants */
+        char                 cl_ost_can_grant;
 };
 
 struct mds_obd {
@@ -255,6 +271,7 @@ struct obd_device {
         wait_queue_head_t obd_refcount_waitq;
         struct proc_dir_entry *obd_proc_entry;
         struct list_head       obd_exports;
+        int                    obd_num_exports;
         struct list_head       obd_imports;
         struct ldlm_namespace *obd_namespace;
         struct ptlrpc_client   obd_ldlm_client; /* XXX OST/MDS only */
@@ -309,7 +326,7 @@ struct obd_ops {
                          struct obd_uuid *cluuid);
         int (*o_disconnect)(struct lustre_handle *conn, int failover);
 
-        int (*o_statfs)(struct lustre_handle *conn, struct obd_statfs *osfs);
+        int (*o_statfs)(struct obd_export *exp, struct obd_statfs *osfs);
         int (*o_syncfs)(struct obd_export *);
         int (*o_packmd)(struct lustre_handle *, struct lov_mds_md **disk_tgt,
                         struct lov_stripe_md *mem_src);
@@ -354,7 +371,7 @@ struct obd_ops {
         int (*o_iterate)(struct lustre_handle *conn,
                          int (*)(obd_id, obd_gr, void *),
                          obd_id *startid, obd_gr group, void *data);
-        int (*o_preprw)(int cmd, struct obd_export *,
+        int (*o_preprw)(int cmd, struct obd_export *, struct obdo *obdo,
                         int objcount, struct obd_ioobj *obj,
                         int niocount, struct niobuf_remote *remote,
                         struct niobuf_local *local, void **desc_private, 
@@ -378,6 +395,17 @@ struct obd_ops {
         int (*o_san_preprw)(int cmd, struct lustre_handle *conn,
                             int objcount, struct obd_ioobj *obj,
                             int niocount, struct niobuf_remote *remote);
+        int (*o_mark_page_dirty)(struct lustre_handle *conn,
+                                 struct lov_stripe_md *ea,
+                                 unsigned long offset);
+        int (*o_clear_dirty_pages)(struct lustre_handle *conn,
+                                   struct lov_stripe_md *ea,
+                                   unsigned long start,
+                                   unsigned long end,
+                                   unsigned long *cleared);
+        int (*o_last_dirty_offset)(struct lustre_handle *conn,
+                                   struct lov_stripe_md *ea,
+                                   unsigned long *offset);
         void (*o_destroy_export)(struct obd_export *export);
 };
 
index e93032a..0c33ceb 100644 (file)
@@ -57,6 +57,7 @@ struct obd_export *class_conn2export(struct lustre_handle *);
 int class_register_type(struct obd_ops *ops, struct lprocfs_vars *, char *nm);
 int class_unregister_type(char *nm);
 int class_name2dev(char *name);
+struct obd_device *class_name2obd(char *name);
 int class_uuid2dev(struct obd_uuid *uuid);
 struct obd_device *class_uuid2obd(struct obd_uuid *uuid);
 
@@ -540,18 +541,15 @@ static inline void obd_destroy_export(struct obd_export *exp)
         EXIT;
 }
 
-static inline int obd_statfs(struct lustre_handle *conn,struct obd_statfs *osfs)
+static inline int obd_statfs(struct obd_export *exp, struct obd_statfs *osfs)
 {
-        struct obd_export *exp;
         int rc;
         ENTRY;
 
-        OBD_CHECK_ACTIVE(conn, exp);
         OBD_CHECK_OP(exp->exp_obd, statfs);
         OBD_COUNTER_INCREMENT(exp->exp_obd, statfs);
 
-        rc = OBP(exp->exp_obd, statfs)(conn, osfs);
-        class_export_put(exp);
+        rc = OBP(exp->exp_obd, statfs)(exp, osfs);
         RETURN(rc);
 }
 
@@ -631,7 +629,7 @@ static inline int obd_brw_async(int cmd, struct lustre_handle *conn,
         RETURN(rc);
 }
 
-static inline int obd_preprw(int cmd, struct obd_export *exp,
+static inline int obd_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
                              int objcount, struct obd_ioobj *obj,
                              int niocount, struct niobuf_remote *remote,
                              struct niobuf_local *local, void **desc_private,
@@ -643,7 +641,7 @@ static inline int obd_preprw(int cmd, struct obd_export *exp,
         OBD_CHECK_OP(exp->exp_obd, preprw);
         OBD_COUNTER_INCREMENT(exp->exp_obd, preprw);
 
-        rc = OBP(exp->exp_obd, preprw)(cmd, exp, objcount, obj, niocount,
+        rc = OBP(exp->exp_obd, preprw)(cmd, exp, obdo, objcount, obj, niocount,
                                        remote, local, desc_private, oti);
         RETURN(rc);
 }
@@ -773,6 +771,53 @@ static inline int obd_san_preprw(int cmd, struct lustre_handle *conn,
         RETURN(rc);
 }
 
+static inline int obd_mark_page_dirty(struct lustre_handle *conn,
+                                      struct lov_stripe_md *lsm,  
+                                      unsigned long offset)
+{
+        struct obd_export *exp;
+        int rc;
+
+        OBD_CHECK_SETUP(conn, exp);
+        OBD_CHECK_OP(exp->exp_obd, mark_page_dirty);
+
+        rc = OBP(exp->exp_obd, mark_page_dirty)(conn, lsm, offset);
+        class_export_put(exp);
+        RETURN(rc);
+}
+
+static inline int obd_clear_dirty_pages(struct lustre_handle *conn,
+                                        struct lov_stripe_md *lsm,  
+                                        unsigned long start,
+                                        unsigned long end,
+                                        unsigned long *cleared)
+{
+        struct obd_export *exp;
+        int rc;
+
+        OBD_CHECK_SETUP(conn, exp);
+        OBD_CHECK_OP(exp->exp_obd, clear_dirty_pages);
+
+        rc = OBP(exp->exp_obd, clear_dirty_pages)(conn, lsm, start, end,
+                                                  cleared);
+        class_export_put(exp);
+        RETURN(rc);
+}
+
+static inline int obd_last_dirty_offset(struct lustre_handle *conn,
+                                      struct lov_stripe_md *lsm,
+                                      unsigned long *offset)
+{
+        struct obd_export *exp;
+        int rc;
+
+        OBD_CHECK_SETUP(conn, exp);
+        OBD_CHECK_OP(exp->exp_obd, last_dirty_offset);
+
+        rc = OBP(exp->exp_obd, last_dirty_offset)(conn, lsm, offset);
+        class_export_put(exp);
+        RETURN(rc);
+}
 
 /* OBD Metadata Support */
 
index aecef05..2a76905 100644 (file)
@@ -38,6 +38,7 @@ extern atomic_t obd_memory;
 extern int obd_memmax;
 extern unsigned long obd_fail_loc;
 extern unsigned long obd_timeout;
+extern unsigned long obd_max_dirty_pages;
 extern char obd_lustre_upcall[128];
 extern unsigned long obd_sync_filter;
 
diff --git a/lustre/kernel_patches/patches/netconsole_sysrq.patch b/lustre/kernel_patches/patches/netconsole_sysrq.patch
new file mode 100644 (file)
index 0000000..b995461
--- /dev/null
@@ -0,0 +1,41 @@
+ 0 files changed
+
+--- linux-2.4.20-rh/drivers/net/netconsole.c~netconsole_sysrq  2003-04-11 14:04:57.000000000 +0800
++++ linux-2.4.20-rh-root/drivers/net/netconsole.c      2003-07-01 11:10:26.000000000 +0800
+@@ -988,7 +988,15 @@ static void netconsole_netdump (struct p
+                       reply.info = 0;
+                       send_netdump_skb(dev, tmp, strlen(tmp), &reply);
+                       break;
+-
++               case COMM_SYSRQ:
++                      Dprintk("got SYSRQ command.\n");
++                      printk("netdump: got SYSRQ command %d \n", req->from);
++                      handle_sysrq(req->from, regs, NULL, NULL);
++                      reply.code = REPLY_SYSRQ;
++                      reply.nr = req->nr;
++                      reply.info = req->from;
++                      send_netdump_skb(dev, tmp, strlen(tmp), &reply);
++                      break;
+               default:
+                       reply.code = REPLY_ERROR;
+                       reply.nr = req->nr;
+--- linux-2.4.20-rh/drivers/net/netconsole.h~netconsole_sysrq  2003-04-11 14:04:57.000000000 +0800
++++ linux-2.4.20-rh-root/drivers/net/netconsole.h      2003-07-01 11:11:29.000000000 +0800
+@@ -42,6 +42,7 @@ enum netdump_commands {
+       COMM_START_NETDUMP_ACK = 7,
+       COMM_GET_REGS = 8,
+       COMM_SHOW_STATE = 9,
++      COMM_SYSRQ=10,
+ };
+ #define NETDUMP_REQ_SIZE (8+4*4)
+@@ -69,6 +70,7 @@ enum netdump_replies {
+       REPLY_REGS = 10,
+       REPLY_MAGIC = 11,
+       REPLY_SHOW_STATE = 12,
++      REPLY_SYSRQ=13,
+ };
+ typedef struct netdump_reply_s {
+
+_
diff --git a/lustre/kernel_patches/patches/tg3_netconsole.patch b/lustre/kernel_patches/patches/tg3_netconsole.patch
new file mode 100644 (file)
index 0000000..267dedd
--- /dev/null
@@ -0,0 +1,247 @@
+ 0 files changed
+
+--- linux-2.4.20-rh/drivers/net/tg3.c~tg3_netconsole   2003-04-11 14:04:56.000000000 +0800
++++ linux-2.4.20-rh-root/drivers/net/tg3.c     2003-07-01 11:27:46.000000000 +0800
+@@ -170,6 +170,10 @@ static void tg3_write_indirect_reg32(str
+       }
+ }
++#ifdef HAVE_POLL_CONTROLLER
++static void     Poll_tg3(struct net_device *);
++#endif
++
+ #define tw32(reg,val)         tg3_write_indirect_reg32(tp,(reg),(val))
+ #define tw32_mailbox(reg, val)        writel(((val) & 0xffffffff), tp->regs + (reg))
+ #define tw16(reg,val)         writew(((val) & 0xffff), tp->regs + (reg))
+@@ -1899,7 +1903,138 @@ static int tg3_vlan_rx(struct tg3 *tp, s
+       return vlan_hwaccel_receive_skb(skb, tp->vlgrp, vlan_tag);
+ }
+ #endif
++/* for netconsole */
++static int upcall_rx_hook(struct net_device *dev)
++{
++      struct tg3 *tp = dev->priv;     
++      u32 work_mask;
++      u32 rx_rcb_ptr = tp->rx_rcb_ptr;
++      u16 hw_idx, sw_idx;
++      int received;
++
++      hw_idx = tp->hw_status->idx[0].rx_producer;
++      sw_idx = rx_rcb_ptr % TG3_RX_RCB_RING_SIZE;
++      work_mask = 0;
++      received = 0;
++      while (sw_idx != hw_idx) {
++              struct tg3_rx_buffer_desc *desc = &tp->rx_rcb[sw_idx];
++              unsigned int len;
++              struct sk_buff *skb;
++              dma_addr_t dma_addr;
++              u32 opaque_key, desc_idx, *post_ptr;
++
++              desc_idx = desc->opaque & RXD_OPAQUE_INDEX_MASK;
++              opaque_key = desc->opaque & RXD_OPAQUE_RING_MASK;
++              if (opaque_key == RXD_OPAQUE_RING_STD) {
++                      dma_addr = pci_unmap_addr(&tp->rx_std_buffers[desc_idx],
++                                                mapping);
++                      skb = tp->rx_std_buffers[desc_idx].skb;
++                      post_ptr = &tp->rx_std_ptr;
++              } else if (opaque_key == RXD_OPAQUE_RING_JUMBO) {
++                      dma_addr = pci_unmap_addr(&tp->rx_jumbo_buffers[desc_idx],
++                                                mapping);
++                      skb = tp->rx_jumbo_buffers[desc_idx].skb;
++                      post_ptr = &tp->rx_jumbo_ptr;
++              }
++              else {
++                      goto next_pkt_nopost;
++              }
++
++              work_mask |= opaque_key;
++
++              if ((desc->err_vlan & RXD_ERR_MASK) != 0 &&
++                  (desc->err_vlan != RXD_ERR_ODD_NIBBLE_RCVD_MII)) {
++              drop_it:
++                      tg3_recycle_rx(tp, opaque_key,
++                                     desc_idx, *post_ptr);
++              drop_it_no_recycle:
++                      /* Other statistics kept track of by card. */
++                      tp->net_stats.rx_dropped++;
++                      goto next_pkt;
++              }
++
++              len = ((desc->idx_len & RXD_LEN_MASK) >> RXD_LEN_SHIFT) - 4; /* omit crc */
++
++              if (len > RX_COPY_THRESHOLD) {
++                      int skb_size;
++
++                      skb_size = tg3_alloc_rx_skb(tp, opaque_key,
++                                                  desc_idx, *post_ptr);
++                      if (skb_size < 0)
++                              goto drop_it;
++
++                      pci_unmap_single(tp->pdev, dma_addr,
++                                       skb_size - tp->rx_offset,
++                                       PCI_DMA_FROMDEVICE);
++
++                      skb_put(skb, len);
++              } else {
++                      struct sk_buff *copy_skb;
++
++                      tg3_recycle_rx(tp, opaque_key,
++                                     desc_idx, *post_ptr);
++
++                      copy_skb = dev_alloc_skb(len + 2);
++                      if (copy_skb == NULL)
++                              goto drop_it_no_recycle;
++
++                      copy_skb->dev = tp->dev;
++                      skb_reserve(copy_skb, 2);
++                      skb_put(copy_skb, len);
++                      pci_dma_sync_single(tp->pdev, dma_addr, len, PCI_DMA_FROMDEVICE);
++                      memcpy(copy_skb->data, skb->data, len);
++
++                      /* We'll reuse the original ring buffer. */
++                      skb = copy_skb;
++              }
++              
++              if ((tp->tg3_flags & TG3_FLAG_RX_CHECKSUMS) &&
++                  (desc->type_flags & RXD_FLAG_TCPUDP_CSUM) &&
++                  (((desc->ip_tcp_csum & RXD_TCPCSUM_MASK)
++                    >> RXD_TCPCSUM_SHIFT) == 0xffff))
++                      skb->ip_summed = CHECKSUM_UNNECESSARY;
++              else
++                      skb->ip_summed = CHECKSUM_NONE;
++
++              skb->protocol = eth_type_trans(skb, tp->dev);
++/*into netconsole driver*/    
++              dev->rx_hook(skb);
++                      kfree_skb(skb);
++              tp->dev->last_rx = jiffies;
++              received++;
++next_pkt:
++              (*post_ptr)++;
++next_pkt_nopost:
++              rx_rcb_ptr++;
++              sw_idx = rx_rcb_ptr % TG3_RX_RCB_RING_SIZE;
++      }
++
++      /* ACK the status ring. */
++      tp->rx_rcb_ptr = rx_rcb_ptr;
++      tw32_mailbox(MAILBOX_RCVRET_CON_IDX_0 + TG3_64BIT_REG_LOW,
++                   (rx_rcb_ptr % TG3_RX_RCB_RING_SIZE));
++      if (tp->tg3_flags & TG3_FLAG_MBOX_WRITE_REORDER)
++              tr32(MAILBOX_RCVRET_CON_IDX_0 + TG3_64BIT_REG_LOW);
++      /* Refill RX ring(s). */
++      if (work_mask & RXD_OPAQUE_RING_STD) {
++              sw_idx = tp->rx_std_ptr % TG3_RX_RING_SIZE;
++              tw32_mailbox(MAILBOX_RCV_STD_PROD_IDX + TG3_64BIT_REG_LOW,
++                           sw_idx);
++              if (tp->tg3_flags & TG3_FLAG_MBOX_WRITE_REORDER)
++                      tr32(MAILBOX_RCV_STD_PROD_IDX + TG3_64BIT_REG_LOW);
++      }
++      if (work_mask & RXD_OPAQUE_RING_JUMBO) {
++              sw_idx = tp->rx_jumbo_ptr % TG3_RX_JUMBO_RING_SIZE;
++              tw32_mailbox(MAILBOX_RCV_JUMBO_PROD_IDX + TG3_64BIT_REG_LOW,
++                           sw_idx);
++              if (tp->tg3_flags & TG3_FLAG_MBOX_WRITE_REORDER)
++                      tr32(MAILBOX_RCV_JUMBO_PROD_IDX + TG3_64BIT_REG_LOW);
++      }
++
++      return received;
++
++}
+ /* The RX ring scheme is composed of multiple rings which post fresh
+  * buffers to the chip, and one special ring the chip uses to report
+  * status back to the host.
+@@ -2006,7 +2141,7 @@ static int tg3_rx(struct tg3 *tp, int bu
+                       /* We'll reuse the original ring buffer. */
+                       skb = copy_skb;
+               }
+-
++              
+               if ((tp->tg3_flags & TG3_FLAG_RX_CHECKSUMS) &&
+                   (desc->type_flags & RXD_FLAG_TCPUDP_CSUM) &&
+                   (((desc->ip_tcp_csum & RXD_TCPCSUM_MASK)
+@@ -2016,6 +2151,8 @@ static int tg3_rx(struct tg3 *tp, int bu
+                       skb->ip_summed = CHECKSUM_NONE;
+               skb->protocol = eth_type_trans(skb, tp->dev);
++
++                                      
+ #if TG3_VLAN_TAG_USED
+               if (tp->vlgrp != NULL &&
+                   desc->type_flags & RXD_FLAG_VLAN) {
+@@ -2058,7 +2195,6 @@ next_pkt_nopost:
+               if (tp->tg3_flags & TG3_FLAG_MBOX_WRITE_REORDER)
+                       tr32(MAILBOX_RCV_JUMBO_PROD_IDX + TG3_64BIT_REG_LOW);
+       }
+-
+       return received;
+ }
+@@ -2151,7 +2287,6 @@ static void tg3_interrupt(int irq, void 
+       unsigned long flags;
+       spin_lock_irqsave(&tp->lock, flags);
+-
+       if (sblk->status & SD_STATUS_UPDATED) {
+               /*
+                * writing any value to intr-mbox-0 clears PCI INTA# and
+@@ -2169,8 +2304,17 @@ static void tg3_interrupt(int irq, void 
+               tr32(MAILBOX_INTERRUPT_0 + TG3_64BIT_REG_LOW);
+               sblk->status &= ~SD_STATUS_UPDATED;
+-              if (likely(tg3_has_work(dev, tp)))
+-                      netif_rx_schedule(dev);         /* schedule NAPI poll */
++              if (likely(tg3_has_work(dev, tp))){
++                      if (unlikely(dev->rx_hook != NULL) && netdump_mode) {
++                              int ret;
++                              struct sk_buff *skb;
++                              ret = upcall_rx_hook(dev);
++                              if (!ret){
++                                      goto out;
++                              }
++                      }
++                       netif_rx_schedule(dev);                /* schedule NAPI poll */
++              }
+               else {
+                       /* no work, shared interrupt perhaps?  re-enable
+                        * interrupts, and flush that PCI write
+@@ -2180,7 +2324,7 @@ static void tg3_interrupt(int irq, void 
+                       tr32(MAILBOX_INTERRUPT_0 + TG3_64BIT_REG_LOW);
+               }
+       }
+-
++out:
+       spin_unlock_irqrestore(&tp->lock, flags);
+ }
+@@ -6804,7 +6948,10 @@ static int __devinit tg3_init_one(struct
+       dev->watchdog_timeo = TG3_TX_TIMEOUT;
+       dev->change_mtu = tg3_change_mtu;
+       dev->irq = pdev->irq;
+-
++#ifdef HAVE_POLL_CONTROLLER
++      dev->poll_controller = &Poll_tg3;
++#endif
++              
+       err = tg3_get_invariants(tp);
+       if (err) {
+               printk(KERN_ERR PFX "Problem fetching invariants of chip, "
+@@ -6882,6 +7029,15 @@ err_out_disable_pdev:
+       return err;
+ }
++#ifdef HAVE_POLL_CONTROLLER
++static void Poll_tg3(struct net_device *dev)
++{
++      if (!netdump_mode) disable_irq(dev->irq);
++      tg3_interrupt(dev->irq, dev, NULL);
++      if (!netdump_mode) enable_irq(dev->irq);
++}
++#endif
++
+ static void __devexit tg3_remove_one(struct pci_dev *pdev)
+ {
+       struct net_device *dev = pci_get_drvdata(pdev);
+
+_
diff --git a/lustre/kernel_patches/pc/netconsole_sysrq.pc b/lustre/kernel_patches/pc/netconsole_sysrq.pc
new file mode 100644 (file)
index 0000000..030fc19
--- /dev/null
@@ -0,0 +1,2 @@
+drivers/net/netconsole.c
+drivers/net/netconsole.h
diff --git a/lustre/kernel_patches/pc/tg3_netconsole.pc b/lustre/kernel_patches/pc/tg3_netconsole.pc
new file mode 100644 (file)
index 0000000..6653b7b
--- /dev/null
@@ -0,0 +1 @@
+drivers/net/tg3.c
index 735e383..9b53b54 100644 (file)
@@ -92,6 +92,7 @@ int client_import_connect(struct lustre_handle *dlm_handle,
                 imp->imp_replayable = 1;
                 CDEBUG(D_HA, "connected to replayable target: %s\n",
                        imp->imp_target_uuid.uuid);
+                ptlrpc_pinger_add_import(imp);
         }
         imp->imp_level = LUSTRE_CONN_FULL;
         imp->imp_remote_handle = request->rq_repmsg->handle;
@@ -151,7 +152,7 @@ int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
 
         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
         if (obd->obd_no_recov) {
-                ptlrpc_abort_inflight(imp);
+                ptlrpc_set_import_active(imp, 0);
         } else {
                 request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
                 if (!request)
@@ -159,13 +160,13 @@ int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
 
                 request->rq_replen = lustre_msg_size(0, NULL);
 
-                /* Process disconnects even if we're waiting for recovery. */
-                request->rq_level = LUSTRE_CONN_RECOVD;
-
                 rc = ptlrpc_queue_wait(request);
                 if (rc)
                         GOTO(out_req, rc);
         }
+        if (imp->imp_replayable)
+                ptlrpc_pinger_del_import(imp);
+
         EXIT;
  out_req:
         if (request)
@@ -468,6 +469,7 @@ void target_abort_recovery(void *data)
         class_disconnect_exports(obd, 0);
         abort_delayed_replies(obd);
         abort_recovery_queue(obd);
+        ptlrpc_run_recovery_over_upcall(obd);
 }
 
 static void target_recovery_expired(unsigned long castmeharder)
index c5f8873..2dc60cf 100644 (file)
@@ -652,8 +652,11 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode,
                     !(lock->l_flags & LDLM_FL_LOCAL))
                         continue;
 
-                if ((flags & LDLM_FL_MATCH_DATA) && lock->l_data != data)
+                if ((flags & LDLM_FL_MATCH_DATA) && lock->l_data != data) {
+                        LDLM_DEBUG(lock, "data mismatch: have %p, want %p",
+                                   lock->l_data, data);
                         continue;
+                }
 
                 ldlm_lock_addref_internal(lock, mode);
                 return lock;
index 3f46618..de304d4 100644 (file)
@@ -311,7 +311,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         ldlm_add_waiting_lock(lock);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_level = LUSTRE_CONN_RECOVER;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
@@ -373,7 +373,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LDLM_DEBUG(lock, "server preparing completion AST");
         req->rq_replen = lustre_msg_size(0, NULL);
 
-        req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_level = LUSTRE_CONN_RECOVER;
         req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
         rc = ptlrpc_queue_wait(req);
         if (rc == -ETIMEDOUT || rc == -EINTR) {
index 008adab..e6a8229 100644 (file)
@@ -865,7 +865,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
-        req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_level = LUSTRE_CONN_RECOVER;
         
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         ldlm_lock2desc(lock, &body->lock_desc);
index 27ac231..0e88933 100644 (file)
@@ -674,8 +674,7 @@ llu_fsswop_mount(const char *source,
                 goto out_free;
         }
 
-        strncpy(param_uuid.uuid, mdc, sizeof(param_uuid.uuid));
-        obd = class_uuid2obd(&param_uuid);
+        obd = class_name2obd(mdc);
         if (!obd) {
                 CERROR("MDC %s: not setup or attached\n", mdc);
                 err = -EINVAL;
@@ -693,8 +692,7 @@ llu_fsswop_mount(const char *source,
         mdc_conn = sbi2mdc(sbi)->cl_import->imp_connection;
 
         /* setup osc */
-        strncpy(param_uuid.uuid, osc, sizeof(param_uuid.uuid));
-        obd = class_uuid2obd(&param_uuid);
+        obd = class_name2obd(osc);
         if (!obd) {
                 CERROR("OSC %s: not setup or attached\n", osc);
                 err = -EINVAL;
index 20924fc..0684968 100644 (file)
@@ -73,6 +73,9 @@ void ll_intent_release(struct dentry *de, struct lookup_intent *it)
 
         if (it->it_lock_mode) {
                 handle = (struct lustre_handle *)it->it_lock_handle;
+                CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
+                       " from it %p\n",
+                       handle->cookie, it);
                 ldlm_lock_decref(handle, it->it_lock_mode);
 
                 /* intent_release may be called multiple times, from
@@ -259,16 +262,18 @@ int ll_revalidate2(struct dentry *de, int flags, struct lookup_intent *it)
 
         rc = ll_intent_lock(de->d_parent->d_inode, &de, it, revalidate2_finish);
         if (rc < 0) {
-                CERROR("ll_intent_lock: rc %d : it->it_status %d\n", rc,
-                       it->it_status);
+                if (rc != -ESTALE) {
+                        CERROR("ll_intent_lock: rc %d : it->it_status %d\n", rc,
+                               it->it_status);
+                }
                 RETURN(0);
         }
         /* unfortunately ll_intent_lock may cause a callback and revoke our
            dentry */
         spin_lock(&dcache_lock);
         list_del_init(&de->d_hash);
+        __d_rehash(de, 0);
         spin_unlock(&dcache_lock);
-        d_rehash(de);
 
         RETURN(1);
 }
index 2d5954d..115ed4e 100644 (file)
@@ -74,7 +74,10 @@ static int ll_dir_readpage(struct file *file, struct page *page)
         struct mds_body *body;
         struct lookup_intent it = { .it_op = IT_READDIR };
         struct mdc_op_data data;
-
+        struct obd_device *obddev = class_conn2obd(&sbi->ll_mdc_conn);
+        struct ldlm_res_id res_id =
+                { .name = {inode->i_ino, (__u64)inode->i_generation} };
+        int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_MATCH_DATA;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
@@ -90,18 +93,24 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                 GOTO(readpage_out, rc);
         }
 
-        ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
-
-        rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_PLAIN, &it, LCK_PR,
-                         &data, &lockh, NULL, 0,
-                         ldlm_completion_ast, ll_mdc_blocking_ast, inode);
-        request = (struct ptlrpc_request *)it.it_data;
-        if (request)
-                ptlrpc_req_finished(request);
-        if (rc < 0) {
-                CERROR("lock enqueue: err: %d\n", rc);
-                unlock_page(page);
-                RETURN(rc);
+        rc = ldlm_lock_match(obddev->obd_namespace, flags, &res_id,
+                             LDLM_PLAIN, NULL, 0, LCK_PR, inode,
+                             &lockh);
+        if (!rc) {
+                ll_prepare_mdc_op_data(&data, inode, NULL, NULL, 0, 0);
+                
+                rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_PLAIN, &it, LCK_PR,
+                                 &data, &lockh, NULL, 0,
+                                 ldlm_completion_ast, ll_mdc_blocking_ast,
+                                 inode);
+                request = (struct ptlrpc_request *)it.it_data;
+                if (request)
+                        ptlrpc_req_finished(request);
+                if (rc < 0) {
+                        CERROR("lock enqueue: err: %d\n", rc);
+                        unlock_page(page);
+                        RETURN(rc);
+                }
         }
         ldlm_lock_dump_handle(D_OTHER, &lockh);
 
@@ -770,7 +779,7 @@ static int ll_dir_ioctl(struct inode *inode, struct file *file,
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%u\n", inode->i_ino,
                inode->i_generation, inode, cmd);
 
-        if ((cmd & 0xffffff00) == ((int)'T') << 8) /* tty ioctls */
+        if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
                 return -ENOTTY;
 
         switch(cmd) {
index bd3fa7d..943ba1b 100644 (file)
@@ -425,8 +425,12 @@ int ll_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm,
         /* getattr can race with writeback.  we don't want to trust a getattr
          * that doesn't include the writeback of our farthest cached pages
          * that it raced with. */
+        /* Now that the OSC knows the cached-page status, it can and should be
+         * adjusting its getattr results to include the maximum cached offset
+         * for its stripe(s). */
         do {
-                bef = ll_farthest_dirty(&lli->lli_dirty, &before);
+                bef = obd_last_dirty_offset(ll_i2obdconn(inode), lli->lli_smd,
+                                            &before);
 #if 0
                 rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
 #else
@@ -444,7 +448,8 @@ int ll_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm,
                 if (rc)
                         RETURN(rc);
 
-                aft = ll_farthest_dirty(&lli->lli_dirty, &after);
+                aft = obd_last_dirty_offset(ll_i2obdconn(inode), lli->lli_smd,
+                                            &after);
                 CDEBUG(D_INODE, " %d,%lu -> %d,%lu\n", bef, before, aft, after);
         } while (bef == 0 &&
                  (aft != 0 || after < before) &&
@@ -985,7 +990,7 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%u\n", inode->i_ino,
                inode->i_generation, inode, cmd);
 
-        if ((cmd & 0xffffff00) == ((int)'T') << 8) /* tty ioctls */
+        if (_IOC_TYPE(cmd) == 'T') /* tty ioctls */
                 return -ENOTTY;
 
         lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats, LPROC_LL_IOCTL);
index 836a9aa..e3fabe6 100644 (file)
@@ -38,6 +38,7 @@
 #include <linux/rbtree.h>
 #include <linux/seq_file.h>
 #include <linux/time.h>
+#include "llite_internal.h"
 
 /* PG_inactive_clean is shorthand for rmap, we want free_high/low here.. */
 #ifdef PG_inactive_clean
@@ -73,7 +74,14 @@ static int llwp_consume_page(struct ll_writeback_pages *llwp,
 
         /* we raced with truncate? */
         if ( off >= inode->i_size ) {
-                ll_remove_dirty(inode, page->index, page->index);
+                int rc;
+                rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
+                                          ll_i2info(inode)->lli_smd,
+                                          page->index, page->index);
+
+                LASSERT(rc == 0);
+                CDEBUG(D_CACHE, "offset "LPU64" (index %lu) > i_size %llu\n",
+                       off, page->index, inode->i_size);
                 unlock_page(page);
                 return 0;
         }
@@ -85,7 +93,7 @@ static int llwp_consume_page(struct ll_writeback_pages *llwp,
 
         pg->pg = page;
         pg->off = off;
-        pg->flag = OBD_BRW_CREATE;
+        pg->flag = OBD_BRW_CREATE|OBD_BRW_FROM_GRANT;
         pg->count = PAGE_CACHE_SIZE;
 
         /* catch partial writes for files that end mid-page */
@@ -176,9 +184,10 @@ static void ll_writeback(struct inode *inode, struct ll_writeback_pages *llwp)
                 CERROR("error from obd_brw_async: rc = %d\n", rc);
                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                     LPROC_LL_WB_FAIL, llwp->npgs);
-        } else
+        } else {
                 lprocfs_counter_add(ll_i2sbi(inode)->ll_stats,
                                     LPROC_LL_WB_OK, (llwp->npgs));
+        }
 
         for (i = 0 ; i < llwp->npgs ; i++) {
                 struct page *page = llwp->pga[i].pg;
@@ -186,7 +195,11 @@ static void ll_writeback(struct inode *inode, struct ll_writeback_pages *llwp)
                 CDEBUG(D_CACHE, "finished page %p at index %lu\n", page,
                        page->index);
                 LASSERT(PageLocked(page));
-                ll_remove_dirty(inode, page->index, page->index);
+
+                rc = ll_clear_dirty_pages(ll_i2obdconn(inode),
+                                          ll_i2info(inode)->lli_smd,
+                                          page->index, page->index);
+                LASSERT(rc == 0);
                 unlock_page(page);
                 page_cache_release(page);
         }
@@ -380,7 +393,7 @@ int ll_batch_writepage(struct inode *inode, struct page *page)
         current->flags |= PF_MEMALLOC;
         rc = ll_alloc_brw(inode, &llwp);
         if (rc != 0)
-                GOTO(cleanup, rc);
+                GOTO(restore_flags, rc);
 
         if (llwp_consume_page(&llwp, inode, page) == 0)
                 ll_get_dirty_pages(inode, &llwp);
@@ -390,241 +403,9 @@ int ll_batch_writepage(struct inode *inode, struct page *page)
                                     LPROC_LL_WB_WRITEPAGE, llwp.npgs);
                 ll_writeback(inode, &llwp);
         }
-
         kfree(llwp.pga);
-cleanup:
-        current->flags = old_flags;
-        RETURN(rc);
-}
-
-/*
- * we aggressively track offsets of pages that have been dirtied.  we need this
- * to make file size decisions around lock acquisition and cancelation.  all
- * extents include the offsets at their endpoints.
- */
-struct offset_extent {
-        rb_node_t       oe_node;
-        unsigned long   oe_start, oe_end;
-};
-
-static struct offset_extent *ll_find_oe(rb_root_t *root,
-                                        struct offset_extent *needle)
-{
-        struct rb_node_s *node = root->rb_node;
-        struct offset_extent *oe;
-        ENTRY;
-
-        CDEBUG(D_INODE, "searching [%lu -> %lu]\n", needle->oe_start,
-               needle->oe_end);
-
-        while (node) {
-                oe = rb_entry(node, struct offset_extent, oe_node);
-                if (needle->oe_end < oe->oe_start)
-                        node = node->rb_left;
-                else if (needle->oe_start > oe->oe_end)
-                        node = node->rb_right;
-                else {
-                        CDEBUG(D_INODE, "returning [%lu -> %lu]\n",
-                               oe->oe_start, oe->oe_end);
-                        RETURN(oe);
-                }
-        }
-        RETURN(NULL);
-}
-
-/* do the rbtree mechanics to insert a node, callers are responsible
- * for making sure that this new node doesn't overlap with existing
- * nodes */
-static void ll_insert_oe(rb_root_t *root, struct offset_extent *new_oe)
-{
-        rb_node_t ** p = &root->rb_node;
-        rb_node_t * parent = NULL;
-        struct offset_extent *oe;
-        ENTRY;
-
-        LASSERT(new_oe->oe_start <= new_oe->oe_end);
-
-        while (*p) {
-                parent = *p;
-                oe = rb_entry(parent, struct offset_extent, oe_node);
-                if ( new_oe->oe_end < oe->oe_start )
-                        p = &(*p)->rb_left;
-                else if ( new_oe->oe_start > oe->oe_end )
-                        p = &(*p)->rb_right;
-                else
-                        LBUG();
-        }
-        rb_link_node(&new_oe->oe_node, parent, p);
-        rb_insert_color(&new_oe->oe_node, root);
-        EXIT;
-}
-
-static inline void lldo_dirty_add(struct inode *inode,
-                                  struct ll_dirty_offsets *lldo,
-                                  long val)
-{
-        lldo->do_num_dirty += val;
-        lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_DIRTY_PAGES,
-                            val);
-}
-
-void ll_record_dirty(struct inode *inode, unsigned long offset)
-{
-        struct ll_dirty_offsets *lldo = &ll_i2info(inode)->lli_dirty;
-        struct offset_extent needle, *oe, *new_oe;
-        int rc;
-        ENTRY;
-
-        /* will allocate more intelligently later */
-        OBD_ALLOC(new_oe, sizeof(*new_oe));
-        LASSERT(new_oe); /* will have to do for now :/ */
-
-        spin_lock(&lldo->do_lock);
-
-        /* find neighbours that we might glom on to */
-        needle.oe_start = (offset > 0) ? offset - 1 : offset;
-        needle.oe_end = (offset < ~0) ? offset + 1 : offset;
-        oe = ll_find_oe(&lldo->do_root, &needle);
-        if ( oe == NULL ) {
-                new_oe->oe_start = offset;
-                new_oe->oe_end = offset;
-                ll_insert_oe(&lldo->do_root, new_oe);
-                lldo_dirty_add(inode, lldo, 1);
-                new_oe = NULL;
-                GOTO(out, rc = 1);
-        }
-
-        /* already recorded */
-        if ( offset >= oe->oe_start && offset <= oe->oe_end )
-                GOTO(out, rc = 2);
-
-        /* ok, need to check for adjacent neighbours */
-        needle.oe_start = offset;
-        needle.oe_end = offset;
-        if (ll_find_oe(&lldo->do_root, &needle))
-                GOTO(out, rc = 3);
-
-        /* ok, its safe to extend the oe we found */
-        if ( offset == oe->oe_start - 1 )
-                oe->oe_start--;
-        else if ( offset == oe->oe_end + 1 )
-                oe->oe_end++;
-        else
-                LBUG();
-        lldo_dirty_add(inode, lldo, 1);
-
-out:
-        CDEBUG(D_INODE, "%lu now dirty\n", lldo->do_num_dirty);
-        spin_unlock(&lldo->do_lock);
-        if ( new_oe )
-                OBD_FREE(new_oe, sizeof(*new_oe));
-        EXIT;
-        return;
-}
 
-void ll_remove_dirty(struct inode *inode, unsigned long start,
-                     unsigned long end)
-{
-        struct ll_dirty_offsets *lldo = &ll_i2info(inode)->lli_dirty;
-        struct offset_extent needle, *oe, *new_oe;
-        ENTRY;
-
-        /* will allocate more intelligently later */
-        OBD_ALLOC(new_oe, sizeof(*new_oe));
-        LASSERT(new_oe); /* will have to do for now :/ */
-
-        needle.oe_start = start;
-        needle.oe_end = end;
-
-        spin_lock(&lldo->do_lock);
-        for ( ; (oe = ll_find_oe(&lldo->do_root, &needle)) ; ) {
-
-                /* see if we're punching a hole and need to create a node */
-                if (oe->oe_start < start && oe->oe_end > end) {
-                        new_oe->oe_start = end + 1;
-                        new_oe->oe_end = oe->oe_end;
-                        oe->oe_end = start - 1;
-                        ll_insert_oe(&lldo->do_root, new_oe);
-                        new_oe = NULL;
-                        lldo_dirty_add(inode, lldo, -(end - start + 1));
-                        break;
-                }
-
-                /* overlapping edges */
-                if (oe->oe_start < start && oe->oe_end <= end) {
-                        lldo_dirty_add(inode, lldo, -(oe->oe_end - start + 1));
-                        oe->oe_end = start - 1;
-                        oe = NULL;
-                        continue;
-                }
-                if (oe->oe_end > end && oe->oe_start >= start) {
-                        lldo_dirty_add(inode, lldo, -(end - oe->oe_start + 1));
-                        oe->oe_start = end + 1;
-                        oe = NULL;
-                        continue;
-                }
-
-                /* an extent entirely within the one we're clearing */
-                rb_erase(&oe->oe_node, &lldo->do_root);
-                lldo_dirty_add(inode, lldo, -(oe->oe_end - oe->oe_start + 1));
-                spin_unlock(&lldo->do_lock);
-                OBD_FREE(oe, sizeof(*oe));
-                spin_lock(&lldo->do_lock);
-        }
-        CDEBUG(D_INODE, "%lu now dirty\n", lldo->do_num_dirty);
-        spin_unlock(&lldo->do_lock);
-        if (new_oe)
-                OBD_FREE(new_oe, sizeof(*new_oe));
-        EXIT;
-}
-
-int ll_find_dirty(struct ll_dirty_offsets *lldo, unsigned long *start,
-                  unsigned long *end)
-{
-        struct offset_extent needle, *oe;
-        int rc = -ENOENT;
-        ENTRY;
-
-        needle.oe_start = *start;
-        needle.oe_end = *end;
-
-        spin_lock(&lldo->do_lock);
-        oe = ll_find_oe(&lldo->do_root, &needle);
-        if (oe) {
-                *start = oe->oe_start;
-                *end = oe->oe_end;
-                rc = 0;
-        }
-        spin_unlock(&lldo->do_lock);
-
-        RETURN(rc);
-}
-
-int ll_farthest_dirty(struct ll_dirty_offsets *lldo, unsigned long *farthest)
-{
-        struct rb_node_s *last, *node;
-        struct offset_extent *oe;
-        int rc = -1;
-        ENTRY;
-
-        spin_lock(&lldo->do_lock);
-        for (node = lldo->do_root.rb_node, last = NULL;
-             node;
-             last = node, node = node->rb_right)
-                ;
-
-        if (last) {
-                oe = rb_entry(last, struct offset_extent, oe_node);
-                *farthest = oe->oe_end;
-                rc = 0;
-        }
-        spin_unlock(&lldo->do_lock);
+restore_flags:
+        current->flags = old_flags;
         RETURN(rc);
 }
-
-void ll_lldo_init(struct ll_dirty_offsets *lldo)
-{
-        spin_lock_init(&lldo->do_lock);
-        lldo->do_num_dirty = 0;
-        lldo->do_root.rb_node = NULL;
-}
index e53b605..4684383 100644 (file)
@@ -1,2 +1,29 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2003 Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ */
+
+#ifndef LLITE_INTERNAL_H
+#define LLITE_INTERNAL_H
+
+struct lustre_handle;
+struct lov_stripe_md;
+
 int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
                          int flags, void *opaque);
+int ll_rd_dirty_pages(char *page, char **start, off_t off, int count,
+                      int *eof, void *data);
+int ll_rd_max_dirty_pages(char *page, char **start, off_t off, int count,
+                          int *eof, void *data);
+int ll_wr_max_dirty_pages(struct file *file, const char *buffer,
+                          unsigned long count, void *data);
+int ll_clear_dirty_pages(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                         unsigned long start, unsigned long end);
+int ll_mark_dirty_page(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                       unsigned long index);
+
+#endif /* LLITE_INTERNAL_H */
index 14eac3f..42fea4b 100644 (file)
@@ -28,6 +28,8 @@
 #include <linux/lustre_lite.h>
 #include <linux/lprocfs_status.h>
 
+#include "llite_internal.h"
+
 /* /proc/lustre/llite mount point registration */
 
 #ifndef LPROCFS
@@ -97,6 +99,8 @@ struct lprocfs_vars lprocfs_obd_vars[] = {
         { "filestotal",  rd_filestotal,  0, 0 },
         { "filesfree",   rd_filesfree,   0, 0 },
         { "filegroups",  rd_filegroups,  0, 0 },
+        { "dirty_pages", ll_rd_dirty_pages, 0, 0},
+        { "max_dirty_pages", ll_rd_max_dirty_pages, ll_wr_max_dirty_pages, 0},
         { 0 }
 };
 
@@ -108,8 +112,6 @@ struct llite_file_opcode {
         const char *opname;
 } llite_opcode_table[LPROC_LL_FILE_OPCODES] = {
         /* file operation */
-        { LPROC_LL_DIRTY_PAGES,    LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES,
-                                   "dirty_pages" },
         { LPROC_LL_DIRTY_HITS,     LPROCFS_TYPE_REGS, "dirty_pages_hits" },
         { LPROC_LL_DIRTY_MISSES,   LPROCFS_TYPE_REGS, "dirty_pages_misses" },
         { LPROC_LL_WB_WRITEPAGE,   LPROCFS_CNTR_AVGMINMAX|LPROCFS_TYPE_PAGES,
@@ -161,7 +163,6 @@ int lprocfs_register_mountpoint(struct proc_dir_entry *parent,
         struct ll_sb_info *sbi = ll_s2sbi(sb);
         struct obd_device *obd;
         char name[MAX_STRING_SIZE + 1];
-        struct obd_uuid uuid;
         int err, id;
         struct lprocfs_stats *svc_stats = NULL;
         ENTRY;
@@ -222,8 +223,7 @@ int lprocfs_register_mountpoint(struct proc_dir_entry *parent,
                 goto out;
 
         /* MDC info */
-        strncpy(uuid.uuid, mdc, sizeof(uuid.uuid));
-        obd = class_uuid2obd(&uuid);
+        obd = class_name2obd(mdc);
 
         LASSERT(obd != NULL);
         LASSERT(obd->obd_type != NULL);
@@ -243,8 +243,7 @@ int lprocfs_register_mountpoint(struct proc_dir_entry *parent,
                 goto out;
 
         /* OSC */
-        strncpy(uuid.uuid, osc, sizeof(uuid.uuid));
-        obd = class_uuid2obd(&uuid);
+        obd = class_name2obd(osc);
 
         LASSERT(obd != NULL);
         LASSERT(obd->obd_type != NULL);
index c14fd61..da6e670 100644 (file)
@@ -489,11 +489,39 @@ int ll_intent_lock(struct inode *parent, struct dentry **de,
         LASSERT (request != NULL);
 
         if (intent_finish != NULL) {
+                struct lustre_handle old_lock;
+                struct ldlm_lock *lock;
+
                 rc = intent_finish(flag, request, parent, de, it, offset, ino);
                 dentry = *de; /* intent_finish may change *de */
                 inode = dentry->d_inode;
                 if (rc != 0)
                         GOTO(drop_lock, rc);
+
+                /* The intent processing may well have given us a lock different
+                 * from the one we requested.  If we already have a matching
+                 * lock, then cancel the new one.  (We have to do this here,
+                 * instead of in mdc_enqueue, because we need to use the child's
+                 * inode as the l_data to match, and that's not available until
+                 * intent_finish has performed the iget().) */
+                lock = ldlm_handle2lock(&lockh);
+                if (lock) {
+                        LDLM_DEBUG(lock, "matching against this");
+                        LDLM_LOCK_PUT(lock);
+                        memcpy(&old_lock, &lockh, sizeof(lockh));
+                        if (ldlm_lock_match(NULL,
+                                            LDLM_FL_BLOCK_GRANTED |
+                                            LDLM_FL_MATCH_DATA,
+                                            NULL, LDLM_PLAIN, NULL, 0, LCK_NL,
+                                            inode, &old_lock)) {
+                                ldlm_lock_decref_and_cancel(&lockh,
+                                                            it->it_lock_mode);
+                                memcpy(&lockh, &old_lock, sizeof(old_lock));
+                                memcpy(it->it_lock_handle, &lockh,
+                                       sizeof(lockh));
+                        }
+                }
+
         }
         ptlrpc_req_finished(request);
 
@@ -912,6 +940,13 @@ static int ll_create(struct inode *dir, struct dentry *dentry, int mode)
         RETURN(rc);
 }
 
+static int ll_mknod(struct inode *dir, struct dentry *dentry, int mode,
+                    int rdev)
+{
+        LBUG();
+        return -ENOSYS;
+}
+
 static int ll_mknod2(struct inode *dir, const char *name, int len, int mode,
                      int rdev)
 {
@@ -950,34 +985,11 @@ static int ll_mknod2(struct inode *dir, const char *name, int len, int mode,
         RETURN(err);
 }
 
-static int ll_mknod(struct inode *dir, struct dentry *dentry, int mode,
-                    int rdev)
+static int ll_symlink(struct inode *dir, struct dentry *dentry,
+                      const char *symname)
 {
-        struct lookup_intent *it;
-        struct inode *inode;
-        int rc = 0;
-
-        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
-               dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
-               LL_IT2STR(dentry->d_it));
-
-        LL_GET_INTENT(dentry, it);
-
-        if ((mode & S_IFMT) == 0)
-                mode |= S_IFREG;
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               NULL, 0, mode, rdev, it);
-
-        if (IS_ERR(inode))
-                RETURN(PTR_ERR(inode));
-
-        /* no directory data updates when intents rule */
-        if (it && it->it_disposition)
-                d_instantiate(dentry, inode);
-        else
-                rc = ext2_add_nondir(dentry, inode);
-
-        return rc;
+        LBUG();
+        return -ENOSYS;
 }
 
 static int ll_symlink2(struct inode *dir, const char *name, int len,
@@ -1004,46 +1016,11 @@ static int ll_symlink2(struct inode *dir, const char *name, int len,
         RETURN(err);
 }
 
-static int ll_symlink(struct inode *dir, struct dentry *dentry,
-                      const char *symname)
+static int ll_link(struct dentry *old_dentry, struct inode * dir,
+                   struct dentry *dentry)
 {
-        struct lookup_intent *it;
-        unsigned l = strlen(symname) + 1;
-        struct inode *inode;
-        struct ll_inode_info *lli;
-        int err = 0;
-        ENTRY;
-
-        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
-               dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
-               LL_IT2STR(dentry->d_it));
-
-        LL_GET_INTENT(dentry, it);
-
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               symname, l, S_IFLNK | S_IRWXUGO, 0, it);
-        if (IS_ERR(inode))
-                RETURN(PTR_ERR(inode));
-
-        lli = ll_i2info(inode);
-
-        OBD_ALLOC(lli->lli_symlink_name, l);
-        /* this _could_ be a non-fatal error, since the symlink is already
-         * stored on the MDS by this point, and we can re-get it in readlink.
-         */
-        if (!lli->lli_symlink_name)
-                RETURN(-ENOMEM);
-
-        memcpy(lli->lli_symlink_name, symname, l);
-        inode->i_size = l - 1;
-
-        /* no directory data updates when intents rule */
-        if (it && it->it_disposition)
-                d_instantiate(dentry, inode);
-        else
-                err = ext2_add_nondir(dentry, inode);
-
-        RETURN(err);
+        LBUG();
+        return -ENOSYS;
 }
 
 static int ll_link2(struct inode *src, struct inode *dir,
@@ -1066,47 +1043,10 @@ static int ll_link2(struct inode *src, struct inode *dir,
         RETURN(err);
 }
 
-static int ll_link(struct dentry *old_dentry, struct inode * dir,
-                   struct dentry *dentry)
+static int ll_mkdir(struct inode *dir, struct dentry *dentry, int mode)
 {
-        struct lookup_intent *it;
-        struct inode *inode = old_dentry->d_inode;
-        int rc;
-        CDEBUG(D_VFSTRACE,
-               "VFS Op:inode=%lu/%u(%p),dir=%lu/%u(%p),target=%s,intent=%s\n",
-               inode->i_ino, inode->i_generation, inode, dir->i_ino,
-               dir->i_generation, dir, dentry->d_name.name,
-               LL_IT2STR(dentry->d_it));
-
-        LL_GET_INTENT(dentry, it);
-
-        if (it && it->it_disposition) {
-                if (it->it_status)
-                        RETURN(it->it_status);
-                LTIME_S(inode->i_ctime) = LTIME_S(CURRENT_TIME);
-                ext2_inc_count(inode);
-                atomic_inc(&inode->i_count);
-                d_instantiate(dentry, inode);
-                ll_invalidate_inode_pages(dir);
-                RETURN(0);
-        }
-
-        if (S_ISDIR(inode->i_mode))
-                return -EPERM;
-
-        if (inode->i_nlink >= EXT2_LINK_MAX)
-                return -EMLINK;
-
-        rc = ll_link2(old_dentry->d_inode, dir,
-                      dentry->d_name.name, dentry->d_name.len);
-        if (rc)
-                RETURN(rc);
-
-        LTIME_S(inode->i_ctime) = LTIME_S(CURRENT_TIME);
-        ext2_inc_count(inode);
-        atomic_inc(&inode->i_count);
-
-        return ext2_add_nondir(dentry, inode);
+        LBUG();
+        return -ENOSYS;
 }
 
 static int ll_mkdir2(struct inode *dir, const char *name, int len, int mode)
@@ -1132,58 +1072,6 @@ static int ll_mkdir2(struct inode *dir, const char *name, int len, int mode)
         RETURN(err);
 }
 
-
-static int ll_mkdir(struct inode *dir, struct dentry *dentry, int mode)
-{
-        struct lookup_intent *it;
-        struct inode * inode;
-        int err = -EMLINK;
-        ENTRY;
-        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
-               dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
-               LL_IT2STR(dentry->d_it));
-
-        LL_GET_INTENT(dentry, it);
-
-        if (dir->i_nlink >= EXT2_LINK_MAX)
-                goto out;
-
-        ext2_inc_count(dir);
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               NULL, 0, S_IFDIR | mode, 0, it);
-        err = PTR_ERR(inode);
-        if (IS_ERR(inode))
-                goto out_dir;
-
-        err = ext2_make_empty(inode, dir);
-        if (err)
-                goto out_fail;
-
-        /* no directory data updates when intents rule */
-        if (!it || !it->it_disposition) {
-                /* XXX FIXME This code needs re-checked for non-intents */
-                ext2_inc_count(inode);
-                err = ll_add_link(dentry, inode);
-                if (err)
-                        goto out_fail;
-        }
-
-        d_instantiate(dentry, inode);
-out:
-        EXIT;
-        return err;
-
-out_fail:
-        ext2_dec_count(inode);
-        ext2_dec_count(inode);
-        iput(inode);
-        EXIT;
-out_dir:
-        ext2_dec_count(dir);
-        EXIT;
-        goto out;
-}
-
 static int ll_rmdir2(struct inode *dir, const char *name, int len)
 {
         int rc;
@@ -1206,84 +1094,23 @@ static int ll_unlink2(struct inode *dir, const char *name, int len)
         RETURN(rc);
 }
 
-static int ll_common_unlink(struct inode *dir, struct dentry *dentry,
-                            struct lookup_intent *it, __u32 mode)
-{
-        struct inode *inode = dentry->d_inode;
-        struct ext2_dir_entry_2 * de;
-        struct page * page;
-        int rc = 0;
-        ENTRY;
-
-        if (it && it->it_disposition) {
-                rc = it->it_status;
-                ll_invalidate_inode_pages(dir);
-                if (rc)
-                        GOTO(out, rc);
-                GOTO(out_dec, 0);
-        }
-
-        de = ext2_find_entry(dir, dentry, &page);
-        if (!de)
-                GOTO(out, rc = -ENOENT);
-        rc = ll_mdc_unlink(dir, dentry->d_inode, mode,
-                           dentry->d_name.name, dentry->d_name.len);
-        if (rc)
-                GOTO(out, rc);
-
-        rc = ext2_delete_entry(de, page);
-        if (rc)
-                GOTO(out, rc);
-
-        /* AED: not sure if needed - directory lock revocation should do it
-         * in the case where the client has cached it for non-intent ops.
-         */
-        ll_invalidate_inode_pages(dir);
-
-        inode->i_ctime = dir->i_ctime;
-        EXIT;
-out_dec:
-        ext2_dec_count(inode);
-out:
-        return rc;
-}
-
 static int ll_unlink(struct inode *dir, struct dentry *dentry)
 {
-        struct lookup_intent * it;
-        ENTRY;
-        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
-               dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
-               LL_IT2STR(dentry->d_it));
-
-        LL_GET_INTENT(dentry, it);
-
-        RETURN(ll_common_unlink(dir, dentry, it, S_IFREG));
+        LBUG();
+        return -ENOSYS;
 }
 
 static int ll_rmdir(struct inode *dir, struct dentry *dentry)
 {
-        struct inode * inode = dentry->d_inode;
-        struct lookup_intent *it;
-        int rc;
-        ENTRY;
-        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
-               dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
-               LL_IT2STR(dentry->d_it));
-
-        LL_GET_INTENT(dentry, it);
-
-        if ((!it || !it->it_disposition) && !ext2_empty_dir(inode))
-                RETURN(-ENOTEMPTY);
-
-        rc = ll_common_unlink(dir, dentry, it, S_IFDIR);
-        if (!rc) {
-                inode->i_size = 0;
-                ext2_dec_count(inode);
-                ext2_dec_count(dir);
-        }
+        LBUG();
+        return -ENOSYS;
+}
 
-        RETURN(rc);
+static int ll_rename(struct inode * old_dir, struct dentry * old_dentry,
+                     struct inode * new_dir, struct dentry * new_dentry)
+{
+        LBUG();
+        return -ENOSYS;
 }
 
 static int ll_rename2(struct inode *src, struct inode *tgt,
@@ -1307,126 +1134,23 @@ static int ll_rename2(struct inode *src, struct inode *tgt,
         RETURN(err);
 }
 
-
-
-static int ll_rename(struct inode * old_dir, struct dentry * old_dentry,
-                     struct inode * new_dir, struct dentry * new_dentry)
-{
-        struct lookup_intent *it;
-        struct inode * old_inode = old_dentry->d_inode;
-        struct inode * tgt_inode = new_dentry->d_inode;
-        struct page * dir_page = NULL;
-        struct ext2_dir_entry_2 * dir_de = NULL;
-        struct ext2_dir_entry_2 * old_de;
-        struct page * old_page;
-        int err;
-        CDEBUG(D_VFSTRACE, "VFS Op:oldname=%s,src_dir=%lu/%u(%p),newname=%s,"
-               "tgt_dir=%lu/%u(%p),intent=%s\n",
-               old_dentry->d_name.name, old_dir->i_ino, old_dir->i_generation,
-               old_dir, new_dentry->d_name.name, new_dir->i_ino,
-               new_dir->i_generation, new_dir, LL_IT2STR(new_dentry->d_it));
-
-        LL_GET_INTENT(new_dentry, it);
-
-        if (it && it->it_disposition) {
-                if (tgt_inode) {
-                        tgt_inode->i_ctime = CURRENT_TIME;
-                        tgt_inode->i_nlink--;
-                }
-                ll_invalidate_inode_pages(old_dir);
-                ll_invalidate_inode_pages(new_dir);
-                GOTO(out, err = it->it_status);
-        }
-
-        err = ll_rename2(old_dir, new_dir,
-                         old_dentry->d_name.name, old_dentry->d_name.len,
-                         new_dentry->d_name.name, new_dentry->d_name.len);
-        if (err)
-                goto out;
-
-        old_de = ext2_find_entry (old_dir, old_dentry, &old_page);
-        if (!old_de)
-                goto out;
-
-        if (S_ISDIR(old_inode->i_mode)) {
-                err = -EIO;
-                dir_de = ext2_dotdot(old_inode, &dir_page);
-                if (!dir_de)
-                        goto out_old;
-        }
-
-        if (tgt_inode) {
-                struct page *new_page;
-                struct ext2_dir_entry_2 *new_de;
-
-                err = -ENOTEMPTY;
-                if (dir_de && !ext2_empty_dir (tgt_inode))
-                        goto out_dir;
-
-                err = -ENOENT;
-                new_de = ext2_find_entry (new_dir, new_dentry, &new_page);
-                if (!new_de)
-                        goto out_dir;
-                ext2_inc_count(old_inode);
-                ext2_set_link(new_dir, new_de, new_page, old_inode);
-                tgt_inode->i_ctime = CURRENT_TIME;
-                if (dir_de)
-                        tgt_inode->i_nlink--;
-                ext2_dec_count(tgt_inode);
-        } else {
-                if (dir_de) {
-                        err = -EMLINK;
-                        if (new_dir->i_nlink >= EXT2_LINK_MAX)
-                                goto out_dir;
-                }
-                ext2_inc_count(old_inode);
-                err = ll_add_link(new_dentry, old_inode);
-                if (err) {
-                        ext2_dec_count(old_inode);
-                        goto out_dir;
-                }
-                if (dir_de)
-                        ext2_inc_count(new_dir);
-        }
-
-        ext2_delete_entry (old_de, old_page);
-        ext2_dec_count(old_inode);
-
-        if (dir_de) {
-                ext2_set_link(old_inode, dir_de, dir_page, new_dir);
-                ext2_dec_count(old_dir);
-        }
-        return 0;
-
-out_dir:
-        if (dir_de) {
-                kunmap(dir_page);
-                page_cache_release(dir_page);
-        }
-out_old:
-        kunmap(old_page);
-        page_cache_release(old_page);
-out:
-        return err;
-}
-
 extern int ll_inode_revalidate(struct dentry *dentry);
 struct inode_operations ll_dir_inode_operations = {
         create:          ll_create,
         lookup2:         ll_lookup2,
-        link:            ll_link,
+        link:            ll_link,          /* LBUG() */
         link2:           ll_link2,
-        unlink:          ll_unlink,
+        unlink:          ll_unlink,        /* LBUG() */
         unlink2:         ll_unlink2,
-        symlink:         ll_symlink,
+        symlink:         ll_symlink,       /* LBUG() */
         symlink2:        ll_symlink2,
-        mkdir:           ll_mkdir,
+        mkdir:           ll_mkdir,         /* LBUG() */
         mkdir2:          ll_mkdir2,
-        rmdir:           ll_rmdir,
+        rmdir:           ll_rmdir,         /* LBUG() */
         rmdir2:          ll_rmdir2,
-        mknod:           ll_mknod,
+        mknod:           ll_mknod,         /* LBUG() */
         mknod2:          ll_mknod2,
-        rename:          ll_rename,
+        rename:          ll_rename,        /* LBUG() */
         rename2:         ll_rename2,
         setattr:         ll_setattr,
         setattr_raw:     ll_setattr_raw,
index af90d66..98f6086 100644 (file)
@@ -32,7 +32,7 @@
 #include <linux/version.h>
 #include <asm/system.h>
 #include <asm/uaccess.h>
-
+#include "llite_internal.h"
 
 #include <linux/fs.h>
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
@@ -323,10 +323,10 @@ void ll_truncate(struct inode *inode)
                 return;
         }
 
-        /* vmtruncate just threw away our dirty pages, make sure
+        /* vmtruncate will just throw away our dirty pages, make sure
          * we don't think they're still dirty, being careful to round
          * i_size to the first whole page that was tossed */
-        ll_remove_dirty(inode,
+        err = ll_clear_dirty_pages(ll_i2obdconn(inode), lsm,
                         (inode->i_size + PAGE_CACHE_SIZE-1) >> PAGE_CACHE_SHIFT,
                         ~0);
 
@@ -417,6 +417,130 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
  * yet.
  */
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+static unsigned long ll_local_cache_dirty_pages;
+static unsigned long ll_max_dirty_pages = 20 * 1024 * 1024 / PAGE_SIZE;
+
+static spinlock_t ll_local_cache_page_count_lock = SPIN_LOCK_UNLOCKED;
+
+int ll_rd_dirty_pages(char *page, char **start, off_t off, int count, int *eof,
+                      void *data)
+{
+        unsigned long dirty_count;
+        spin_lock(&ll_local_cache_page_count_lock);
+        dirty_count = ll_local_cache_dirty_pages;
+        spin_unlock(&ll_local_cache_page_count_lock);
+        return snprintf(page, count, "%lu\n", dirty_count);
+}
+
+int ll_rd_max_dirty_pages(char *page, char **start, off_t off, int count,
+                          int *eof, void *data)
+{
+        unsigned long max_dirty;
+        spin_lock(&ll_local_cache_page_count_lock);
+        max_dirty = ll_max_dirty_pages;
+        spin_unlock(&ll_local_cache_page_count_lock);
+        return snprintf(page, count, "%lu\n", max_dirty);
+}
+
+int ll_wr_max_dirty_pages(struct file *file, const char *buffer,
+                          unsigned long count, void *data)
+{
+        unsigned long max_dirty;
+        signed long max_dirty_signed;
+        char kernbuf[20], *end;
+        
+        if (count > (sizeof(kernbuf) - 1))
+                return -EINVAL;
+
+        if (copy_from_user(kernbuf, buffer, count))
+                return -EFAULT;
+
+        kernbuf[count] = '\0';
+
+        max_dirty_signed = simple_strtol(kernbuf, &end, 0);
+        if (kernbuf == end)
+                return -EINVAL;
+        max_dirty = (unsigned long)max_dirty_signed;
+
+#if 0
+        if (max_dirty < ll_local_cache_dirty_pages)
+                flush_to_new_max_dirty();
+#endif
+
+        spin_lock(&ll_local_cache_page_count_lock);
+        CDEBUG(D_CACHE, "changing max_dirty from %lu to %lu\n",
+               ll_max_dirty_pages, max_dirty);
+        ll_max_dirty_pages = max_dirty;
+        spin_unlock(&ll_local_cache_page_count_lock);
+        return count;
+}
+
+static int ll_local_cache_full(void)
+{
+        int full = 0;
+        spin_lock(&ll_local_cache_page_count_lock);
+        if (ll_max_dirty_pages &&
+            ll_local_cache_dirty_pages >= ll_max_dirty_pages) {
+                full = 1;
+        }
+        spin_unlock(&ll_local_cache_page_count_lock);
+        /* XXX instrument? */
+        /* XXX trigger async writeback when full, or 75% of full? */
+        return full;
+}
+
+static void ll_local_cache_flushed_pages(unsigned long pgcount)
+{
+        unsigned long dirty_count;
+        spin_lock(&ll_local_cache_page_count_lock);
+        dirty_count = ll_local_cache_dirty_pages;
+        ll_local_cache_dirty_pages -= pgcount;
+        CDEBUG(D_CACHE, "dirty pages: %lu->%lu)\n",
+               dirty_count, ll_local_cache_dirty_pages);
+        spin_unlock(&ll_local_cache_page_count_lock);
+        LASSERT(dirty_count >= pgcount);
+}
+
+static void ll_local_cache_dirtied_pages(unsigned long pgcount)
+{
+        unsigned long dirty_count;
+        spin_lock(&ll_local_cache_page_count_lock);
+        dirty_count = ll_local_cache_dirty_pages;
+        ll_local_cache_dirty_pages += pgcount;
+        CDEBUG(D_CACHE, "dirty pages: %lu->%lu\n",
+               dirty_count, ll_local_cache_dirty_pages);
+        spin_unlock(&ll_local_cache_page_count_lock);
+        /* XXX track maximum cached, report to lprocfs */
+}
+
+int ll_clear_dirty_pages(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                         unsigned long start, unsigned long end)
+{
+        unsigned long cleared;
+        int rc;
+
+        ENTRY;
+        rc = obd_clear_dirty_pages(conn, lsm, start, end, &cleared);
+        if (!rc)
+                ll_local_cache_flushed_pages(cleared);
+        RETURN(rc);
+}
+
+int ll_mark_dirty_page(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                       unsigned long index)
+{
+        int rc;
+
+        ENTRY;
+        if (ll_local_cache_full())
+                RETURN(-EDQUOT);
+
+        rc = obd_mark_page_dirty(conn, lsm, index);
+        if (!rc)
+                ll_local_cache_dirtied_pages(1);
+        RETURN(rc);
+}
+
 static int ll_writepage(struct page *page)
 {
         struct inode *inode = page->mapping->host;
@@ -440,6 +564,7 @@ static int ll_commit_write(struct file *file, struct page *page,
 {
         struct inode *inode = page->mapping->host;
         loff_t size;
+        int rc = 0;
         ENTRY;
 
         LASSERT(inode == file->f_dentry->d_inode);
@@ -447,34 +572,33 @@ static int ll_commit_write(struct file *file, struct page *page,
 
         CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
                inode, page, from, to, page->index);
-        /* to match full page case in prepare_write */
-        SetPageUptodate(page);
-        /* mark the page dirty, put it on mapping->dirty,
-         * mark the inode PAGES_DIRTY, put it on sb->dirty */
-        if (!PageDirty(page))
+        if (!PageDirty(page)) {
                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                      LPROC_LL_DIRTY_MISSES);
-        else
+                rc = ll_mark_dirty_page(ll_i2obdconn(inode),
+                                        ll_i2info(inode)->lli_smd,
+                                        page->index);
+                if (rc < 0 && rc != -EDQUOT)
+                        RETURN(rc); /* XXX lproc counter here? */
+        } else {
                 lprocfs_counter_incr(ll_i2sbi(inode)->ll_stats,
                                      LPROC_LL_DIRTY_HITS);
+        }
 
         size = (((obd_off)page->index) << PAGE_SHIFT) + to;
         if (size > inode->i_size)
                 inode->i_size = size;
 
-        /* XXX temporary, bug 1286 */
-        {
-                struct ll_dirty_offsets *lldo = &ll_i2info(inode)->lli_dirty;
-                int rc;
-                if ((lldo->do_num_dirty * PAGE_CACHE_SIZE) > 10 * 1024 * 1024) {
-                        rc = ll_batch_writepage(inode, page);
-                        lock_page(page); /* caller expects to unlock */
-                        RETURN(rc);
-                }
-        }
-
+        SetPageUptodate(page);
         set_page_dirty(page);
-        ll_record_dirty(inode, page->index);
+
+        /* This means that we've hit either the local cache limit or the limit
+         * of the OST's grant. */
+        if (rc == -EDQUOT) {
+                int rc = ll_batch_writepage(inode, page);
+                lock_page(page); /* caller expects to unlock */
+                RETURN(rc);
+        }
 
         RETURN(0);
 } /* ll_commit_write */
index aef3c06..85532f0 100644 (file)
@@ -121,6 +121,7 @@ static struct super_block *ll_read_super(struct super_block *sb,
         struct inode *root = 0;
         struct obd_device *obd;
         struct ll_sb_info *sbi;
+        struct obd_export *mdc_export;
         char *osc = NULL;
         char *mdc = NULL;
         int err;
@@ -130,7 +131,6 @@ static struct super_block *ll_read_super(struct super_block *sb,
         struct ptlrpc_connection *mdc_conn;
         struct ll_read_inode2_cookie lic;
         class_uuid_t uuid;
-        struct obd_uuid param_uuid;
 
         ENTRY;
 
@@ -158,8 +158,7 @@ static struct super_block *ll_read_super(struct super_block *sb,
                 GOTO(out_free, sb = NULL);
         }
 
-        strncpy(param_uuid.uuid, mdc, sizeof(param_uuid.uuid));
-        obd = class_uuid2obd(&param_uuid);
+        obd = class_name2obd(mdc);
         if (!obd) {
                 CERROR("MDC %s: not setup or attached\n", mdc);
                 GOTO(out_free, sb = NULL);
@@ -173,8 +172,7 @@ static struct super_block *ll_read_super(struct super_block *sb,
 
         mdc_conn = sbi2mdc(sbi)->cl_import->imp_connection;
 
-        strncpy(param_uuid.uuid, osc, sizeof(param_uuid.uuid));
-        obd = class_uuid2obd(&param_uuid);
+        obd = class_name2obd(osc);
         if (!obd) {
                 CERROR("OSC %s: not setup or attached\n", osc);
                 GOTO(out_mdc, sb = NULL);
@@ -195,7 +193,13 @@ static struct super_block *ll_read_super(struct super_block *sb,
         sbi->ll_rootino = rootfid.id;
 
         memset(&osfs, 0, sizeof(osfs));
-        err = obd_statfs(&sbi->ll_mdc_conn, &osfs);
+        mdc_export = class_conn2export(&sbi->ll_mdc_conn);
+        if (mdc_export == NULL) {
+                CERROR("null mdc_export\n");
+                GOTO(out_osc, sb = NULL);
+        }
+        err = obd_statfs(mdc_export, &osfs);
+        class_export_put(mdc_export);
         sb->s_blocksize = osfs.os_bsize;
         sb->s_blocksize_bits = log2(osfs.os_bsize);
         sb->s_magic = LL_SUPER_MAGIC;
@@ -595,14 +599,19 @@ int ll_setattr(struct dentry *de, struct iattr *attr)
 static int ll_statfs(struct super_block *sb, struct statfs *sfs)
 {
         struct ll_sb_info *sbi = ll_s2sbi(sb);
+        struct obd_export *mdc_exp = class_conn2export(&sbi->ll_mdc_conn);
+        struct obd_export *osc_exp;
         struct obd_statfs osfs;
         int rc;
         ENTRY;
 
+        if (mdc_exp == NULL)
+                RETURN(-EINVAL);
+
         CDEBUG(D_VFSTRACE, "VFS Op:\n");
         lprocfs_counter_incr(sbi->ll_stats, LPROC_LL_STAFS);
         memset(sfs, 0, sizeof(*sfs));
-        rc = obd_statfs(&sbi->ll_mdc_conn, &osfs);
+        rc = obd_statfs(mdc_exp, &osfs);
         statfs_unpack(sfs, &osfs);
         if (rc)
                 CERROR("mdc_statfs fails: rc = %d\n", rc);
@@ -614,7 +623,11 @@ static int ll_statfs(struct super_block *sb, struct statfs *sfs)
 
         /* temporary until mds_statfs returns statfs info for all OSTs */
         if (!rc) {
-                rc = obd_statfs(&sbi->ll_osc_conn, &osfs);
+                osc_exp = class_conn2export(&sbi->ll_osc_conn);
+                if (osc_exp == NULL)
+                        GOTO(out, rc = -EINVAL);
+                rc = obd_statfs(osc_exp, &osfs);
+                class_export_put(osc_exp);
                 if (rc) {
                         CERROR("obd_statfs fails: rc = %d\n", rc);
                         GOTO(out, rc);
@@ -648,6 +661,7 @@ static int ll_statfs(struct super_block *sb, struct statfs *sfs)
         }
 
 out:
+        class_export_put(mdc_exp);
         RETURN(rc);
 }
 
@@ -727,7 +741,6 @@ static void ll_read_inode2(struct inode *inode, void *opaque)
         sema_init(&lli->lli_open_sem, 1);
         spin_lock_init(&lli->lli_read_extent_lock);
         INIT_LIST_HEAD(&lli->lli_read_extents);
-        ll_lldo_init(&lli->lli_dirty);
         lli->lli_flags = 0;
         /* We default to 2T-4k until the LSM is created/read, at which point
          * it'll be updated. */
index e942736..980bfcd 100644 (file)
@@ -134,7 +134,6 @@ static int ll_fill_super(struct super_block *sb, void *data, int silent)
         struct ptlrpc_connection *mdc_conn;
         struct ll_read_inode2_cookie lic;
         class_uuid_t uuid;
-        struct obd_uuid param_uuid;
 
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:\n");
@@ -162,8 +161,7 @@ static int ll_fill_super(struct super_block *sb, void *data, int silent)
                 GOTO(out_free, sb = NULL);
         }
 
-        strncpy(param_uuid.uuid, mdc, sizeof(param_uuid.uuid));
-        obd = class_uuid2obd(&param_uuid);
+        obd = class_name2obd(mdc);
         if (!obd) {
                 CERROR("MDC %s: not setup or attached\n", mdc);
                 GOTO(out_free, sb = NULL);
@@ -176,9 +174,8 @@ static int ll_fill_super(struct super_block *sb, void *data, int silent)
         }
 
         mdc_conn = sbi2mdc(sbi)->cl_import->imp_connection;
-        strncpy(param_uuid.uuid, osc, sizeof(param_uuid.uuid));
 
-        obd = class_uuid2obd(&param_uuid);
+        obd = class_name2obd(osc);
         if (!obd) {
                 CERROR("OSC %s: not setup or attached\n", osc);
                 GOTO(out_mdc, sb = NULL);
index 87c3fb9..2974b2a 100644 (file)
@@ -30,6 +30,7 @@
 #include <linux/init.h>
 #include <linux/random.h>
 #include <linux/slab.h>
+#include <linux/pagemap.h>
 #include <asm/div64.h>
 #else
 #include <liblustre.h>
@@ -503,23 +504,16 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf)
 {
         struct obd_ioctl_data *data = buf;
         struct lov_obd *lov = &obd->u.lov;
-        struct obd_uuid uuid;
         int rc = 0;
         ENTRY;
 
         if (data->ioc_inllen1 < 1) {
-                CERROR("LOV setup requires an MDC UUID\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen1 > 37) {
-                CERROR("mdc UUID must be 36 characters or less\n");
+                CERROR("LOV setup requires an MDC name\n");
                 RETURN(-EINVAL);
         }
 
         spin_lock_init(&lov->lov_lock);
-        obd_str2uuid(&uuid, data->ioc_inlbuf1);
-        lov->mdcobd = class_uuid2obd(&uuid);
+        lov->mdcobd = class_name2obd(data->ioc_inlbuf1);
         if (!lov->mdcobd) {
                 CERROR("LOV %s cannot locate MDC %s\n", obd->obd_uuid.uuid,
                        data->ioc_inlbuf1);
@@ -669,9 +663,11 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
                        lsm->lsm_object_id, loi->loi_id, ost_idx);
 
-                if (!set)
+                if (set == 0)
                         lsm->lsm_stripe_offset = ost_idx;
                 lov_merge_attrs(oa, tmp, OBD_MD_FLBLKSZ, lsm, obj_alloc, &set);
+                ot_init(&loi->loi_dirty_ot_inline);
+                loi->loi_dirty_ot = &loi->loi_dirty_ot_inline;
 
                 ++obj_alloc;
                 ++loi;
@@ -736,6 +732,21 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
         goto out_tmp;
 }
 
+#define lsm_bad_magic(LSMP)                                     \
+({                                                              \
+        struct lov_stripe_md *_lsm__ = (LSMP);                  \
+        int _ret__ = 0;                                         \
+        if (!_lsm__) {                                          \
+                CERROR("LOV requires striping ea\n");           \
+                _ret__ = 1;                                     \
+        } else if (_lsm__->lsm_magic != LOV_MAGIC) {            \
+                CERROR("LOV striping magic bad %#x != %#x\n",   \
+                       _lsm__->lsm_magic, LOV_MAGIC);           \
+                _ret__ = 1;                                     \
+        }                                                       \
+        _ret__;                                                 \
+})
+
 static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
                        struct lov_stripe_md *lsm, struct obd_trans_info *oti)
 {
@@ -747,16 +758,8 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
         int rc = 0, i;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea for destruction\n");
-                GOTO(out, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
+        if (lsm_bad_magic(lsm))
                 GOTO(out, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
@@ -809,16 +812,8 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         int i, rc = 0, set = 0;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
+        if (lsm_bad_magic(lsm))
                 GOTO(out, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
-                GOTO(out, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
@@ -1008,16 +1003,8 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
         int rc = 0, i, set = 0;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
+        if (lsm_bad_magic(lsm))
                 GOTO(out, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
-                GOTO(out, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
@@ -1092,16 +1079,8 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         ENTRY;
         LASSERT(och != NULL);
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea for opening\n");
-                GOTO(out_exp, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
+        if (lsm_bad_magic(lsm))
                 GOTO(out_exp, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out_exp, rc = -ENODEV);
@@ -1202,16 +1181,8 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         int rc = 0, i;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
-                GOTO(out, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
+        if (lsm_bad_magic(lsm))
                 GOTO(out, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
@@ -1407,16 +1378,8 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
         int rc = 0, i;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
-                GOTO(out, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
+        if (lsm_bad_magic(lsm))
                 GOTO(out, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
@@ -1510,16 +1473,8 @@ static int lov_brw(int cmd, struct lustre_handle *conn,
         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
+        if (lsm_bad_magic(lsm))
                 GOTO(out_exp, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
-                GOTO(out_exp, rc = -EINVAL);
-        }
 
         lov = &export->exp_obd->u.lov;
 
@@ -1624,16 +1579,8 @@ static int lov_brw_async(int cmd, struct lustre_handle *conn,
         int rc = 0, i, *where, stripe_count = lsm->lsm_stripe_count;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
+        if (lsm_bad_magic(lsm))
                 GOTO(out_exp, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
-                GOTO(out_exp, rc = -EINVAL);
-        }
 
         lov = &export->exp_obd->u.lov;
 
@@ -1731,16 +1678,8 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         int i;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
-                GOTO(out_exp, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
+        if (lsm_bad_magic(lsm))
                 GOTO(out_exp, rc = -EINVAL);
-        }
 
         /* we should never be asked to replay a lock this way. */
         LASSERT((*flags & LDLM_FL_REPLAY) == 0);
@@ -1843,16 +1782,8 @@ static int lov_match(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         int i;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
-                GOTO(out_exp, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
+        if (lsm_bad_magic(lsm))
                 GOTO(out_exp, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out_exp, rc = -ENODEV);
@@ -1941,16 +1872,8 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         int rc = 0, i;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea\n");
-                GOTO(out, rc = -EINVAL);
-        }
-
-        if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#x != %#x\n",
-                       lsm->lsm_magic, LOV_MAGIC);
+        if (lsm_bad_magic(lsm))
                 GOTO(out, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
@@ -2016,10 +1939,8 @@ static int lov_cancel_unused(struct lustre_handle *conn,
         int rc = 0, i;
         ENTRY;
 
-        if (!lsm) {
-                CERROR("LOV requires striping ea for lock cancellation\n");
+        if (lsm_bad_magic(lsm))
                 GOTO(out, rc = -EINVAL);
-        }
 
         if (!export || !export->exp_obd)
                 GOTO(out, rc = -ENODEV);
@@ -2059,9 +1980,9 @@ static int lov_cancel_unused(struct lustre_handle *conn,
                         (tot) += (add);                                 \
         } while(0)
 
-static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int lov_statfs(struct obd_export *export, struct obd_statfs *osfs)
 {
-        struct obd_export *export = class_conn2export(conn);
+        struct obd_export *tgt_export;
         struct lov_obd *lov;
         struct obd_statfs lov_sfs;
         int set = 0;
@@ -2070,7 +1991,7 @@ static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
         ENTRY;
 
         if (!export || !export->exp_obd)
-                GOTO(out, rc = -ENODEV);
+                RETURN(-ENODEV);
 
         lov = &export->exp_obd->u.lov;
 
@@ -2083,7 +2004,14 @@ static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
                         continue;
                 }
 
-                err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
+                tgt_export = class_conn2export(&lov->tgts[i].conn);
+                if (!tgt_export) {
+                        CDEBUG(D_HA, "lov idx %d NULL export\n", i);
+                        continue;
+                }
+
+                err = obd_statfs(tgt_export, &lov_sfs);
+                class_export_put(tgt_export);
                 if (err) {
                         if (lov->tgts[i].active) {
                                 CERROR("error: statfs OSC %s on OST idx %d: "
@@ -2127,10 +2055,7 @@ static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
                         do_div(osfs->os_ffree, expected_stripes);
         } else if (!rc)
                 rc = -EIO;
-        GOTO(out, rc);
- out:
-        class_export_put(export);
-        return rc;
+        RETURN(rc);
 }
 
 static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
@@ -2266,6 +2191,153 @@ static int lov_get_info(struct lustre_handle *conn, __u32 keylen,
         RETURN(-EINVAL);
 }
 
+static int lov_mark_page_dirty(struct lustre_handle *conn, 
+                               struct lov_stripe_md *lsm, unsigned long offset)
+{
+        struct lov_obd *lov = &class_conn2obd(conn)->u.lov;
+        struct lov_oinfo *loi;
+        struct lov_stripe_md *submd;
+        int stripe, rc;
+        obd_off off;
+        ENTRY;
+
+        if (lsm_bad_magic(lsm))
+                RETURN(-EINVAL);
+
+        OBD_ALLOC(submd, lov_stripe_md_size(1));
+        if (submd == NULL)
+                RETURN(-ENOMEM);
+
+        stripe = lov_stripe_number(lsm, (obd_off)offset << PAGE_CACHE_SHIFT);
+        lov_stripe_offset(lsm, (obd_off)offset << PAGE_CACHE_SHIFT, stripe, 
+                          &off);
+        off >>= PAGE_CACHE_SHIFT;
+
+        loi = &lsm->lsm_oinfo[stripe];
+        CDEBUG(D_INODE, "off %lu => off %lu on stripe %d\n", offset, 
+               (unsigned long)off, stripe);
+        submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
+
+        rc = obd_mark_page_dirty(&lov->tgts[loi->loi_ost_idx].conn, submd, off);
+        OBD_FREE(submd, lov_stripe_md_size(1));
+        RETURN(rc);
+}
+
+static int lov_clear_dirty_pages(struct lustre_handle *conn, 
+                                 struct lov_stripe_md *lsm, unsigned long start,
+                                 unsigned long end, unsigned long *cleared)
+
+{
+        struct obd_export *export = class_conn2export(conn);
+        __u64 start_off = (__u64)start << PAGE_CACHE_SHIFT;
+        __u64 end_off = (__u64)end << PAGE_CACHE_SHIFT;
+        __u64 obd_start, obd_end;
+        struct lov_stripe_md *submd = NULL;
+        struct lov_obd *lov;
+        struct lov_oinfo *loi;
+        int i, rc;
+        unsigned long osc_cleared;
+        ENTRY;
+
+        *cleared = 0;
+
+        if (lsm_bad_magic(lsm))
+                GOTO(out_exp, rc = -EINVAL);
+
+        if (!export || !export->exp_obd)
+                GOTO(out_exp, rc = -ENODEV);
+
+        OBD_ALLOC(submd, lov_stripe_md_size(1));
+        if (submd == NULL)
+                GOTO(out_exp, rc = -ENOMEM);
+
+        lov = &export->exp_obd->u.lov;
+        rc = 0;
+        for (i = 0, loi = lsm->lsm_oinfo;
+             i < lsm->lsm_stripe_count;
+             i++, loi++) {
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
+                        continue;
+                }
+
+                if(!lov_stripe_intersects(lsm, i, start_off, end_off,
+                                          &obd_start, &obd_end))
+                        continue;
+                obd_start >>= PAGE_CACHE_SHIFT;
+                obd_end >>= PAGE_CACHE_SHIFT;
+
+                CDEBUG(D_INODE, "offs [%lu,%lu] => offs [%lu,%lu] stripe %d\n", 
+                       start, end, (unsigned long)obd_start, 
+                       (unsigned long)obd_end, loi->loi_ost_idx);
+                submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
+                rc = obd_clear_dirty_pages(&lov->tgts[loi->loi_ost_idx].conn, 
+                                           submd, obd_start, obd_end,
+                                           &osc_cleared);
+                if (rc)
+                        break;
+                *cleared += osc_cleared;
+        }
+out_exp:
+        if (submd)
+                OBD_FREE(submd, lov_stripe_md_size(1));
+        class_export_put(export);
+        RETURN(rc);
+}
+
+static int lov_last_dirty_offset(struct lustre_handle *conn,
+                                 struct lov_stripe_md *lsm,
+                                 unsigned long *offset)
+{
+        struct obd_export *export = class_conn2export(conn);
+        struct lov_stripe_md *submd = NULL;
+        struct lov_obd *lov;
+        struct lov_oinfo *loi;
+        unsigned long tmp, count, skip;
+        int err, i, rc;
+        ENTRY;
+
+        if (lsm_bad_magic(lsm))
+                GOTO(out_exp, rc = -EINVAL);
+
+        if (!export || !export->exp_obd)
+                GOTO(out_exp, rc = -ENODEV);
+
+        OBD_ALLOC(submd, lov_stripe_md_size(1));
+        if (submd == NULL)
+                GOTO(out_exp, rc = -ENOMEM);
+
+        *offset = 0;
+        lov = &export->exp_obd->u.lov;
+        rc = -ENOENT;
+        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; 
+                                          i++, loi++) {
+
+                count = lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+                skip = (lsm->lsm_stripe_count - 1) * count;
+
+                submd->lsm_oinfo[0].loi_dirty_ot = &loi->loi_dirty_ot_inline;
+
+                err = obd_last_dirty_offset(&lov->tgts[loi->loi_ost_idx].conn, 
+                                            submd, &tmp);
+                if (err == -ENOENT)
+                        continue;
+                if (err)
+                        GOTO(out_exp, rc = err);
+
+                rc = 0;
+                if (tmp != ~0) 
+                        tmp += (tmp/count * skip) + (i * count);
+                if (tmp > *offset)
+                        *offset = tmp;
+        }
+out_exp:
+        if (submd)
+                OBD_FREE(submd, lov_stripe_md_size(1));
+        class_export_put(export);
+        RETURN(rc);
+}
+
 struct obd_ops lov_obd_ops = {
         o_owner:       THIS_MODULE,
         o_attach:      lov_attach,
@@ -2291,7 +2363,10 @@ struct obd_ops lov_obd_ops = {
         o_cancel:      lov_cancel,
         o_cancel_unused: lov_cancel_unused,
         o_iocontrol:   lov_iocontrol,
-        o_get_info:    lov_get_info
+        o_get_info:    lov_get_info,
+        .o_mark_page_dirty =    lov_mark_page_dirty,
+        .o_clear_dirty_pages =    lov_clear_dirty_pages,
+        .o_last_dirty_offset =    lov_last_dirty_offset,
 };
 
 int __init lov_init(void)
index 620dd5c..bbb40de 100644 (file)
@@ -234,8 +234,11 @@ int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
 
                 LMM_ASSERT(loi - lsm->lsm_oinfo < stripe_count);
                 /* XXX LOV STACKING call down to osc_unpackmd() */
-                loi->loi_id = le64_to_cpu (lmm->lmm_objects[ost_offset].l_object_id);
+                loi->loi_id =
+                        le64_to_cpu (lmm->lmm_objects[ost_offset].l_object_id);
                 loi->loi_ost_idx = ost_offset;
+                loi->loi_dirty_ot = &loi->loi_dirty_ot_inline;
+                ot_init(loi->loi_dirty_ot);
                 loi++;
         }
         LMM_ASSERT(loi - lsm->lsm_oinfo > 0);
index 1396f8d..806a830 100644 (file)
@@ -98,8 +98,7 @@ void mds_create_pack(struct ptlrpc_request *req, int offset,
 void mds_open_pack(struct ptlrpc_request *req, int offset,
                    struct mdc_op_data *op_data,
                    __u32 mode, __u64 rdev, __u32 uid, __u32 gid, __u64 time,
-                   __u32 flags,
-                   const void *data, int datalen)
+                   __u32 flags, const void *data, int datalen)
 {
         struct mds_rec_create *rec;
         char *tmp;
index 68d7f0d..2da2fdb 100644 (file)
@@ -140,7 +140,7 @@ int mdc_create(struct lustre_handle *conn,
         rc = mdc_reint(req, level);
         /* Resend if we were told to. */
         if (rc == -ERESTARTSYS) {
-                level = LUSTRE_CONN_RECOVD;
+                level = LUSTRE_CONN_RECOVER;
                 goto resend;
         }
 
index dc90885..204a836 100644 (file)
@@ -352,10 +352,9 @@ int mdc_enqueue(struct lustre_handle *conn,
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mds_open_pack(req, 2, data, it->it_mode, 0,
-                              current->fsuid, current->fsgid,
-                              LTIME_S(CURRENT_TIME), it->it_flags,
-                              tgt, tgtlen);
+                mds_open_pack(req, 2, data, it->it_mode, 0, current->fsuid,
+                              current->fsgid, LTIME_S(CURRENT_TIME),
+                              it->it_flags, tgt, tgtlen);
                 /* get ready for the reply */
                 reply_buffers = 3;
                 req->rq_replen = lustre_msg_size(3, repsize);
@@ -434,7 +433,6 @@ int mdc_enqueue(struct lustre_handle *conn,
                 RETURN(rc);
         } else { /* rc = 0 */
                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
-                struct lustre_handle lockh2;
                 LASSERT(lock);
 
                 /* If the server gave us back a different lock mode, we should
@@ -445,20 +443,6 @@ int mdc_enqueue(struct lustre_handle *conn,
                         lock_mode = lock->l_req_mode;
                 }
 
-                /* The server almost certainly gave us a lock other than the
-                 * one that we asked for.  If we already have a matching lock,
-                 * then cancel this one--we don't need two. */
-                LDLM_DEBUG(lock, "matching against this");
-
-                memcpy(&lockh2, lockh, sizeof(lockh2));
-                if (ldlm_lock_match(NULL,
-                                    LDLM_FL_BLOCK_GRANTED | LDLM_FL_MATCH_DATA,
-                                    NULL, LDLM_PLAIN, NULL, 0, LCK_NL, cb_data,
-                                    &lockh2)) {
-                        /* We already have a lock; cancel the new one */
-                        ldlm_lock_decref_and_cancel(lockh, lock_mode);
-                        memcpy(lockh, &lockh2, sizeof(lockh2));
-                }
                 LDLM_LOCK_PUT(lock);
         }
 
@@ -650,15 +634,15 @@ static int mdc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
         }
 }
 
-static int mdc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int mdc_statfs(struct obd_export *exp, struct obd_statfs *osfs)
 {
         struct ptlrpc_request *req;
         struct obd_statfs *msfs;
         int rc, size = sizeof(*msfs);
         ENTRY;
 
-        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_STATFS, 0, NULL,
-                              NULL);
+        req = ptlrpc_prep_req(exp->exp_obd->u.cli.cl_import, MDS_STATFS, 0, 
+                              NULL, NULL);
         if (!req)
                 RETURN(-ENOMEM);
 
index 3c2aa89..de3f2ed 100644 (file)
@@ -1582,7 +1582,8 @@ static int mds_cleanup(struct obd_device *obddev, int force, int failover)
         RETURN(0);
 }
 
-inline void fixup_handle_for_resent_req(struct ptlrpc_request *req,
+static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
+                                        struct ldlm_lock *new_lock,
                                         struct lustre_handle *lockh)
 {
         struct obd_export *exp = req->rq_export;
@@ -1599,6 +1600,8 @@ inline void fixup_handle_for_resent_req(struct ptlrpc_request *req,
         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
                 struct ldlm_lock *lock;
                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
+                if (lock == new_lock)
+                        continue;
                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
                         lockh->cookie = lock->l_handle.h_cookie;
                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
@@ -1658,7 +1661,7 @@ static int ldlm_intent_policy(struct ldlm_namespace *ns,
                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
                 rep->lock_policy_res1 = IT_INTENT_EXEC;
 
-                fixup_handle_for_resent_req(req, &lockh);
+                fixup_handle_for_resent_req(req, lock, &lockh);
 
                 /* execute policy */
                 switch ((long)it->opc) {
index d83e4ee..04d6ee9 100644 (file)
@@ -116,6 +116,8 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         struct ldlm_reply *rep;
         struct mds_body *body;
         int disp, rc;
+        struct list_head *t;
+        int put_child = 1;
         ENTRY;
 
         LASSERT(offset == 2);                  /* only called via intent */
@@ -183,44 +185,43 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         /* If we didn't get as far as trying to open, then some locking thing
          * probably went wrong, and we'll just bail here.
          */
-        if ((disp & IT_OPEN_OPEN) == 0) {
+        if ((disp & IT_OPEN_OPEN) == 0)
                 GOTO(out_dput, 0);
-        }
 
         /* If we failed, then we must have failed opening, so don't look for
          * file descriptor or anything, just give the client the bad news.
          */
-        if (req->rq_status) {
+        if (req->rq_status)
                 GOTO(out_dput, 0);
+
+        mfd = NULL;
+        list_for_each(t, &med->med_open_head) {
+                mfd = list_entry(t, struct mds_file_data, mfd_list);
+                if (mfd->mfd_xid == req->rq_xid) 
+                        break;
+                mfd = NULL;
         }
 
         if (req->rq_export->exp_outstanding_reply) {
-                struct list_head *t;
-                mfd = NULL;
-                /* XXX can we just look in the old reply to find the handle in
-                 * XXX O(1) here? */
-                list_for_each(t, &med->med_open_head) {
-                        mfd = list_entry(t, struct mds_file_data, mfd_list);
-                        if (mfd->mfd_xid == req->rq_xid)
-                                break;
-                        mfd = NULL;
-                }
                 /* if we're not recovering, it had better be found */
                 LASSERT(mfd);
-        } else {
+        } else if (mfd == NULL) {
                 mntget(mds->mds_vfsmnt);
+                CERROR("Re-opened file \n");
                 mfd = mds_dentry_open(child, mds->mds_vfsmnt,
                                    rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
                 if (!mfd) {
                         CERROR("mds: out of memory\n");
                         GOTO(out_dput, req->rq_status = -ENOMEM);
                 }
+                put_child = 0;
         }
 
         body->handle.cookie = mfd->mfd_handle.h_cookie;
 
  out_dput:
-        l_dput(child);
+        if (put_child)
+                l_dput(child);
         l_dput(parent);
         EXIT;
 }
@@ -376,6 +377,9 @@ int mds_open(struct mds_update_record *rec, int offset,
         if (S_ISLNK(dchild->d_inode->i_mode))
                 GOTO(cleanup, rc = 0);
 
+        if ((rec->ur_flags & O_DIRECTORY) && !S_ISDIR(dchild->d_inode->i_mode))
+                GOTO(cleanup, rc = -ENOTDIR);
+
         /* Step 5: mds_open it */
         rep->lock_policy_res1 |= IT_OPEN_OPEN;
 
index 7b7c5b9..61f4bc2 100644 (file)
@@ -25,7 +25,7 @@ EXTRA_PROGRAMS = obdclass $(FSMOD) fsfilt_reiserfs
 
 obdclass_SOURCES = class_obd.c debug.c genops.c sysctl.c uuid.c simple.c
 obdclass_SOURCES += lprocfs_status.c lustre_handles.c lustre_peer.c
-obdclass_SOURCES += fsfilt.c statfs_pack.c
+obdclass_SOURCES += fsfilt.c statfs_pack.c otree.c
 endif
 
 include $(top_srcdir)/Rules
index 9619861..b497aa3 100644 (file)
@@ -465,6 +465,7 @@ int class_handle_ioctl(struct obd_class_user_state *ocus, unsigned int cmd,
                 obd->obd_minor = minor;
                 obd->obd_type = type;
                 INIT_LIST_HEAD(&obd->obd_exports);
+                obd->obd_num_exports = 0;
                 INIT_LIST_HEAD(&obd->obd_imports);
                 spin_lock_init(&obd->obd_dev_lock);
                 init_waitqueue_head(&obd->obd_refcount_waitq);
@@ -788,6 +789,7 @@ EXPORT_SYMBOL(class_unregister_type);
 EXPORT_SYMBOL(class_get_type);
 EXPORT_SYMBOL(class_put_type);
 EXPORT_SYMBOL(class_name2dev);
+EXPORT_SYMBOL(class_name2obd);
 EXPORT_SYMBOL(class_uuid2dev);
 EXPORT_SYMBOL(class_uuid2obd);
 EXPORT_SYMBOL(class_export_get);
index 9000771..cd6c856 100644 (file)
@@ -190,6 +190,14 @@ int class_name2dev(char *name)
         return -1;
 }
 
+struct obd_device *class_name2obd(char *name)
+{
+        int dev = class_name2dev(name);
+        if (dev < 0)
+                return NULL;
+        return &obd_dev[dev];
+}
+
 int class_uuid2dev(struct obd_uuid *uuid)
 {
         int i;
@@ -205,15 +213,10 @@ int class_uuid2dev(struct obd_uuid *uuid)
 
 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
 {
-        int i;
-
-        for (i = 0; i < MAX_OBD_DEVICES; i++) {
-                struct obd_device *obd = &obd_dev[i];
-                if (obd_uuid_equals(uuid, &obd->obd_uuid))
-                        return obd;
-        }
-
-        return NULL;
+        int dev = class_uuid2dev(uuid);
+        if (dev < 0)
+                return NULL;
+        return &obd_dev[dev];
 }
 
 void obd_cleanup_caches(void)
@@ -327,6 +330,7 @@ void class_export_put(struct obd_export *exp)
 {
         ENTRY;
 
+        LASSERT(exp);
         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
                atomic_read(&exp->exp_refcount) - 1);
         LASSERT(atomic_read(&exp->exp_refcount) > 0);
@@ -376,6 +380,7 @@ struct obd_export *class_new_export(struct obd_device *obddev)
         LASSERT(!obddev->obd_stopping); /* shouldn't happen, but might race */
         atomic_inc(&obddev->obd_refcount);
         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
+        export->exp_obd->obd_num_exports++;
         spin_unlock(&obddev->obd_dev_lock);
         return export;
 }
@@ -386,6 +391,7 @@ void class_unlink_export(struct obd_export *exp)
 
         spin_lock(&exp->exp_obd->obd_dev_lock);
         list_del_init(&exp->exp_obd_chain);
+        exp->exp_obd->obd_num_exports--;
         spin_unlock(&exp->exp_obd->obd_dev_lock);
 
         class_export_put(exp);
@@ -458,6 +464,9 @@ void class_destroy_import(struct obd_import *import)
 
         /* Abort any inflight DLM requests and NULL out their (about to be
          * freed) import. */
+        /* Invalidate all requests on import, would be better to call
+           ptlrpc_set_import_active(imp, 0); */
+        import->imp_generation++;
         ptlrpc_abort_inflight_superhack(import);
 
         class_import_put(import);
index 8ec50d8..4862cf3 100644 (file)
@@ -95,8 +95,12 @@ int lprocfs_add_vars(struct proc_dir_entry *root, struct lprocfs_vars *list,
                         if (next)
                                 cur_root = (proc ? proc :
                                                    proc_mkdir(cur, cur_root));
-                        else if (!proc)
-                                proc = create_proc_entry(cur, 0444, cur_root);
+                        else if (!proc) {
+                                mode_t mode = 0444;
+                                if (list->write_fptr)
+                                        mode = 0644;
+                                proc = create_proc_entry(cur, mode, cur_root);
+                        }
                 }
 
                 OBD_FREE(pathcopy, pathsize);
diff --git a/lustre/obdclass/otree.c b/lustre/obdclass/otree.c
new file mode 100644 (file)
index 0000000..16ef088
--- /dev/null
@@ -0,0 +1,266 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  Copyright (C) 2002, 2003  Cluster File Systems, Inc
+ *
+ *  our offset trees (otrees) track single-bit state of offsets in an
+ *  extent tree.  
+ */
+
+#define EXPORT_SYMTAB
+#include <linux/version.h>
+#include <linux/config.h>
+#include <linux/module.h>
+
+#define DEBUG_SUBSYSTEM S_OSC
+#include <linux/kp30.h>
+#include <linux/obd.h>
+#include <linux/lustre_debug.h>
+#include <linux/lustre_otree.h>
+
+struct offset_extent {
+        rb_node_t       oe_node;
+        unsigned long   oe_start, oe_end;
+};
+
+static struct offset_extent * ot_find_oe(rb_root_t *root,
+                                         struct offset_extent *needle)
+{
+        struct rb_node_s *node = root->rb_node;
+        struct offset_extent *oe;
+        ENTRY;
+
+        CDEBUG(D_INODE, "searching [%lu -> %lu]\n", needle->oe_start,
+               needle->oe_end);
+
+        while (node) {
+                oe = rb_entry(node, struct offset_extent, oe_node);
+                if (needle->oe_end < oe->oe_start)
+                        node = node->rb_left;
+                else if (needle->oe_start > oe->oe_end)
+                        node = node->rb_right;
+                else {
+                        CDEBUG(D_INODE, "returning [%lu -> %lu]\n",
+                               oe->oe_start, oe->oe_end);
+                        RETURN(oe);
+                }
+        }
+        RETURN(NULL);
+}
+
+/* do the rbtree mechanics to insert a node, callers are responsible
+ * for making sure that this new node doesn't overlap with existing
+ * nodes */
+static void ot_indert_oe(rb_root_t *root, struct offset_extent *new_oe)
+{
+        rb_node_t ** p = &root->rb_node;
+        rb_node_t * parent = NULL;
+        struct offset_extent *oe;
+        ENTRY;
+
+        LASSERT(new_oe->oe_start <= new_oe->oe_end);
+
+        while (*p) {
+                parent = *p;
+                oe = rb_entry(parent, struct offset_extent, oe_node);
+                if ( new_oe->oe_end < oe->oe_start )
+                        p = &(*p)->rb_left;
+                else if ( new_oe->oe_start > oe->oe_end )
+                        p = &(*p)->rb_right;
+                else
+                        LBUG();
+        }
+        rb_link_node(&new_oe->oe_node, parent, p);
+        rb_insert_color(&new_oe->oe_node, root);
+        EXIT;
+}
+
+int ot_mark_offset(struct otree *ot, unsigned long offset)
+{
+        struct offset_extent needle, *oe, *new_oe;
+        int rc = 0;
+        ENTRY;
+
+        OBD_ALLOC(new_oe, sizeof(*new_oe));
+        if (new_oe == NULL)
+                RETURN(-ENOMEM);
+
+        spin_lock(&ot->ot_lock);
+
+        /* find neighbours that we might glom on to */
+        needle.oe_start = (offset > 0) ? offset - 1 : offset;
+        needle.oe_end = (offset < ~0) ? offset + 1 : offset;
+        oe = ot_find_oe(&ot->ot_root, &needle);
+        if ( oe == NULL ) {
+                new_oe->oe_start = offset;
+                new_oe->oe_end = offset;
+                ot_indert_oe(&ot->ot_root, new_oe);
+                ot->ot_num_marked++;
+                new_oe = NULL;
+                GOTO(out, rc);
+        }
+
+        /* already recorded */
+        if ( offset >= oe->oe_start && offset <= oe->oe_end )
+                GOTO(out, rc);
+
+        /* ok, need to check for adjacent neighbours */
+        needle.oe_start = offset;
+        needle.oe_end = offset;
+        if (ot_find_oe(&ot->ot_root, &needle))
+                GOTO(out, rc);
+
+        /* ok, its safe to extend the oe we found */
+        if ( offset == oe->oe_start - 1 )
+                oe->oe_start--;
+        else if ( offset == oe->oe_end + 1 )
+                oe->oe_end++;
+        else
+                LBUG();
+        ot->ot_num_marked++;
+
+out:
+        CDEBUG(D_INODE, "%lu now dirty\n", ot->ot_num_marked);
+        spin_unlock(&ot->ot_lock);
+        if (new_oe)
+                OBD_FREE(new_oe, sizeof(*new_oe));
+        RETURN(rc);
+}
+
+int ot_clear_extent(struct otree *ot, unsigned long start, unsigned long end)
+{
+        struct offset_extent needle, *oe, *new_oe;
+        int rc = 0;
+        ENTRY;
+
+        /* will allocate more intelligently later */
+        OBD_ALLOC(new_oe, sizeof(*new_oe));
+        if (new_oe == NULL)
+                RETURN(-ENOMEM);
+
+        needle.oe_start = start;
+        needle.oe_end = end;
+
+        spin_lock(&ot->ot_lock);
+        for ( ; (oe = ot_find_oe(&ot->ot_root, &needle)) ; ) {
+                rc = 0;
+
+                /* see if we're punching a hole and need to create a node */
+                if (oe->oe_start < start && oe->oe_end > end) {
+                        new_oe->oe_start = end + 1;
+                        new_oe->oe_end = oe->oe_end;
+                        oe->oe_end = start - 1;
+                        ot_indert_oe(&ot->ot_root, new_oe);
+                        new_oe = NULL;
+                        ot->ot_num_marked -= end - start + 1;
+                        break;
+                }
+
+                /* overlapping edges */
+                if (oe->oe_start < start && oe->oe_end <= end) {
+                        ot->ot_num_marked -= oe->oe_end - start + 1;
+                        oe->oe_end = start - 1;
+                        oe = NULL;
+                        continue;
+                }
+                if (oe->oe_end > end && oe->oe_start >= start) {
+                        ot->ot_num_marked -= end - oe->oe_start + 1;
+                        oe->oe_start = end + 1;
+                        oe = NULL;
+                        continue;
+                }
+
+                /* an extent entirely within the one we're clearing */
+                rb_erase(&oe->oe_node, &ot->ot_root);
+                ot->ot_num_marked -= oe->oe_end - oe->oe_start + 1;
+                spin_unlock(&ot->ot_lock);
+                OBD_FREE(oe, sizeof(*oe));
+                spin_lock(&ot->ot_lock);
+        }
+        CDEBUG(D_INODE, "%lu now dirty\n", ot->ot_num_marked);
+        spin_unlock(&ot->ot_lock);
+        if (new_oe)
+                OBD_FREE(new_oe, sizeof(*new_oe));
+        RETURN(rc);
+}
+
+int ot_find_marked_extent(struct otree *ot, unsigned long *start,
+                  unsigned long *end)
+{
+        struct offset_extent needle, *oe;
+        int rc = -ENOENT;
+        ENTRY;
+
+        needle.oe_start = *start;
+        needle.oe_end = *end;
+
+        spin_lock(&ot->ot_lock);
+        oe = ot_find_oe(&ot->ot_root, &needle);
+        if (oe) {
+                *start = oe->oe_start;
+                *end = oe->oe_end;
+                rc = 0;
+        }
+        spin_unlock(&ot->ot_lock);
+
+        RETURN(rc);
+}
+
+int ot_last_marked(struct otree *ot, unsigned long *last)
+{
+        struct rb_node_s *found, *node;
+        struct offset_extent *oe;
+        int rc = -ENOENT;
+        ENTRY;
+
+        spin_lock(&ot->ot_lock);
+        for (node = ot->ot_root.rb_node, found = NULL;
+             node;
+             found = node, node = node->rb_right)
+                ;
+
+        if (found) {
+                oe = rb_entry(found, struct offset_extent, oe_node);
+                *last = oe->oe_end;
+                rc = 0;
+        }
+        spin_unlock(&ot->ot_lock);
+        RETURN(rc);
+}
+
+unsigned long ot_num_marked(struct otree *ot)
+{
+        return ot->ot_num_marked;
+}
+
+void ot_init(struct otree *ot)
+{
+        CDEBUG(D_INODE, "initializing %p\n", ot);
+        spin_lock_init(&ot->ot_lock);
+        ot->ot_num_marked = 0;
+        ot->ot_root.rb_node = NULL;
+}
+
+EXPORT_SYMBOL(ot_mark_offset);
+EXPORT_SYMBOL(ot_clear_extent);
+EXPORT_SYMBOL(ot_find_marked_extent);
+EXPORT_SYMBOL(ot_last_marked);
+EXPORT_SYMBOL(ot_num_marked);
+EXPORT_SYMBOL(ot_init);
index 1a5f6fa..786a768 100644 (file)
@@ -66,7 +66,6 @@ void statfs_unpack(struct statfs *sfs, struct obd_statfs *osfs)
 
 int obd_self_statfs(struct obd_device *obd, struct statfs *sfs)
 {
-        struct lustre_handle conn;
         struct obd_export *export, *my_export = NULL;
         struct obd_statfs osfs = { 0 };
         int rc;
@@ -86,9 +85,8 @@ int obd_self_statfs(struct obd_device *obd, struct statfs *sfs)
                 export = class_export_get(export);
                 spin_unlock(&obd->obd_dev_lock);
         }
-        conn.cookie = export->exp_handle.h_cookie;
 
-        rc = obd_statfs(&conn, &osfs);
+        rc = obd_statfs(export, &osfs);
         if (!rc)
                 statfs_unpack(sfs, &osfs);
 
index 603a166..f89df07 100644 (file)
@@ -232,10 +232,10 @@ static int echo_setattr(struct lustre_handle *conn, struct obdo *oa,
 /* This allows us to verify that desc_private is passed unmolested */
 #define DESC_PRIV 0x10293847
 
-int echo_preprw(int cmd, struct obd_export *export, int objcount,
-                struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb,
-                struct niobuf_local *res, void **desc_private,
-                struct obd_trans_info *oti)
+int echo_preprw(int cmd, struct obd_export *export, struct obdo *oa,
+                int objcount, struct obd_ioobj *obj, int niocount,
+                struct niobuf_remote *nb, struct niobuf_local *res,
+                void **desc_private, struct obd_trans_info *oti)
 {
         struct obd_device *obd;
         struct niobuf_local *r = res;
index 1d8233b..79da7ea 100644 (file)
@@ -969,27 +969,21 @@ static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
         struct obd_ioctl_data* data = buf;
         struct echo_client_obd *ec = &obddev->u.echo_client;
         struct obd_device *tgt;
-        struct obd_uuid uuid;
         struct lov_stripe_md *lsm = NULL;
         struct obd_uuid echo_uuid = { "ECHO_UUID" };
         int rc;
         ENTRY;
 
         if (data->ioc_inllen1 < 1) {
-                CERROR("requires a TARGET OBD UUID\n");
-                RETURN(-EINVAL);
-        }
-        if (data->ioc_inllen1 > 37) {
-                CERROR("OBD UUID must be less than 38 characters\n");
+                CERROR("requires a TARGET OBD name\n");
                 RETURN(-EINVAL);
         }
 
-        obd_str2uuid(&uuid, data->ioc_inlbuf1);
-        tgt = class_uuid2obd(&uuid);
+        tgt = class_name2obd(data->ioc_inlbuf1);
         if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
-                CERROR("device not attached or not set up (%d)\n",
-                       data->ioc_dev);
-                RETURN(rc = -EINVAL);
+                CERROR("device not attached or not set up (%d/%s)\n",
+                       data->ioc_dev, data->ioc_inlbuf1);
+                RETURN(-EINVAL);
         }
 
         spin_lock_init (&ec->ec_lock);
index e6c223c..6f2d96c 100644 (file)
@@ -156,44 +156,37 @@ int filter_finish_transno(struct obd_export *export, void *handle,
                 RETURN(rc);
 
         /* we don't allocate new transnos for replayed requests */
-#if 0
-        /* perhaps if transno already set? or should level be in oti? */
-        if (req->rq_level == LUSTRE_CONN_RECOVD)
-                GOTO(out, rc = 0);
-#endif
-
-        off = fed->fed_lr_off;
-
-        spin_lock(&filter->fo_translock);
-        last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd);
-        filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd + 1);
-        spin_unlock(&filter->fo_translock);
-        if (oti)
+        if (oti && oti->oti_transno == 0) {
+                spin_lock(&filter->fo_translock);
+                last_rcvd = le64_to_cpu(filter->fo_fsd->fsd_last_rcvd) + 1;
+                filter->fo_fsd->fsd_last_rcvd = cpu_to_le64(last_rcvd);
+                spin_unlock(&filter->fo_translock);
                 oti->oti_transno = last_rcvd;
-        fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
-        fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
-
-        /* get this from oti */
-#if 0
-        if (oti)
-                fcd->fcd_last_xid = cpu_to_le64(oti->oti_xid);
-        else
-#else
-        fcd->fcd_last_xid = 0;
-#endif
-        fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
-        written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, sizeof(*fcd),
-                                &off);
-        CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
-               LPSZ"\n", last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, written);
+                fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
+                fcd->fcd_mount_count = filter->fo_fsd->fsd_mount_count;
+
+                /* could get xid from oti, if it's ever needed */
+                fcd->fcd_last_xid = 0;
+
+                off = fed->fed_lr_off;
+                fsfilt_set_last_rcvd(obd, last_rcvd, handle, filter_commit_cb);
+                written = lustre_fwrite(filter->fo_rcvd_filp, (char *)fcd, 
+                                        sizeof(*fcd), &off);
+                CDEBUG(D_HA, "wrote trans #"LPD64" for client %s at #%d: "
+                       "written = "LPSZ"\n", last_rcvd, fcd->fcd_uuid, 
+                       fed->fed_lr_idx, written);
+
+                if (written == sizeof(*fcd))
+                        RETURN(0);
+                CERROR("error writing to last_rcvd file: rc = %d\n", 
+                       (int)written);
+                if (written >= 0)
+                        RETURN(-EIO);
 
-        if (written == sizeof(*fcd))
-                RETURN(0);
-        CERROR("error writing to last_rcvd file: rc = %d\n", (int)written);
-        if (written >= 0)
-                RETURN(-EIO);
+                RETURN(written);
+        }                 
 
-        RETURN(written);
+        RETURN(0);
 }
 
 static inline void f_dput(struct dentry *dentry)
@@ -237,7 +230,7 @@ int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
 
         LASSERT(bitmap != NULL);
 
-        /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+        /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
         if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
                 RETURN(0);
 
@@ -319,14 +312,17 @@ int filter_client_free(struct obd_export *exp, int failover)
         struct obd_run_ctxt saved;
         int written;
         loff_t off;
+        ENTRY;
 
         if (!fed->fed_fcd)
                 RETURN(0);
 
-        if (failover != 0) {
-                OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
-                RETURN(0);
-        }
+        if (failover != 0)
+                GOTO(free, 0);
+
+        /* XXX if fcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
+        if (!strcmp(fed->fed_fcd->fcd_uuid, "OBD_CLASS_UUID"))
+                GOTO(free, 0);
 
         LASSERT(filter->fo_last_rcvd_slots != NULL);
 
@@ -362,9 +358,10 @@ int filter_client_free(struct obd_export *exp, int failover)
                        fed->fed_fcd->fcd_uuid, fed->fed_lr_idx,fed->fed_lr_off);
         }
 
+free:
         OBD_FREE(fed->fed_fcd, sizeof(*fed->fed_fcd));
 
-        return 0;
+        RETURN(0);
 }
 
 static int filter_free_server_data(struct filter_obd *filter)
@@ -1126,7 +1123,7 @@ static int filter_close_internal(struct obd_export *exp,
         struct filter_dentry_data *fdd = dchild->d_fsdata;
         struct lustre_handle parent_lockh;
         int rc, rc2, cleanup_phase = 0;
-        struct dentry *dparent;
+        struct dentry *dparent = NULL;
         struct obd_run_ctxt saved;
         ENTRY;
 
@@ -2174,7 +2171,7 @@ static int filter_commit_write(struct niobuf_local *lnb, int err)
         return lustre_commit_write(lnb);
 }
 
-static int filter_preprw(int cmd, struct obd_export *exp,
+static int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
                          int objcount, struct obd_ioobj *obj,
                          int niocount, struct niobuf_remote *nb,
                          struct niobuf_local *res, void **desc_private,
@@ -2561,7 +2558,7 @@ static int filter_brw(int cmd, struct lustre_handle *conn,
         ioo.ioo_type = S_IFREG;
         ioo.ioo_bufcnt = oa_bufs;
 
-        ret = filter_preprw(cmd, export, 1, &ioo, oa_bufs, rnb, lnb,
+        ret = filter_preprw(cmd, export, NULL, 1, &ioo, oa_bufs, rnb, lnb,
                             &desc_private, oti);
         if (ret != 0)
                 GOTO(out, ret);
@@ -2664,13 +2661,11 @@ out:
         RETURN(rc);
 }
 
-static int filter_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int filter_statfs(struct obd_export *exp, struct obd_statfs *osfs)
 {
-        struct obd_device *obd;
+        struct obd_device *obd = exp->exp_obd;
         ENTRY;
 
-        obd = class_conn2obd(conn);
-
         RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs));
 }
 
index 9e46952..4bda8de 100644 (file)
@@ -47,7 +47,9 @@
 
 #include <linux/kp30.h>
 #include <linux/lustre_mds.h> /* for mds_objid */
+#include <linux/lustre_otree.h>
 #include <linux/obd_ost.h>
+#include <linux/obd_lov.h>
 
 #ifndef  __CYGWIN__
 #include <linux/ctype.h>
@@ -127,7 +129,7 @@ static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                 }
         }
 
-        lsm_size = sizeof(**lsmp);
+        lsm_size = lov_stripe_md_size(1);
         if (!lsmp)
                 RETURN(lsm_size);
 
@@ -141,15 +143,20 @@ static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                 OBD_ALLOC(*lsmp, lsm_size);
                 if (!*lsmp)
                         RETURN(-ENOMEM);
+
+                (*lsmp)->lsm_oinfo[0].loi_dirty_ot =
+                        &(*lsmp)->lsm_oinfo[0].loi_dirty_ot_inline;
+                ot_init((*lsmp)->lsm_oinfo[0].loi_dirty_ot);
         }
 
         if (lmm) {
                 /* XXX zero *lsmp? */
                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
-                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
                 LASSERT((*lsmp)->lsm_object_id);
         }
 
+        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
         RETURN(lsm_size);
 }
 
@@ -169,8 +176,7 @@ static int osc_getattr_interpret(struct ptlrpc_request *req,
                 RETURN (rc);
         }
 
-        body = lustre_swab_repbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
         if (body == NULL) {
                 CERROR ("can't unpack ost_body\n");
                 RETURN (-EPROTO);
@@ -520,6 +526,10 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
         oa->o_blksize = OSC_BRW_MAX_SIZE;
         oa->o_valid |= OBD_MD_FLBLKSZ;
 
+        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
+         * have valid lsm_oinfo data structs, so don't go touching that.
+         * This needs to be fixed in a big way.
+         */
         lsm->lsm_object_id = oa->o_id;
         lsm->lsm_stripe_count = 0;
         lsm->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
@@ -627,6 +637,40 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
         return rc;
 }
 
+static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+{
+        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+
+        LASSERT(!(body->oa.o_valid & bits));
+
+        body->oa.o_valid |= bits;
+        down(&cli->cl_dirty_sem);
+        body->oa.o_blocks = cli->cl_dirty;
+        body->oa.o_rdev = cli->cl_dirty_granted;
+        up(&cli->cl_dirty_sem);
+        CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
+               cli->cl_dirty, cli->cl_dirty_granted);
+}
+
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+{
+        if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
+                if (cli->cl_ost_can_grant) {
+                        CDEBUG(D_INODE, "%s can't grant\n",
+                               cli->cl_import->imp_target_uuid.uuid);
+                }
+                cli->cl_ost_can_grant = 0;
+                return;
+        }
+
+        CDEBUG(D_INODE, "got "LPU64" grant\n", body->oa.o_rdev);
+        down(&cli->cl_dirty_sem);
+        cli->cl_dirty_granted = body->oa.o_rdev;
+        /* XXX check for over-run and wake up the io thread that
+         * doesn't exist yet */
+        up(&cli->cl_dirty_sem);
+}
+
 /* We assume that the reason this OSC got a short read is because it read
  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
  * via the LOV, and it _knows_ it's reading inside the file, it's just that
@@ -710,9 +754,10 @@ static inline int can_merge_pages (struct brw_page *p1, struct brw_page *p2)
 }
 
 #if CHECKSUM_BULK
-static __u64 cksum_pages(int nob, obd_count page_count, struct brw_page *pga)
+static obd_count cksum_pages(int nob, obd_count page_count,
+                             struct brw_page *pga)
 {
-        __u64 cksum = 0;
+        obd_count cksum = 0;
         char *ptr;
         int   i;
 
@@ -741,6 +786,7 @@ static int osc_brw_prep_request(struct obd_import *imp,
 {
         struct ptlrpc_request   *req;
         struct ptlrpc_bulk_desc *desc;
+        struct client_obd       *cli = &imp->imp_obd->u.cli;
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
@@ -802,8 +848,7 @@ static int osc_brw_prep_request(struct obd_import *imp,
 
                 requested_nob += pg->count;
 
-                if (i > 0 &&
-                    can_merge_pages (pg_prev, pg)) {
+                if (i > 0 && can_merge_pages (pg_prev, pg)) {
                         niobuf--;
                         niobuf->len += pg->count;
                 } else {
@@ -818,8 +863,9 @@ static int osc_brw_prep_request(struct obd_import *imp,
 #if CHECKSUM_BULK
         body->oa.o_valid |= OBD_MD_FLCKSUM;
         if (opc == OST_BRW_WRITE)
-                body->oa.o_rdev = cksum_pages (requested_nob, page_count, pga);
+                body->oa.o_nlink = cksum_pages (requested_nob, page_count, pga);
 #endif
+        osc_announce_cached(cli, body);
         spin_lock_irqsave (&req->rq_lock, flags);
         req->rq_no_resend = 1;
         spin_unlock_irqrestore (&req->rq_lock, flags);
@@ -849,9 +895,18 @@ static int osc_brw_fini_request (struct ptlrpc_request *req,
                                  obd_count page_count, struct brw_page *pga,
                                  int rc)
 {
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        struct ost_body *body;
         if (rc < 0)
                 return (rc);
 
+        body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
+        if (body == NULL) {
+                CERROR ("Can't unpack body\n");
+                RETURN(-EPROTO);
+        }
+        osc_update_grant(cli, body);
+
         if (req->rq_reqmsg->opc == OST_WRITE) {
                 if (rc > 0) {
                         CERROR ("Unexpected +ve rc %d\n", rc);
@@ -868,18 +923,13 @@ static int osc_brw_fini_request (struct ptlrpc_request *req,
         }
 
         if (rc < requested_nob)
-                handle_short_read (rc, page_count, pga);
+                handle_short_read(rc, page_count, pga);
 
 #if CHECKSUM_BULK
-        imp = req->rq_import;
-        body = lustre_swab_repmsg (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
-        if (body == NULL) {
-                CERROR ("Can't unpack body\n");
-        } else if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                 static int cksum_counter;
-                __u64 server_cksum = body->oa.o_rdev;
-                __u64 cksum = cksum_pages (rc, page_count, pga);
+                obd_count server_cksum = body->oa.o_nlink;
+                obd_count cksum = cksum_pages(rc, page_count, pga);
 
                 cksum_counter++;
                 if (server_cksum != cksum) {
@@ -888,7 +938,7 @@ static int osc_brw_fini_request (struct ptlrpc_request *req,
                                imp->imp_connection->c_peer.peer_nid);
                         cksum_counter = 0;
                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
-                        CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
+                        CERROR("Checksum %u from "LPX64" OK: %x\n",
                                cksum_counter,
                                imp->imp_connection->c_peer.peer_nid, cksum);
         } else {
@@ -1395,6 +1445,80 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn,
 #endif
 #endif
 
+static int osc_mark_page_dirty(struct lustre_handle *conn, 
+                               struct lov_stripe_md *lsm, unsigned long offset)
+{
+        struct client_obd *cli = &class_conn2obd(conn)->u.cli;
+        struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+        int rc;
+        ENTRY;
+
+        down(&cli->cl_dirty_sem);
+
+        if (cli->cl_ost_can_grant && 
+            (cli->cl_dirty + PAGE_CACHE_SIZE >= cli->cl_dirty_granted)) {
+                CDEBUG(D_INODE, "granted "LPU64" < "LPU64"\n",
+                       cli->cl_dirty_granted, cli->cl_dirty + PAGE_CACHE_SIZE);
+                GOTO(out, rc = -EDQUOT);
+        }
+
+        rc = ot_mark_offset(dirty_ot, offset);
+        if (rc)
+                GOTO(out, rc);
+
+        cli->cl_dirty += PAGE_CACHE_SIZE;
+        CDEBUG(D_INODE, "dirtied off %lu, now "LPU64" bytes dirty\n",
+                        offset, cli->cl_dirty);
+out:
+        up(&cli->cl_dirty_sem);
+        RETURN(rc);
+}
+
+static int osc_clear_dirty_pages(struct lustre_handle *conn, 
+                                 struct lov_stripe_md *lsm,
+                                 unsigned long start, unsigned long end,
+                                 unsigned long *cleared)
+{
+        struct client_obd *cli = &class_conn2obd(conn)->u.cli;
+        struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+        unsigned long old_marked, new_marked;
+        int rc;
+        ENTRY;
+
+        down(&cli->cl_dirty_sem);
+
+        old_marked = ot_num_marked(dirty_ot);
+
+        rc = ot_clear_extent(dirty_ot, start, end);
+        if (rc)
+                GOTO(out, rc);
+
+        new_marked = ot_num_marked(dirty_ot);
+
+        LASSERT(new_marked <= old_marked);
+        LASSERT(old_marked * PAGE_CACHE_SIZE <= cli->cl_dirty);
+        *cleared = old_marked - new_marked;
+        cli->cl_dirty -= (__u64)*cleared << PAGE_CACHE_SHIFT;
+        CDEBUG(D_INODE, "cleared [%lu,%lu], now "LPU64" bytes dirty\n",
+                        start, end, cli->cl_dirty);
+
+out:
+        up(&cli->cl_dirty_sem);
+        RETURN(rc);
+}
+
+static int osc_last_dirty_offset(struct lustre_handle *conn,
+                                 struct lov_stripe_md *lsm,
+                                 unsigned long *offset)
+{
+        struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+        int rc;
+        ENTRY;
+
+        rc = ot_last_marked(dirty_ot, offset);
+        RETURN(rc);
+}
+
 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                        struct lustre_handle *parent_lock,
                        __u32 type, void *extentp, int extent_len, __u32 mode,
@@ -1511,15 +1635,15 @@ static int osc_cancel_unused(struct lustre_handle *connh,
                                       opaque);
 }
 
-static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int osc_statfs(struct obd_export *exp, struct obd_statfs *osfs)
 {
         struct obd_statfs *msfs;
         struct ptlrpc_request *request;
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
-                                  NULL);
+        request = ptlrpc_prep_req(exp->exp_obd->u.cli.cl_import, OST_STATFS, 0, 
+                                  NULL, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -1711,7 +1835,10 @@ struct obd_ops osc_obd_ops = {
         o_cancel:       osc_cancel,
         o_cancel_unused: osc_cancel_unused,
         o_iocontrol:    osc_iocontrol,
-        o_get_info:     osc_get_info
+        o_get_info:     osc_get_info,
+        .o_mark_page_dirty =    osc_mark_page_dirty,
+        .o_clear_dirty_pages =  osc_clear_dirty_pages,
+        .o_last_dirty_offset =  osc_last_dirty_offset,
 };
 
 struct obd_ops sanosc_obd_ops = {
@@ -1741,6 +1868,9 @@ struct obd_ops sanosc_obd_ops = {
         o_cancel:       osc_cancel,
         o_cancel_unused: osc_cancel_unused,
         o_iocontrol:    osc_iocontrol,
+        .o_mark_page_dirty =    osc_mark_page_dirty,
+        .o_clear_dirty_pages =  osc_clear_dirty_pages,
+        .o_last_dirty_offset =  osc_last_dirty_offset,
 };
 
 int __init osc_init(void)
index 7569a7a..023deb2 100644 (file)
 #include <linux/init.h>
 #include <linux/lprocfs_status.h>
 
+inline void oti_init(struct obd_trans_info *oti,
+                           struct ptlrpc_request *req)
+{
+        if(oti == NULL)
+                return;
+        memset(oti, 0, sizeof *oti);
+
+        
+        if (req->rq_repmsg && req->rq_reqmsg != 0)
+                oti->oti_transno = req->rq_repmsg->transno;
+
+        EXIT;
+}
+
 inline void oti_to_request(struct obd_trans_info *oti,
                            struct ptlrpc_request *req)
 {
@@ -108,7 +122,6 @@ static int ost_getattr(struct ptlrpc_request *req)
 
 static int ost_statfs(struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct obd_statfs *osfs;
         int rc, size = sizeof(*osfs);
         ENTRY;
@@ -120,7 +133,7 @@ static int ost_statfs(struct ptlrpc_request *req)
         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
         memset(osfs, 0, size);
 
-        req->rq_status = obd_statfs(conn, osfs);
+        req->rq_status = obd_statfs(req->rq_export, osfs);
         if (req->rq_status != 0)
                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
 
@@ -453,7 +466,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
         if (desc == NULL)
                 GOTO(out_local, rc = -ENOMEM);
 
-        rc = obd_preprw(OBD_BRW_READ, req->rq_export, 1, ioo, npages,
+        rc = obd_preprw(OBD_BRW_READ, req->rq_export, NULL, 1, ioo, npages,
                         pp_rnb, local_nb, &desc_priv, NULL);
         if (rc != 0)
                 GOTO(out_bulk, rc);
@@ -627,7 +640,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc == NULL)
                 GOTO(out_local, rc = -ENOMEM);
 
-        rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, objcount, ioo,
+        rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, NULL, objcount, ioo,
                         npages, pp_rnb, local_nb, &desc_priv, oti);
         if (rc != 0)
                 GOTO (out_bulk, rc);
@@ -857,7 +870,8 @@ static int filter_recovery_request(struct ptlrpc_request *req,
 
 static int ost_handle(struct ptlrpc_request *req)
 {
-        struct obd_trans_info trans_info = { 0, }, *oti = &trans_info;
+        struct obd_trans_info trans_info = { 0, };
+        struct obd_trans_info *oti = &trans_info;
         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
         ENTRY;
 
@@ -892,6 +906,8 @@ static int ost_handle(struct ptlrpc_request *req)
         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
                 GOTO(out, rc = -EINVAL);
 
+        oti_init(oti, req);
+
         switch (req->rq_reqmsg->opc) {
         case OST_CONNECT:
                 CDEBUG(D_INODE, "connect\n");
index 5e3fcb5..14cc325 100644 (file)
@@ -429,6 +429,7 @@ static int kportal_ioctl(struct inode *inode, struct file *file,
                         return (-EINVAL);
 
                 err = PtlFailNid (*nip, data->ioc_nid, data->ioc_count);
+                kportal_put_ni (data->ioc_nal);
                 break;
         }
 
index f36a3c7..af76523 100644 (file)
@@ -169,7 +169,7 @@ int ptlbd_cl_disconnect(struct lustre_handle *conn, int failover)
                 GOTO(out_req, rc = -ENOMEM);
 
         request->rq_replen = lustre_msg_size(0, NULL);
-        request->rq_level = LUSTRE_CONN_RECOVD;
+        request->rq_level = LUSTRE_CONN_RECOVER;
 
         rc = ptlrpc_queue_wait(request);
 
index c79329c..a98af3e 100644 (file)
@@ -109,7 +109,6 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
                                                int type, int portal)
 {
         struct obd_import       *imp = req->rq_import;
-        unsigned long            flags;
         struct ptlrpc_bulk_desc *desc;
 
         LASSERT (type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
@@ -118,13 +117,7 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
         if (desc == NULL)
                 RETURN(NULL);
 
-        /* Is this sampled at the right place?  Do we want to get the import
-         * generation just before we send?  Should it match the generation of
-         * the request? */
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        desc->bd_import_generation = imp->imp_generation;
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
-
+        desc->bd_import_generation = req->rq_import_generation;
         desc->bd_import = class_import_get(imp);
         desc->bd_req = req;
         desc->bd_type = type;
@@ -449,13 +442,7 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
                         RETURN(-ENOTCONN);
                 }
 
-                rc = ptlrpc_request_handle_eviction(req);
-                if (rc)
-                        CERROR("can't reconnect to %s@%s: %d\n",
-                               imp->imp_target_uuid.uuid,
-                               imp->imp_connection->c_remote_uuid.uuid, rc);
-                else
-                        ptlrpc_wake_delayed(imp);
+                ptlrpc_request_handle_eviction(req);
 
                 if (req->rq_err)
                         RETURN(-EIO);
@@ -486,15 +473,6 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
 
                 /* Replay-enabled imports return commit-status information. */
                 if (req->rq_repmsg->last_committed) {
-                        if (req->rq_repmsg->last_committed <
-                            imp->imp_peer_committed_transno) {
-                                CERROR("%s went back in time (transno "LPD64
-                                       " was committed, server claims "LPD64
-                                       ")! is shared storage not coherent?\n",
-                                       imp->imp_target_uuid.uuid,
-                                       imp->imp_peer_committed_transno,
-                                       req->rq_repmsg->last_committed);
-                        }
                         imp->imp_peer_committed_transno =
                                 req->rq_repmsg->last_committed;
                 }
@@ -505,7 +483,7 @@ static int after_reply(struct ptlrpc_request *req, int *restartp)
         RETURN(rc);
 }
 
-static int check_set(struct ptlrpc_request_set *set)
+int ptlrpc_check_set(struct ptlrpc_request_set *set)
 {
         unsigned long flags;
         struct list_head *tmp;
@@ -574,6 +552,15 @@ static int check_set(struct ptlrpc_request_set *set)
                                 list_del(&req->rq_list);
                                 list_add_tail(&req->rq_list,
                                               &imp->imp_sending_list);
+
+                                if (req->rq_import_generation < 
+                                    imp->imp_generation) {
+                                        req->rq_status = -EIO;
+                                        req->rq_phase = RQ_PHASE_INTERPRET;
+                                        spin_unlock_irqrestore(&imp->imp_lock, 
+                                                               flags);
+                                        GOTO (interpret, req->rq_status);
+                                }
                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                                 req->rq_waiting = 0;
@@ -641,6 +628,7 @@ static int check_set(struct ptlrpc_request_set *set)
                 LASSERT (req->rq_phase == RQ_PHASE_INTERPRET);
                 LASSERT (!req->rq_receiving_reply);
 
+                ptlrpc_unregister_reply(req);
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
 
@@ -666,7 +654,7 @@ static int check_set(struct ptlrpc_request_set *set)
         RETURN (set->set_remaining == 0);
 }
 
-static int expire_one_request(struct ptlrpc_request *req)
+int ptlrpc_expire_one_request(struct ptlrpc_request *req)
 {
         unsigned long      flags;
         struct obd_import *imp = req->rq_import;
@@ -724,7 +712,7 @@ static int expired_set(void *data)
                         continue;
 
                 /* deal with this guy */
-                expire_one_request (req);
+                ptlrpc_expire_one_request (req);
         }
 
         /* When waiting for a whole set, we always to break out of the
@@ -787,6 +775,8 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                         continue;
                 }
 
+                req->rq_import_generation = imp->imp_generation;
+
                 if (req->rq_level > imp->imp_level) {
                         if (req->rq_no_recov || imp->imp_obd->obd_no_recov ||
                             imp->imp_dlm_fake) {
@@ -809,7 +799,6 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                 /* XXX this is the same as ptlrpc_queue_wait */
                 LASSERT(list_empty(&req->rq_list));
                 list_add_tail(&req->rq_list, &imp->imp_sending_list);
-                req->rq_import_generation = imp->imp_generation;
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                 CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
@@ -853,9 +842,9 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
                  * req times out */
                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
                        set, timeout);
-                lwi = LWI_TIMEOUT_INTR(timeout ? timeout * HZ : 1,
+                lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
                                        expired_set, interrupted_set, set);
-                rc = l_wait_event(set->set_waitq, check_set(set), &lwi);
+                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
 
                 LASSERT (rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
 
@@ -951,7 +940,8 @@ static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
         if (request == NULL)
                 RETURN(1);
 
-        if (request == (void *)(long)(0x5a5a5a5a5a5a5a5a)) {
+        if (request == (void *)(long)(0x5a5a5a5a5a5a5a5a) || 
+            request->rq_obd == (void *)(long)(0x5a5a5a5a5a5a5a5a)) {
                 CERROR("dereferencing freed request (bug 575)\n");
                 LBUG();
                 RETURN(1);
@@ -1074,6 +1064,11 @@ void ptlrpc_free_committed(struct obd_import *imp)
                 LASSERT (req != last_req);
                 last_req = req;
 
+                if (req->rq_import_generation < imp->imp_generation) {
+                        DEBUG_REQ(D_HA, req, "freeing request with old gen");
+                        GOTO(free_req, 0);
+                }
+
                 if (req->rq_replay) {
                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                         continue;
@@ -1087,6 +1082,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
 
                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
                           imp->imp_peer_committed_transno);
+free_req:
                 list_del_init(&req->rq_list);
                 __ptlrpc_req_finished(req, 1);
         }
@@ -1146,7 +1142,7 @@ static int expired_request(void *data)
         struct ptlrpc_request *req = data;
         ENTRY;
 
-        RETURN(expire_one_request(req));
+        RETURN(ptlrpc_expire_one_request(req));
 }
 
 static void interrupted_request(void *data)
@@ -1234,6 +1230,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         /* Mark phase here for a little debug help */
         req->rq_phase = RQ_PHASE_RPC;
 
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        req->rq_import_generation = imp->imp_generation;
 restart:
         /*
          * If the import has been invalidated (such as by an OST failure), the
@@ -1241,13 +1239,18 @@ restart:
          * through, though, so that they have a chance to revalidate the
          * import.
          */
-        spin_lock_irqsave(&imp->imp_lock, flags);
         if (req->rq_import->imp_invalid && req->rq_level == LUSTRE_CONN_FULL) {
                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID:");
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
                 GOTO (out, rc = -EIO);
         }
 
+        if (req->rq_import_generation < imp->imp_generation) {
+                DEBUG_REQ(D_ERROR, req, "req old gen:");
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                GOTO (out, rc = -EIO);
+        }
+
         if (req->rq_level > imp->imp_level) {
                 list_del(&req->rq_list);
                 if (req->rq_no_recov || obd->obd_no_recov ||
@@ -1272,9 +1275,11 @@ restart:
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 list_del_init(&req->rq_list);
 
-                if (req->rq_err)
+                if (req->rq_err || 
+                    req->rq_import_generation < imp->imp_generation)
                         rc = -EIO;
 
+
                 if (rc) {
                         spin_unlock_irqrestore(&imp->imp_lock, flags);
                         GOTO (out, rc);
@@ -1286,7 +1291,6 @@ restart:
         /* XXX this is the same as ptlrpc_set_wait */
         LASSERT(list_empty(&req->rq_list));
         list_add_tail(&req->rq_list, &imp->imp_sending_list);
-        req->rq_import_generation = imp->imp_generation;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         rc = ptl_send_rpc(req);
@@ -1376,6 +1380,7 @@ restart:
                         ptlrpc_unregister_bulk (req);
 
                 DEBUG_REQ(D_HA, req, "resending: ");
+                spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
 
@@ -1404,6 +1409,7 @@ restart:
                 if (req->rq_bulk != NULL)
                         ptlrpc_unregister_bulk (req);
                 DEBUG_REQ(D_HA, req, "resending: ");
+                spin_lock_irqsave(&imp->imp_lock, flags);
                 goto restart;
         }
 
@@ -1457,7 +1463,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         old_level = req->rq_level;
         if (req->rq_replied)
                 old_status = req->rq_repmsg->status;
-        req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_level = LUSTRE_CONN_RECOVER;
         rc = ptl_send_rpc(req);
         if (rc) {
                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
@@ -1535,13 +1541,6 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
          * this flag and then putting requests on sending_list or delayed_list.
          */
         spin_lock_irqsave(&imp->imp_lock, flags);
-        if (!imp->imp_replayable)
-                /* on b_devel, I moved this line to
-                   ptlrpc_set_import_active because I thought it made
-                   more sense there and possibly not all callers of
-                   this function expect this. I'll leave it here until
-                   I can figure out if it's correct or not. - rread 5/12/03  */
-                imp->imp_invalid = 1;
 
         /* XXX locking?  Maybe we should remove each request with the list
          * locked?  Also, how do we know if the requests on the list are
@@ -1554,11 +1553,13 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 DEBUG_REQ(D_HA, req, "inflight");
 
                 spin_lock (&req->rq_lock);
-                req->rq_err = 1;
-                if (req->rq_set != NULL)
-                        wake_up(&req->rq_set->set_waitq);
-                else
-                        wake_up(&req->rq_wait_for_rep);
+                if (req->rq_import_generation < imp->imp_generation) {
+                        req->rq_err = 1;
+                        if (req->rq_set != NULL)
+                                wake_up(&req->rq_set->set_waitq);
+                        else
+                                wake_up(&req->rq_wait_for_rep);
+                }
                 spin_unlock (&req->rq_lock);
         }
 
@@ -1569,12 +1570,14 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 DEBUG_REQ(D_HA, req, "aborting waiting req");
 
                 spin_lock (&req->rq_lock);
-                req->rq_err = 1;
-                if (req->rq_set != NULL)
-                        wake_up(&req->rq_set->set_waitq);
-                else
-                        wake_up(&req->rq_wait_for_rep);
-                spin_unlock (&req->rq_lock);
+                if (req->rq_import_generation < imp->imp_generation) {
+                        req->rq_err = 1;
+                        if (req->rq_set != NULL)
+                                wake_up(&req->rq_set->set_waitq);
+                        else
+                                wake_up(&req->rq_wait_for_rep);
+                        spin_unlock (&req->rq_lock);
+                }
         }
 
         /* Last chance to free reqs left on the replay list, but we
index 017fb8b..c0ccb4d 100644 (file)
@@ -29,6 +29,7 @@
 #include <linux/lustre_net.h>
 #include <linux/lustre_lib.h>
 #include <linux/obd.h>
+#include "ptlrpc_internal.h"
 
 static int ptl_send_buf(struct ptlrpc_request *request,
                         struct ptlrpc_connection *conn, int portal)
@@ -672,6 +673,7 @@ int ptl_send_rpc(struct ptlrpc_request *request)
         spin_unlock_irqrestore (&request->rq_lock, flags);
 
         request->rq_sent = LTIME_S(CURRENT_TIME);
+        ptlrpc_pinger_sending_on_import(request->rq_import);
         rc = ptl_send_buf(request, request->rq_connection,
                           request->rq_request_portal);
         if (rc == 0)
index 51a0cad..ebc69e1 100644 (file)
@@ -4,7 +4,8 @@
  * Portal-RPC reconnection and replay operations, for use in recovery.
  *
  *  Copyright (c) 2003 Cluster File Systems, Inc.
- *   Author: Phil Schwan <phil@clusterfs.com>
+ *   Authors: Phil Schwan <phil@clusterfs.com>
+ *            Mike Shaver <shaver@clusterfs.com>
  *
  *   This file is part of Lustre, http://www.lustre.org.
  *
@@ -23,6 +24,7 @@
  */
 
 #include <linux/version.h>
+#include <asm/semaphore.h>
 
 #define DEBUG_SUBSYSTEM S_RPC
 #include <linux/obd_support.h>
 #include "ptlrpc_internal.h"
 
 static struct ptlrpc_thread *pinger_thread = NULL;
-static spinlock_t pinger_lock = SPIN_LOCK_UNLOCKED;
+static DECLARE_MUTEX(pinger_sem);
 static struct list_head pinger_imports = LIST_HEAD_INIT(pinger_imports);
 
+int ptlrpc_start_pinger(void);
+int ptlrpc_stop_pinger(void);
+
+void ptlrpc_pinger_sending_on_import(struct obd_import *imp)
+{
+        down(&pinger_sem);
+        imp->imp_next_ping = jiffies + (obd_timeout * HZ);
+        up(&pinger_sem);
+}
+
 int ptlrpc_pinger_add_import(struct obd_import *imp)
 {
+        int rc;
         ENTRY;
+
+#ifndef ENABLE_PINGER
+        RETURN(0);
+#else
         if (!list_empty(&imp->imp_pinger_chain))
                 RETURN(-EALREADY);
 
-        spin_lock(&pinger_lock);
-        list_add(&imp->imp_pinger_chain, &pinger_imports);
-        spin_unlock(&pinger_lock);
+        down(&pinger_sem);
+        if (list_empty(&pinger_imports)) {
+                up(&pinger_sem);
+                rc = ptlrpc_start_pinger();
+                if (rc < 0)
+                        RETURN(rc);
+                down(&pinger_sem);
+        }
+                
+        CDEBUG(D_HA, "adding pingable import %s->%s\n",
+               imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
+        imp->imp_next_ping = jiffies + (obd_timeout * HZ);
+        list_add_tail(&imp->imp_pinger_chain, &pinger_imports); /* XXX sort, blah blah */
+        class_import_get(imp);
+        up(&pinger_sem);
         RETURN(0);
+#endif
 }
 
 int ptlrpc_pinger_del_import(struct obd_import *imp)
 {
+        int rc;
         ENTRY;
+
+#ifndef ENABLE_PINGER
+        RETURN(0);
+#else
         if (list_empty(&imp->imp_pinger_chain))
-                RETURN(-EALREADY);
+                RETURN(-ENOENT);
 
-        spin_lock(&pinger_lock);
+        down(&pinger_sem);
         list_del_init(&imp->imp_pinger_chain);
-        spin_unlock(&pinger_lock);
+        CDEBUG(D_HA, "removing pingable import %s->%s\n",
+               imp->imp_obd->obd_uuid.uuid, imp->imp_target_uuid.uuid);
+        class_import_put(imp);
+        if (list_empty(&pinger_imports)) {
+                up(&pinger_sem);
+                rc = ptlrpc_stop_pinger();
+                if (rc)
+                        RETURN(rc);
+                down(&pinger_sem);
+        }
+        up(&pinger_sem);
         RETURN(0);
-}
-
-static void ptlrpc_pinger_do_stuff(void)
-{
-
-
-
+#endif
 }
 
 static int ptlrpc_pinger_main(void *arg)
@@ -69,7 +108,6 @@ static int ptlrpc_pinger_main(void *arg)
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct ptlrpc_thread *thread = data->thread;
         unsigned long flags;
-        int rc = 0;
         ENTRY;
 
         lock_kernel();
@@ -94,36 +132,140 @@ static int ptlrpc_pinger_main(void *arg)
         thread->t_flags = SVC_RUNNING;
         wake_up(&thread->t_ctl_waitq);
 
-        /* And now, loop forever on requests */
+        /* And now, loop forever, pinging as needed. */
         while (1) {
-                struct l_wait_info lwi = LWI_TIMEOUT(5 * HZ, NULL, NULL);
-                l_wait_event(thread->t_ctl_waitq,
-                             thread->t_flags & SVC_STOPPING, &lwi);
+                unsigned long this_ping = jiffies;
+                long time_to_next_ping;
+                struct l_wait_info lwi = LWI_TIMEOUT(10 * HZ, NULL, NULL);
+                struct ptlrpc_request_set *set;
+                struct ptlrpc_request *req;
+                struct list_head *iter;
+                wait_queue_t set_wait;
+                int rc;
+
+                set = ptlrpc_prep_set();
+                down(&pinger_sem);
+                list_for_each(iter, &pinger_imports) {
+                        struct obd_import *imp =
+                                list_entry(iter, struct obd_import, imp_pinger_chain);
+                        int generation, level;
+                        unsigned long flags;
+
+                        if (imp->imp_next_ping <= this_ping) {
+                                /* Add a ping. */
+                                spin_lock_irqsave(&imp->imp_lock, flags);
+                                generation = imp->imp_generation;
+                                level = imp->imp_level;
+                                spin_unlock_irqrestore(&imp->imp_lock, flags);
+
+                                if (level != LUSTRE_CONN_FULL) {
+                                        CDEBUG(D_HA, "not pinging %s (in recovery)\n",
+                                               imp->imp_target_uuid.uuid);
+                                        continue;
+                                }
+
+                                req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
+                                if (!req) {
+                                        CERROR("OOM trying to ping\n");
+                                        break;
+                                }
+                                req->rq_replen = lustre_msg_size(0, NULL);
+                                req->rq_level = LUSTRE_CONN_FULL;
+                                req->rq_phase = RQ_PHASE_RPC;
+                                req->rq_import_generation = generation;
+                                ptlrpc_set_add_req(set, req);
+                        } else {
+                                CDEBUG(D_HA, "don't need to ping %s (%lu > %lu)\n",
+                                       imp->imp_target_uuid.uuid, imp->imp_next_ping,
+                                       this_ping);
+                        }
+                }
+                up(&pinger_sem);
+
+                /* Might be empty, that's OK. */
+                if (set->set_remaining == 0)
+                        CDEBUG(D_HA, "nothing to ping\n");
+                list_for_each(iter, &set->set_requests) {
+                        struct ptlrpc_request *req =
+                                list_entry(iter, struct ptlrpc_request, rq_set_chain);
+                        DEBUG_REQ(D_HA, req, "pinging %s->%s",
+                                  req->rq_import->imp_obd->obd_uuid.uuid,
+                                  req->rq_import->imp_target_uuid.uuid);
+                        (void)ptl_send_rpc(req);
+                }
+
+                /* Have to wait on both the thread's queue and the set's. */
+                init_waitqueue_entry(&set_wait, current);
+                add_wait_queue(&set->set_waitq, &set_wait);
+                rc = l_wait_event(thread->t_ctl_waitq,
+                                  thread->t_flags & SVC_STOPPING || ptlrpc_check_set(set),
+                                  &lwi);
+                remove_wait_queue(&set->set_waitq, &set_wait);
+                CDEBUG(D_HA, "ping complete (%lu)\n", jiffies);
 
                 if (thread->t_flags & SVC_STOPPING) {
                         thread->t_flags &= ~SVC_STOPPING;
+                        list_for_each(iter, &set->set_requests) {
+                                req = list_entry(iter, struct ptlrpc_request,
+                                                 rq_set_chain);
+                                if (!req->rq_replied)
+                                        ptlrpc_unregister_reply(req);
+                        }
+                        ptlrpc_set_destroy(set);
                         EXIT;
                         break;
                 }
-                ptlrpc_pinger_do_stuff();
+
+                /* Expire all the requests that didn't come back. */
+                down(&pinger_sem);
+                list_for_each(iter, &set->set_requests) {
+                        req = list_entry(iter, struct ptlrpc_request, rq_set_chain);
+
+                        if (req->rq_replied)
+                                continue;
+
+                        req->rq_phase = RQ_PHASE_COMPLETE;
+                        set->set_remaining--;
+                        /* If it was disconnected, don't sweat it. */
+                        if (list_empty(&req->rq_import->imp_pinger_chain))
+                                continue;
+
+                        ptlrpc_expire_one_request(req);
+                }
+                up(&pinger_sem);
+                ptlrpc_set_destroy(set);
+
+                /* Wait until the next ping time, or until we're stopped. */
+                time_to_next_ping = this_ping + (obd_timeout * HZ) - jiffies;
+                CDEBUG(D_HA, "next ping in %lu (%lu)\n", time_to_next_ping,
+                       this_ping + (obd_timeout * HZ));
+                if (time_to_next_ping > 0) {
+                        lwi = LWI_TIMEOUT(time_to_next_ping, NULL, NULL);
+                        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPING,
+                                     &lwi);
+                        if (thread->t_flags & SVC_STOPPING) {
+                                thread->t_flags &= ~SVC_STOPPING;
+                                EXIT;
+                                break;
+                        }
+                }
         }
 
         thread->t_flags = SVC_STOPPED;
         wake_up(&thread->t_ctl_waitq);
 
-        CDEBUG(D_NET, "pinger thread exiting, process %d: rc = %d\n",
-               current->pid, rc);
-        return rc;
+        CDEBUG(D_NET, "pinger thread exiting, process %d\n", current->pid);
+        return 0;
 }
 
-int ptlrpc_pinger_start(void)
+int ptlrpc_start_pinger(void)
 {
         struct l_wait_info lwi = { 0 };
         struct ptlrpc_svc_data d;
         int rc;
         ENTRY;
 
-        spin_lock(&pinger_lock);
+        down(&pinger_sem);
         if (pinger_thread != NULL)
                 GOTO(out, rc = -EALREADY);
 
@@ -132,7 +274,7 @@ int ptlrpc_pinger_start(void)
                 GOTO(out, rc = -ENOMEM);
         init_waitqueue_head(&pinger_thread->t_ctl_waitq);
 
-        d.name = "Lustre pinger";
+        d.name = "ll_ping";
         d.thread = pinger_thread;
 
         /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
@@ -147,7 +289,7 @@ int ptlrpc_pinger_start(void)
                      pinger_thread->t_flags & SVC_RUNNING, &lwi);
 
  out:
-        spin_unlock(&pinger_lock);
+        up(&pinger_sem);
         RETURN(rc);
 }
 
@@ -157,7 +299,7 @@ int ptlrpc_stop_pinger(void)
         int rc = 0;
         ENTRY;
 
-        spin_lock(&pinger_lock);
+        down(&pinger_sem);
         if (pinger_thread == NULL)
                 GOTO(out, rc = -EALREADY);
 
@@ -169,6 +311,6 @@ int ptlrpc_stop_pinger(void)
         OBD_FREE(pinger_thread, sizeof(*pinger_thread));
 
  out:
-        spin_unlock(&pinger_lock);
+        up(&pinger_sem);
         RETURN(rc);
 }
index 575ed07..cb96c3c 100644 (file)
@@ -28,6 +28,7 @@
 struct ldlm_namespace;
 struct obd_import;
 struct ldlm_res_id;
+struct ptlrpc_request_set;
 
 /* ldlm hooks that we need, managed via inter_module_{get,put} */
 extern int (*ptlrpc_ldlm_namespace_cleanup)(struct ldlm_namespace *, int);
@@ -38,7 +39,7 @@ extern int (*ptlrpc_ldlm_replay_locks)(struct obd_import *);
 int ptlrpc_get_ldlm_hooks(void);
 void ptlrpc_daemonize(void);
 
-int ptlrpc_request_handle_eviction(struct ptlrpc_request *);
+void ptlrpc_request_handle_eviction(struct ptlrpc_request *);
 void lustre_assert_wire_constants (void);
 
 void ptlrpc_lprocfs_register_service(struct obd_device *obddev,
@@ -90,4 +91,8 @@ enum {
         PTLRPC_LAST_CNTR
 };
 
+int ptlrpc_expire_one_request(struct ptlrpc_request *req);
+int ptlrpc_check_set(struct ptlrpc_request_set *set);
+
+void ptlrpc_pinger_sending_on_import(struct obd_import *imp);
 #endif /* PTLRPC_INTERNAL_H */
index 71142fa..ccc05dc 100644 (file)
@@ -80,6 +80,11 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         memcpy(server_uuid.uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
                                                         sizeof(server_uuid)));
 
+        init_MUTEX(&cli->cl_dirty_sem);
+        cli->cl_dirty = 0;
+        cli->cl_dirty_granted = 0;
+        cli->cl_ost_can_grant = 1;
+
         conn = ptlrpc_uuid_to_connection(&server_uuid);
         if (conn == NULL)
                 RETURN(-ENOENT);
@@ -97,6 +102,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         imp->imp_obd = obddev;
         imp->imp_connect_op = connect_op;
         imp->imp_generation = 0;
+        INIT_LIST_HEAD(&imp->imp_pinger_chain);
         memcpy(imp->imp_target_uuid.uuid, data->ioc_inlbuf1, data->ioc_inllen1);
         class_import_put(imp);
 
index 01ba349..57f3653 100644 (file)
@@ -227,6 +227,11 @@ EXPORT_SYMBOL(ptlrpc_recover_import);
 EXPORT_SYMBOL(client_obd_setup);
 EXPORT_SYMBOL(client_obd_cleanup);
 
+/* pinger.c */
+EXPORT_SYMBOL(ptlrpc_pinger_add_import);
+EXPORT_SYMBOL(ptlrpc_pinger_del_import);
+EXPORT_SYMBOL(ptlrpc_pinger_sending_on_import);
+
 #ifdef __KERNEL__
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Request Processor");
index a90df0e..ca2afad 100644 (file)
 
 #include "ptlrpc_internal.h"
 
-int ptlrpc_reconnect_import(struct obd_import *imp,
-                            struct ptlrpc_request **reqptr)
+enum reconnect_result {
+        RECON_RESULT_RECOVERING  =  1,
+        RECON_RESULT_RECONNECTED  = 2,
+        RECON_RESULT_EVICTED      = 3,
+};
+
+int ptlrpc_reconnect_import(struct obd_import *imp)
 {
         struct obd_device *obd = imp->imp_obd;
-        int flags, rc, size[] = {sizeof(imp->imp_target_uuid),
+        int rc, size[] = {sizeof(imp->imp_target_uuid),
                                  sizeof(obd->obd_uuid),
                                  sizeof(imp->imp_dlm_handle)};
         char *tmp[] = {imp->imp_target_uuid.uuid,
@@ -55,10 +60,7 @@ int ptlrpc_reconnect_import(struct obd_import *imp,
         struct ptlrpc_connection *conn = imp->imp_connection;
         struct ptlrpc_request *req;
         struct lustre_handle old_hdl;
-
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        imp->imp_generation++;
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
+        __u64 committed_before_reconnect = imp->imp_peer_committed_transno;
 
         CERROR("reconnect handle "LPX64"\n", 
                imp->imp_dlm_handle.cookie);
@@ -70,6 +72,7 @@ int ptlrpc_reconnect_import(struct obd_import *imp,
         req->rq_replen = lustre_msg_size(0, NULL);
         rc = ptlrpc_queue_wait(req);
         if (rc) {
+                /* what if rc > 0 ??*/
                 CERROR("cannot connect to %s@%s: rc = %d\n",
                        imp->imp_target_uuid.uuid, conn->c_remote_uuid.uuid, rc);
                 GOTO(out_disc, rc);
@@ -95,23 +98,36 @@ int ptlrpc_reconnect_import(struct obd_import *imp,
                                imp->imp_remote_handle.cookie,
                                req->rq_repmsg->handle.cookie);
                         imp->imp_remote_handle = req->rq_repmsg->handle;
-                        GOTO(out_disc, rc = 0);
+                        GOTO(out_disc, rc = RECON_RESULT_RECONNECTED);
                 }
 
                 CERROR("reconnected to %s@%s after partition\n",
                        imp->imp_target_uuid.uuid, conn->c_remote_uuid.uuid);
-                GOTO(out_disc, rc = 0);
+                GOTO(out_disc, rc = RECON_RESULT_RECONNECTED);
+        } else if (lustre_msg_get_op_flags(req->rq_repmsg) & MSG_CONNECT_RECOVERING) {
+                rc = RECON_RESULT_RECOVERING;
+        } else {
+                rc = RECON_RESULT_EVICTED;
         }
-
+        
         old_hdl = imp->imp_remote_handle;
         imp->imp_remote_handle = req->rq_repmsg->handle;
         CERROR("reconnected to %s@%s ("LPX64", was "LPX64")!\n",
                imp->imp_target_uuid.uuid, conn->c_remote_uuid.uuid,
                imp->imp_remote_handle.cookie, old_hdl.cookie);
-        GOTO(out_disc, rc = 0);
+        if (req->rq_repmsg->last_committed < committed_before_reconnect) {
+                CERROR("%s went back in time (transno "LPD64
+                       " was committed, server claims "LPD64
+                       ")! is shared storage not coherent?\n",
+                       imp->imp_target_uuid.uuid,
+                       imp->imp_peer_committed_transno,
+                       req->rq_repmsg->last_committed);
+        }
+
+        GOTO(out_disc, rc);
 
  out_disc:
-        *reqptr = req;
+        ptlrpc_req_finished(req);
         return rc;
 }
 
@@ -145,7 +161,7 @@ void ptlrpc_run_recovery_over_upcall(struct obd_device *obd)
 
 void ptlrpc_run_failed_import_upcall(struct obd_import* imp)
 {
-        char *argv[6];
+        char *argv[7];
         char *envp[3];
         int rc;
 
@@ -153,9 +169,10 @@ void ptlrpc_run_failed_import_upcall(struct obd_import* imp)
         argv[0] = obd_lustre_upcall;
         argv[1] = "FAILED_IMPORT";
         argv[2] = imp->imp_target_uuid.uuid;
-        argv[3] = imp->imp_obd->obd_uuid.uuid;
+        argv[3] = imp->imp_obd->obd_name;
         argv[4] = imp->imp_connection->c_remote_uuid.uuid;
-        argv[5] = NULL;
+        argv[5] = imp->imp_obd->obd_uuid.uuid;
+        argv[6] = NULL;
 
         envp[0] = "HOME=/";
         envp[1] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
@@ -241,8 +258,10 @@ int ptlrpc_resend(struct obd_import *imp)
          * list, so we don't need to hold the lock during this iteration and
          * resend process.
          */
+        /* Well... what if lctl recover is called twice at the same time?
+         */
         spin_lock_irqsave(&imp->imp_lock, flags);
-        LASSERT(imp->imp_level < LUSTRE_CONN_FULL);
+        LASSERT(imp->imp_level == LUSTRE_CONN_RECOVER);
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         list_for_each_safe(tmp, pos, &imp->imp_sending_list) {
@@ -288,50 +307,48 @@ inline void ptlrpc_invalidate_import_state(struct obd_import *imp)
         ptlrpc_abort_inflight(imp);
 }
 
-int ptlrpc_request_handle_eviction(struct ptlrpc_request *failed_req)
+
+void ptlrpc_handle_failed_import(struct obd_import *imp)
 {
-        int rc = 0, in_recovery = 0;
+        ENTRY;
+        if (!imp->imp_replayable) {
+                CDEBUG(D_HA,
+                       "import %s@%s for %s not replayable, deactivating\n",
+                       imp->imp_target_uuid.uuid,
+                       imp->imp_connection->c_remote_uuid.uuid,
+                       imp->imp_obd->obd_name);
+                ptlrpc_set_import_active(imp, 0);
+        }
+
+        ptlrpc_run_failed_import_upcall(imp);
+        EXIT;
+}
+
+void ptlrpc_request_handle_eviction(struct ptlrpc_request *failed_req)
+{
+        int rc;
         struct obd_import *imp= failed_req->rq_import;
         unsigned long flags;
         struct ptlrpc_request *req;
-
-        spin_lock_irqsave(&imp->imp_lock, flags);
-
-        if (imp->imp_level == LUSTRE_CONN_NOTCONN)
-                in_recovery = 1;
-
-        if (failed_req->rq_import_generation == imp->imp_generation)
-                imp->imp_level = LUSTRE_CONN_NOTCONN;
-        else
-                in_recovery = 1;
-
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
-
-        if (in_recovery) {
-                ptlrpc_resend_req(failed_req);
-                RETURN(rc);
-        }
+        ENTRY;
 
         CDEBUG(D_HA, "import %s of %s@%s evicted: reconnecting\n",
                imp->imp_obd->obd_name,
                imp->imp_target_uuid.uuid,
                imp->imp_connection->c_remote_uuid.uuid);
-        rc = ptlrpc_reconnect_import(imp, &req);
+        rc = ptlrpc_recover_import(imp, NULL);
         if (rc) {
                 ptlrpc_resend_req(failed_req);
-                ptlrpc_fail_import(imp, imp->imp_generation);
+                if (rc != -EALREADY)
+                        ptlrpc_handle_failed_import(imp);
         } else {
+                LASSERT(failed_req->rq_import_generation < imp->imp_generation);
                 spin_lock_irqsave (&failed_req->rq_lock, flags);
                 failed_req->rq_err = 1;
                 spin_unlock_irqrestore (&failed_req->rq_lock, flags);
-                spin_lock_irqsave(&imp->imp_lock, flags);
-                imp->imp_level = LUSTRE_CONN_FULL;
-                imp->imp_invalid = 0;
-                spin_unlock_irqrestore(&imp->imp_lock, flags);
-                ptlrpc_invalidate_import_state(imp/*, req->rq_import_generation*/);
         }
         ptlrpc_req_finished(req);
-        RETURN(rc);
+        EXIT;
 }
 
 int ptlrpc_set_import_active(struct obd_import *imp, int active)
@@ -347,15 +364,15 @@ int ptlrpc_set_import_active(struct obd_import *imp, int active)
         /* When deactivating, mark import invalid, and 
            abort in-flight requests. */
         if (!active) {
+                CDEBUG(D_ERROR, "setting import %s INVALID\n", imp->imp_target_uuid.uuid);
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 imp->imp_invalid = 1;
+                imp->imp_generation++;
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
-
-                ptlrpc_abort_inflight(imp);
+                ptlrpc_invalidate_import_state(imp);
+//                ptlrpc_abort_inflight(imp);
         } 
 
-        imp->imp_invalid = !active;
-
         if (notify_obd == NULL)
                 GOTO(out, rc = 0);
 
@@ -387,6 +404,7 @@ int ptlrpc_set_import_active(struct obd_import *imp, int active)
 out:
         /* When activating, mark import valid */
         if (active) {
+                CDEBUG(D_ERROR, "setting import %s VALID\n", imp->imp_target_uuid.uuid);
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 imp->imp_invalid = 0;
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
@@ -403,21 +421,11 @@ void ptlrpc_fail_import(struct obd_import *imp, int generation)
 
         LASSERT (!imp->imp_dlm_fake);
         
-        /* If we were already in recovery, or if the import's connection to its
-         * service is newer than the failing operation's original attempt, then
-         * we don't want to recover again. */
         spin_lock_irqsave(&imp->imp_lock, flags);
-
-        if (imp->imp_level == LUSTRE_CONN_RECOVD)
-                in_recovery = 1;
-
-        if (generation == imp->imp_generation) {
-                imp->imp_level = LUSTRE_CONN_RECOVD;
-                imp->imp_generation++;
-        } else {
+        if (imp->imp_level != LUSTRE_CONN_FULL)
                 in_recovery = 1;
-        }
-
+        else
+                imp->imp_level = LUSTRE_CONN_NOTCONN;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         if (in_recovery) {
@@ -425,16 +433,7 @@ void ptlrpc_fail_import(struct obd_import *imp, int generation)
                 return;
         }
 
-        if (!imp->imp_replayable) {
-                CDEBUG(D_HA,
-                       "import %s@%s for %s not replayable, deactivating\n",
-                       imp->imp_target_uuid.uuid,
-                       imp->imp_connection->c_remote_uuid.uuid,
-                       imp->imp_obd->obd_name);
-                ptlrpc_set_import_active(imp, 0);
-        }
-
-        ptlrpc_run_failed_import_upcall(imp);
+        ptlrpc_handle_failed_import(imp);
         EXIT;
 }
 
@@ -449,7 +448,7 @@ static int signal_completed_replay(struct obd_import *imp)
                 RETURN(-ENOMEM);
 
         req->rq_replen = lustre_msg_size(0, NULL);
-        req->rq_level = LUSTRE_CONN_RECOVD;
+        req->rq_level = LUSTRE_CONN_RECOVER;
         req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
 
         rc = ptlrpc_queue_wait(req);
@@ -460,18 +459,23 @@ static int signal_completed_replay(struct obd_import *imp)
 
 int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
 {
-        int msg_flags = 0, rc;
+        int rc;
         unsigned long flags;
-        struct ptlrpc_request *req;
+        int in_recover = 0;
+        int recon_result;
         ENTRY;
 
         spin_lock_irqsave(&imp->imp_lock, flags);
-        if (imp->imp_level == LUSTRE_CONN_FULL) {
-                imp->imp_level = LUSTRE_CONN_RECOVD;
-                imp->imp_generation++;
-        }
+        if (imp->imp_level == LUSTRE_CONN_FULL || 
+            imp->imp_level == LUSTRE_CONN_NOTCONN)
+                    imp->imp_level = LUSTRE_CONN_RECOVER;
+        else
+                in_recover = 1;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
+        if (in_recover == 1) 
+                RETURN(-EALREADY);
+
         if (new_uuid) {
                 struct ptlrpc_connection *conn;
                 struct obd_uuid uuid;
@@ -509,19 +513,19 @@ int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
 
         }
 
-        rc = ptlrpc_reconnect_import(imp, &req);
+        recon_result = ptlrpc_reconnect_import(imp);
 
-        if (rc) {
+        if (recon_result < 0) {
                 CERROR("failed to reconnect to %s@%s: %d\n",
                        imp->imp_target_uuid.uuid,
-                       imp->imp_connection->c_remote_uuid.uuid, rc);
-                RETURN(rc);
+                       imp->imp_connection->c_remote_uuid.uuid, recon_result);
+                spin_lock_irqsave(&imp->imp_lock, flags);
+                imp->imp_level = LUSTRE_CONN_NOTCONN;
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                RETURN(recon_result);
         }
 
-        if (req->rq_repmsg)
-                msg_flags = lustre_msg_get_op_flags(req->rq_repmsg);
-
-        if (msg_flags & MSG_CONNECT_RECOVERING) {
+        if (recon_result == RECON_RESULT_RECOVERING) {
                 CDEBUG(D_HA, "replay requested by %s\n",
                        imp->imp_target_uuid.uuid);
                 rc = ptlrpc_replay(imp);
@@ -538,28 +542,31 @@ int ptlrpc_recover_import(struct obd_import *imp, char *new_uuid)
                 rc = signal_completed_replay(imp);
                 if (rc)
                         GOTO(out, rc);
-        } else if (msg_flags & MSG_CONNECT_RECONNECT) {
+        } else if (recon_result == RECON_RESULT_RECONNECTED) {
                 CDEBUG(D_HA, "reconnected to %s@%s\n",
                        imp->imp_target_uuid.uuid,
                        imp->imp_connection->c_remote_uuid.uuid);
-        } else {
+        } else if (recon_result == RECON_RESULT_EVICTED) {
                 CDEBUG(D_HA, "evicted from %s@%s; invalidating\n",
                        imp->imp_target_uuid.uuid,
                        imp->imp_connection->c_remote_uuid.uuid);
-                ptlrpc_invalidate_import_state(imp);
+                ptlrpc_set_import_active(imp, 0);
+//                ptlrpc_invalidate_import_state(imp);
+        } else {
+                LBUG();
         }
 
+        ptlrpc_set_import_active(imp, 1);
+
         rc = ptlrpc_resend(imp);
 
         spin_lock_irqsave(&imp->imp_lock, flags);
         imp->imp_level = LUSTRE_CONN_FULL;
-        imp->imp_invalid = 0;
         spin_unlock_irqrestore(&imp->imp_lock, flags);
 
         ptlrpc_wake_delayed(imp);
         EXIT;
  out:
-        ptlrpc_req_finished(req);
         return rc;
 }
 
index 40e627d..a24a26a 100644 (file)
@@ -2,7 +2,7 @@
 %define version b_devel
 %define kversion @RELEASE@
 %define linuxdir @LINUX@
-Release: 0305281701chaos
+Release: 0306170928kernel
 
 Summary: Lustre Lite File System
 Name: lustre-lite
@@ -50,16 +50,16 @@ Requires: openldap-servers, openldap-clients, python-ldap, 4Suite
 Configures openldap server for LDAP Lustre config database
 
 
-%package -n liblustre
-Summary: Lustre Lib
-Group: Development/Kernel
+#%package -n liblustre
+#Summary: Lustre Lib
+#Group: Development/Kernel
 
-%description -n liblustre
-Lustre lib binary package.
+#%description -n liblustre
+#Lustre lib binary package.
 
 %prep
 %setup -qn lustre-%{version}
-%setup -c -n lustre-%{version}-lib
+#%setup -c -n lustre-%{version}-lib
 
 %build
 rm -rf $RPM_BUILD_ROOT
@@ -69,20 +69,20 @@ cd $RPM_BUILD_DIR/lustre-%{version}
 ./configure --with-linux='%{linuxdir}' 
 make
 
-%ifarch i386
-cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
-./configure --with-lib 
-make
-%endif
+#%ifarch i386
+#cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
+#./configure --with-lib 
+#make
+#%endif
 
 %install
 cd $RPM_BUILD_DIR/lustre-%{version}
 make install prefix=$RPM_BUILD_ROOT
 
-%ifarch i386
-cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
-make install prefix=$RPM_BUILD_ROOT
-%endif
+#%ifarch i386
+#cd $RPM_BUILD_DIR/lustre-%{version}-lib/lustre-%{version}
+#make install prefix=$RPM_BUILD_ROOT
+#%endif
 
 %ifarch alpha
 # this hurts me
@@ -110,11 +110,15 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %attr(-, root, root) /usr/sbin/lmc
 %attr(-, root, root) /usr/sbin/lctl
 %attr(-, root, root) /usr/sbin/lconf
+%attr(-, root, root) /usr/sbin/lactive
 %attr(-, root, root) /usr/sbin/llanalyze
 %attr(-, root, root) /usr/sbin/lfind
 %attr(-, root, root) /usr/sbin/lstripe
 %attr(-, root, root) /usr/sbin/mcreate
 %attr(-, root, root) /usr/sbin/mkdirmany
+%attr(-, root, root) /usr/sbin/llstat.pl
+%attr(-, root, root) /usr/sbin/llobdstat.pl
+%attr(-, root, root) /usr/sbin/load_ldap.sh
 %attr(-, root, root) /usr/lib/lustre/python/*
 %attr(-, root, root) /usr/lib/lustre/examples/llmount.sh
 %attr(-, root, root) /usr/lib/lustre/examples/llmountcleanup.sh
@@ -122,6 +126,9 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %attr(-, root, root) /usr/lib/lustre/examples/local.sh
 %attr(-, root, root) /usr/lib/lustre/examples/uml.sh
 %attr(-, root, root) /usr/lib/lustre/examples/lov.sh
+%attr(-, root, root) /usr/lib/lustre/examples/echo.sh
+%attr(-, root, root) /usr/lib/lustre/examples/llechocleanup.sh
+
 %attr(-, root, root) /etc/init.d/lustre
 %attr(-, root, root) /usr/sbin/acceptor
 %attr(-, root, root) /usr/sbin/ptlctl
@@ -130,20 +137,26 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %attr(-, root, root) /lib/libptlctl.a
 %attr(-, root, root) /lib/libtcpnal.a
 %attr(-, root, root) /usr/include/lustre/*.h
+%attr(-, root, root) /usr/sbin/lload
+%attr(-, root, root) /usr/sbin/obdbarrier
+%attr(-, root, root) /usr/sbin/obdio
+%attr(-, root, root) /usr/sbin/routerstat
+%attr(-, root, root) /usr/sbin/wirecheck
+
 %ifarch alpha
 %attr(-, root, root) /usr/sbin/mcpload
 %endif
 
 %files -n lustre-doc
-#%attr(-, root, root) %doc COPYING FDL
+%attr(-, root, root) %doc COPYING FDL
 %attr(-, root, root) %doc doc/lustre.pdf doc/lustre-HOWTO.txt
-%attr(-, root, root) %doc tests/client-echo.cfg tests/client-mount.cfg
-%attr(-, root, root) %doc tests/client-mount2.cfg
-%attr(-, root, root) %doc tests/elan-client.cfg tests/elan-server.cfg
-%attr(-, root, root) %doc tests/ldlm.cfg tests/lustre.cfg
-%attr(-, root, root) %doc tests/mds.cfg tests/net-client.cfg
-%attr(-, root, root) %doc tests/net-local.cfg tests/net-server.cfg
-%attr(-, root, root) %doc tests/obdecho.cfg tests/obdfilter.cfg
+#%attr(-, root, root) %doc tests/client-echo.cfg tests/client-mount.cfg
+#%attr(-, root, root) %doc tests/client-mount2.cfg
+#%attr(-, root, root) %doc tests/elan-client.cfg tests/elan-server.cfg
+#%attr(-, root, root) %doc tests/ldlm.cfg tests/lustre.cfg
+#%attr(-, root, root) %doc tests/mds.cfg tests/net-client.cfg
+#%attr(-, root, root) %doc tests/net-local.cfg tests/net-server.cfg
+#%attr(-, root, root) %doc tests/obdecho.cfg tests/obdfilter.cfg
 
 %files -n lustre-modules
 %attr(-, root, root) %doc COPYING
@@ -152,6 +165,7 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/mdc.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/mds.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/fsfilt_ext3.o
+%attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/fsfilt_reiserfs.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/obdclass.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/obdecho.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/obdfilter.o
@@ -159,6 +173,8 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/osc.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/ost.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/ptlrpc.o
+%attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/ptlbd.o
+%attr(-, root, root) /lib/modules/%{kversion}/kernel/fs/lustre/cobd.o
 #portals modules
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/net/lustre/kptlrouter.o
 %attr(-, root, root) /lib/modules/%{kversion}/kernel/net/lustre/*nal.o
@@ -171,26 +187,26 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %files -n lustre-source
 %attr(-, root, root) /usr/src/lustre-%{version}
 
-%ifarch i386
-%files -n liblustre
-%attr(-, root, root) /lib/lustre
-%attr(-, root, root) /lib/lustre/liblov.a
-%attr(-, root, root) /lib/lustre/liblustreclass.a
-%attr(-, root, root) /lib/lustre/libptlrpc.a
-%attr(-, root, root) /lib/lustre/libobdecho.a
-%attr(-, root, root) /lib/lustre/libldlm.a
-%attr(-, root, root) /lib/lustre/libosc.a
-%attr(-, root, root) /usr/sbin/lctl
-%attr(-, root, root) /usr/sbin/lfind
-%attr(-, root, root) /usr/sbin/lstripe
-%attr(-, root, root) /usr/sbin/obdio
-%attr(-, root, root) /usr/sbin/obdbarrier
-%attr(-, root, root) /usr/sbin/obdstat
-%attr(-, root, root) /usr/sbin/lload
-%attr(-, root, root) /usr/sbin/lconf
-%attr(-, root, root) /usr/sbin/lmc
-%attr(-, root, root) /usr/sbin/llanalyze
-%endif
+#%ifarch i386
+#%files -n liblustre
+#%attr(-, root, root) /lib/lustre
+#%attr(-, root, root) /lib/lustre/liblov.a
+#%attr(-, root, root) /lib/lustre/liblustreclass.a
+#%attr(-, root, root) /lib/lustre/libptlrpc.a
+#%attr(-, root, root) /lib/lustre/libobdecho.a
+#%attr(-, root, root) /lib/lustre/libldlm.a
+#%attr(-, root, root) /lib/lustre/libosc.a
+#%attr(-, root, root) /usr/sbin/lctl
+#%attr(-, root, root) /usr/sbin/lfind
+#%attr(-, root, root) /usr/sbin/lstripe
+#%attr(-, root, root) /usr/sbin/obdio
+#%attr(-, root, root) /usr/sbin/obdbarrier
+#%attr(-, root, root) /usr/sbin/obdstat
+#%attr(-, root, root) /usr/sbin/lload
+#%attr(-, root, root) /usr/sbin/lconf
+#%attr(-, root, root) /usr/sbin/lmc
+#%attr(-, root, root) /usr/sbin/llanalyze
+#%endif
 
 
 %files -n lustre-ldap
@@ -198,7 +214,7 @@ mkdir -p $RPM_BUILD_ROOT/var/lib/ldap/lustre
 %attr(-, root, root) /etc/openldap/schema/lustre.schema
 %attr(-, root, root) /usr/lib/lustre/lustre2ldif.xsl
 %attr(-, root, root) /usr/lib/lustre/top.ldif
-%dir /var/lib/ldap/lustre
+#%dir /var/lib/ldap/lustre
 %attr(700, ldap, ldap) /var/lib/ldap/lustre
 
 %post
index 5bb1e26..2e5c1fe 100644 (file)
@@ -41,3 +41,5 @@ runas
 openfile
 unlinkmany
 fchdir_test
+getdents
+o_directory
index 470c9de..064de98 100644 (file)
@@ -14,9 +14,9 @@ noinst_SCRIPTS = leak_finder.pl llecho.sh llmount.sh llmountcleanup.sh tbox.sh \
 noinst_PROGRAMS = openunlink testreq truncate directio openme writeme open_delay
 noinst_PROGRAMS += munlink tchmod toexcl fsx test_brw openclose createdestroy
 noinst_PROGRAMS += stat createmany statmany multifstat createtest mlink
-noinst_PROGRAMS += opendirunlink opendevunlink unlinkmany fchdir_test
-# noinst_PROGRAMS += ldaptest 
-noinst_PROGRAMS += checkstat wantedi statone runas openfile 
+noinst_PROGRAMS += opendirunlink opendevunlink unlinkmany fchdir_test checkstat
+noinst_PROGRAMS += wantedi statone runas openfile getdents o_directory
+# noinst_PROGRAMS += ldaptest
 sbin_PROGRAMS = mcreate mkdirmany
 
 # ldaptest_SOURCES = ldaptest.c
@@ -51,6 +51,8 @@ open_delay_SOURCES = open_delay.c
 opendirunlink_SOURCES=opendirunlink.c
 opendevunlink_SOURCES=opendirunlink.c
 fchdir_test_SOURCES=fchdir_test.c
+getdents_SOURCES=getdents.c
+o_directory_SOURCES = o_directory.c
 #mkdirdeep_SOURCES= mkdirdeep.c
 #mkdirdeep_LDADD=-L../portals/util -lptlctl
 #mkdirdeep_CPPFLAGS=-I$(top_srcdir)/portals/include
diff --git a/lustre/tests/getdents.c b/lustre/tests/getdents.c
new file mode 100644 (file)
index 0000000..b4155a9
--- /dev/null
@@ -0,0 +1,31 @@
+#include <stdio.h>
+#include <sys/types.h>
+#include <dirent.h>
+#include <errno.h>
+
+int main(int argc, char **argv)
+{
+        DIR *dir;
+        struct dirent64 *entry;
+
+        if (argc < 2) {
+                fprintf(stderr, "Usage: %s dirname\n", argv[0]);
+                return 1;
+        }
+
+        dir = opendir(argv[1]);
+        if (!dir) {
+                int rc = errno;
+                perror("opendir");
+                return rc;
+        }
+
+        while ((entry = readdir64(dir))) {
+                puts(entry->d_name);
+        }
+        
+        closedir(dir);
+
+        return 0;
+}
+                
index de20003..8e3b37b 100755 (executable)
@@ -1,7 +1,9 @@
 #!/bin/sh
 # suggested boilerplate for test script
 
-LCONF=${LCONF:-../utils/lconf}
+export PATH=`dirname $0`/../utils:$PATH
+
+LCONF=${LCONF:-lconf}
 NAME=${NAME:-local}
 
 config=$NAME.xml
@@ -28,7 +30,5 @@ if [ "$1" = "-v" ]; then
   verbose="-v"
 fi
 
-[ -x $LCONF ] || chmod a+rx $LCONF
-
-${LCONF} $portals_opt $lustre_opt $node_opt --reformat --gdb \
+${LCONF} $portals_opt $lustre_opt $node_opt ${REFORMAT:---reformat} --gdb \
     $verbose $conf_opt  || exit 2
index 98d0512..572cd65 100755 (executable)
@@ -1,6 +1,8 @@
 #!/bin/sh
 
-LCONF=${LCONF:-../utils/lconf}
+export PATH=`dirname $0`/../utils:$PATH
+
+LCONF=${LCONF:-lconf}
 NAME=${NAME:-local}
 TMP=${TMP:-/tmp}
 
index 7278dfa..25d05d2 100755 (executable)
@@ -1,9 +1,10 @@
-
 #!/bin/bash
 
+export PATH=`dirname $0`/../utils:$PATH
+
 config=${1:-local.xml}
 
-LMC="${LMC:-../utils/lmc} -m $config"
+LMC="${LMC:-lmc} -m $config"
 TMP=${TMP:-/tmp}
 
 MDSDEV=${MDSDEV:-$TMP/mds1}
index c0b2839..3956f9e 100755 (executable)
@@ -1,8 +1,10 @@
 #!/bin/bash
 
+export PATH=`dirname $0`/../utils:$PATH
+
 config=${1:-lov.xml}
 
-LMC=${LMC:-../utils/lmc}
+LMC=${LMC:-lmc}
 TMP=${TMP:-/tmp}
 
 MDSDEV=${MDSDEV:-$TMP/mds1}
diff --git a/lustre/tests/lstiming.sh b/lustre/tests/lstiming.sh
new file mode 100644 (file)
index 0000000..0b494e4
--- /dev/null
@@ -0,0 +1,51 @@
+#!/bin/bash
+
+set -e
+
+PATH=$PATH:.
+
+CHECKSTAT=${CHECKSTAT:-"checkstat -v"}
+MOUNT1=${MOUNT1:-/mnt/lustre1}
+MOUNT2=${MOUNT2:-/mnt/lustre2}
+DIRNAME=${DIRNAME:-"ls-timing"}
+DIRSIZE=${DIRSIZE:-200}
+export NAME=${NAME:-mount2}
+
+error () { 
+    echo FAIL
+    exit 1
+}
+
+pass() { 
+    echo PASS
+}
+echo "Mounting..."
+mount | grep $MOUNT1 || sh llmount.sh
+
+echo -n "Preparing test directory with $DIRSIZE files..."
+rm -rf "$MOUNT1/$DIRNAME"
+rm -rf "$MOUNT2/$DIRNAME"
+mkdir -p "$MOUNT1/$DIRNAME"
+[ -d "$MOUNT2/$DIRNAME" ] || error
+createmany -o $MOUNT1/$DIRNAME/file 0 $DIRSIZE &> /dev/null
+echo "done"
+
+echo -n "Cached ls: "
+time ls -lr $MOUNT1/$DIRNAME 1> /dev/null
+
+echo -n "Uncached ls: "
+time ls -lr $MOUNT2/$DIRNAME 1> /dev/null
+
+
+fsx $MOUNT1/$DIRNAME/fsx.file &>/dev/null &
+fsxpid=$!
+
+echo -n "Cached busy ls:"
+time ls -lr $MOUNT1/$DIRNAME 1> /dev/null
+
+echo -n "Uncached busy ls: "
+time ls -lr $MOUNT2/$DIRNAME 1> /dev/null
+
+kill $fsxpid
+
+exit
index cfd1535..38c0cea 100644 (file)
@@ -2,8 +2,8 @@
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
  * Compile with:
- * cc -I../../portals/include -o mkdirdeep mkdirdeep.c 
- *    -L../../portals/linux/utils -lptlctl 
+ * cc -I../../portals/include -o mkdirdeep mkdirdeep.c
+ *    -L../../portals/linux/utils -lptlctl
  */
 
 #include <stdio.h>
 #include <linux/limits.h>
 #include <portals/lltrace.h>
 
-static int opt_depth = 1;
-static int opt_mknod = 0; 
 static int opt_verbose = 0;
-static int opt_trace = 1;
-static char* basepathname = 0;
-static char mycwd[PATH_MAX];
-static char* pname = 0;
-static char* outputfilename = 0;
+static int opt_trace = 0;
 
-void usage()
+void usage(const char *pname)
 {
-        fprintf(stderr, "Usage: %s --depth <d> --output <outputtracefilename>"
-                "[--mknod] [--verbose] [--notrace] <basepath>\n", pname);
+        fprintf(stderr, "Usage: %s --depth <d> [--output <outputtracefilename>]"
+                " [--mknod] [--verbose] [--notrace] <basepath>\n", pname);
         exit(1);
 }
 
-int do_mkdir(charpath)
+int do_mkdir(char *path)
 {
         int rc = mkdir(path, 0755);
-        if (rc!=0) 
+
+        if (rc) {
                 fprintf(stderr, "mkdir(%s) failed: %s\n",
                         path, strerror(errno));
+                exit(1);
+        }
         if (opt_verbose)
                 printf("mkdir %s\n", path);
+
         return rc;
 }
 
 
-int do_mknod(charpath)
+int do_mknod(char *path)
 {
         int rc = mknod(path, 0755, S_IFIFO);
-        if (rc!=0) 
+
+        if (rc) {
                 fprintf(stderr, "mkdir(%s) failed: %s\n",
                         path, strerror(errno));
+                exit(1);
+        }
         if (opt_verbose)
                 printf("mknod %s\n", path);
+
         return rc;
 }
 
 int do_chdir(char* path)
 {
         int rc = chdir(path);
-        if (rc!=0) 
+
+        if (rc) {
                 fprintf(stderr, "chdir(%s) failed: %s\n",
                         path, strerror(errno));
+                exit(1);
+        }
         if (opt_verbose)
                 printf("chdir %s\n", path);
 
         return rc;
 }
 
-
-int do_stat(char* path)
+int do_stat(char *path)
 {
-        char mark_buf[PATH_MAX];
+        char mark_buf[PATH_MAX + 50];
         struct stat mystat;
         int rc = stat(path, &mystat);
-        if (rc!=0) 
+
+        if (rc) {
                 fprintf(stderr, "stat(%s) failed: %s\n",
                         path, strerror(errno));
+                exit(1);
+        }
         if (opt_verbose)
                 printf("stat %s = inode %lu\n", path, mystat.st_ino);
 
         if (opt_trace) {
-                snprintf(mark_buf, PATH_MAX, "stat %s = inode %lu", 
+                snprintf(mark_buf, PATH_MAX, "stat %s = inode %lu",
                          path, mystat.st_ino);
                 ltrace_mark(0, mark_buf);
         }
@@ -92,44 +99,40 @@ int do_stat(char* path)
 
 int main(int argc, char** argv)
 {
-        int c, opt_index, i, mypid;
-
-        static struct option long_options[] = {
-                {"depth", 1, 0, 0 },
-                {"help", 0, 0, 0 },
-                {"mknod", 0, 0, 0 },  
-                {"verbose", 0, 0, 0 },  
-                {"notrace", 0, 0, 0 },  
-                {"output", 1, 0, 0 },  
+        int c, i, mypid;
+        int opt_depth = 1;
+        int opt_mknod = 0;
+
+        static struct option long_opt[] = {
+                {"depth", 1, 0, 'd' },
+                {"help", 0, 0, 'h' },
+                {"mknod", 0, 0, 'm' },
+                {"output", 1, 0, 'o' },
+                {"trace", 1, 0, 't' },
+                {"verbose", 0, 0, 'v' },
                 {0,0,0,0}
         };
 
-        char full_pathname[PATH_MAX];
-        char rel_pathname[PATH_MAX];
-        char mark_buf[PATH_MAX];
+        char *outputfilename = NULL;
+        char *base_pathname;
+        char pathname[PATH_MAX];
+        char mark_buf[PATH_MAX + 50];
+        char mycwd[PATH_MAX];
+        char *pname = argv[0];
 
-        pname = strdup(argv[0]);
-        
-        while (1) {
-                c = getopt_long(argc, argv, "d:mhv", long_options, &opt_index);
-                if (c == -1)
-                        break;
-                if (c==0) {
-                        if (!strcmp(long_options[opt_index].name, "notrace")) {
-                                opt_trace = 0;
-                                continue;
-                        }
-                        c = long_options[opt_index].name[0];
-                }
+        while ((c = getopt_long(argc, argv, "d:mhvo:", long_opt, NULL)) != -1) {
                 switch (c) {
-                case 'd': 
+                case 'd':
                         opt_depth = atoi(optarg);
-                        if ((opt_depth == 0) || (opt_depth > 100))
-                                usage();
+                        if ((opt_depth == 0) || (opt_depth > 1100))
+                                usage(pname);
                         break;
                 case 'm':
                         opt_mknod = 1;
                         break;
+                case 't':
+                        opt_trace = 1;
+                        break;
                 case 'v':
                         opt_verbose = 1;
                         break;
@@ -137,92 +140,86 @@ int main(int argc, char** argv)
                         outputfilename = optarg;
                         break;
                 case 'h':
-                case '?': 
-                case ':': 
+                case '?':
+                case ':':
                 default:
-                        usage();
+                        usage(pname);
                         break;
                 }
         }
-                
-        if (optind != (argc-1)) 
-                usage();
 
-        if (outputfilename == NULL)
-                usage();
+        if (optind != (argc - 1))
+                usage(pname);
 
-        basepathname = argv[optind];
+        base_pathname = argv[optind];
         mypid = getpid();
-        
-        printf("%s(pid=%d) depth=%d mknod=%d, basepathname=%s, "
-               "trace=%d, outputfilename=%s\n",
-               pname, mypid, opt_depth, opt_mknod, basepathname, opt_trace, 
-               outputfilename);
 
         if (!getcwd(&mycwd[0], sizeof(mycwd))) {
                 fprintf(stderr, "%s: unable to getcwd()\n", pname);
                 exit(1);
         }
 
+        printf("%s(pid=%d) depth=%d mknod=%d, basepathname=%s, trace=%d\n",
+               pname, mypid, opt_depth, opt_mknod, base_pathname, opt_trace);
+
+        if (outputfilename)
+                printf("outputfilename=%s\n", outputfilename);
+
         if (opt_trace) {
                 ltrace_start();
                 ltrace_clear();
-                snprintf(mark_buf, PATH_MAX, 
-                         "Initialize - mkdir %s; chdir %s",
-                         basepathname, basepathname);
+                snprintf(mark_buf, PATH_MAX, "Initialize - mkdir %s; chdir %s",
+                         base_pathname, base_pathname);
                 ltrace_mark(2, mark_buf);
         }
 
-        if (do_mkdir(basepathname)!=0)
+        if (do_mkdir(base_pathname)!=0)
                 exit(1);
-        if (do_chdir(basepathname)!=0)
+        if (do_chdir(base_pathname)!=0)
                 exit(1);
 
         /* Create directory tree with depth level of subdirectories */
 
         if (opt_trace) {
-                snprintf(mark_buf, PATH_MAX, 
+                snprintf(mark_buf, PATH_MAX,
                          "Create Directory Tree (depth %d)", opt_depth);
                 ltrace_mark(2, mark_buf);
         }
 
-        for (i=0; i<opt_depth; i++) {
-                
-                snprintf(rel_pathname, sizeof(rel_pathname),"%d", i+1);
-                
-                 if (i == (opt_depth-1)) {
-                         /* Last Iteration */
-                         
-                         if (opt_trace) {
-                                 snprintf(mark_buf, PATH_MAX, 
-                                          "Tree Leaf (%d) %s/stat", i,
-                                          (opt_mknod ? "mknod" : "mkdir"));
-                                 ltrace_mark(3, mark_buf);
-                         }
-                         
-                         if (opt_mknod)
-                                 do_mknod(rel_pathname);
-                         else
-                                 do_mkdir(rel_pathname);
-                         /* Now stat it */
-                         do_stat(rel_pathname);
-                 }
-                else {
+        for (i = 0; i < opt_depth; i++) {
+                snprintf(pathname, sizeof(pathname), "%d", i + 1);
+
+                if (i == (opt_depth - 1)) {
+                        /* Last Iteration */
+
+                        if (opt_trace) {
+                                snprintf(mark_buf, PATH_MAX,
+                                         "Tree Leaf (%d) %s/stat", i,
+                                         (opt_mknod ? "mknod" : "mkdir"));
+                                ltrace_mark(3, mark_buf);
+                        }
+
+                        if (opt_mknod)
+                                do_mknod(pathname);
+                        else
+                                do_mkdir(pathname);
+                        /* Now stat it */
+                        do_stat(pathname);
+                } else {
                         /* Not Leaf */
 
                         if (opt_trace) {
-                                snprintf(mark_buf, PATH_MAX, 
-                                         "Tree Level (%d) mkdir/stat/chdir",
-                                         i);
+                                snprintf(mark_buf, sizeof(mark_buf),
+                                         "Tree Level (%d) mkdir/stat/chdir", i);
                                 ltrace_mark(3, mark_buf);
                         }
-                        
-                        do_mkdir(rel_pathname);
-                        do_stat(rel_pathname);
-                        do_chdir(rel_pathname);
+
+                        do_mkdir(pathname);
+                        do_stat(pathname);
+                        do_chdir(pathname);
                 }
         }
-        
+
         /* Stat through directory tree with fullpaths */
 
         if (opt_trace) {
@@ -230,46 +227,31 @@ int main(int argc, char** argv)
                 ltrace_mark(2, mark_buf);
         }
 
-        do_chdir(basepathname);
+        do_chdir(base_pathname);
 
-        strncpy(full_pathname, basepathname, sizeof(full_pathname));
+        strncpy(pathname, base_pathname, sizeof(pathname));
 
-        for (i=0; i<opt_depth; i++) {
-                snprintf(rel_pathname, sizeof(rel_pathname),"%d", i+1);
-                strcat(full_pathname, "/");
-                strcat(full_pathname, rel_pathname);
+        c = strlen(base_pathname);
+        for (i = 0; i < opt_depth; i++) {
+                c += snprintf(pathname + c, sizeof(pathname) - c, "/%d", i+1);
 
                 if (opt_trace) {
-                        snprintf(mark_buf, PATH_MAX, "stat %s", 
-                                 full_pathname);
+                        snprintf(mark_buf, PATH_MAX, "stat %s", pathname);
                         ltrace_mark(2, mark_buf);
                 }
 
-                do_stat(full_pathname);
+                do_stat(pathname);
         }
 
-        /* Cleanup */
-
-        if (opt_trace) {
-                snprintf(mark_buf, PATH_MAX, "Cleanup");
-                ltrace_mark(2, mark_buf);
-        }
-
-        if (opt_trace) {
+        if (opt_trace && outputfilename) {
                     ltrace_write_file(outputfilename);
                     ltrace_add_processnames(outputfilename);
                     ltrace_stop();
         }
 
-        do_chdir(basepathname);        
-        
-        snprintf(full_pathname, sizeof(full_pathname), 
-                 "rm -rf %s\n", basepathname);
-        if (opt_verbose) 
-                printf("Cleanup: %s", full_pathname);
-
-        system(full_pathname);
+        do_chdir(base_pathname);
 
         printf("%s (pid=%d) done.\n", pname, mypid);
+
         return 0;
 }
diff --git a/lustre/tests/o_directory.c b/lustre/tests/o_directory.c
new file mode 100644 (file)
index 0000000..d4b2c1b
--- /dev/null
@@ -0,0 +1,51 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+/* for O_DIRECTORY */
+#define _GNU_SOURCE
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+int main(int argc, char **argv)
+{
+        int fd, rc;
+
+        if (argc != 2) {
+                printf("Usage: %s <filename>\n", argv[0]);
+                exit(1);
+        }
+
+        fd = open(argv[1], O_RDONLY | O_CREAT, 0600);
+        if (fd == -1) {
+                printf("Error opening %s for create: %s\n", argv[1],
+                       strerror(errno));
+                exit(1);
+        }
+        rc = close(fd);
+        if (rc < 0) {
+                printf("Error closing %s: %s\n", argv[1], strerror(errno));
+                exit(1);
+        }
+
+        fd = open(argv[1], O_DIRECTORY);
+        if (fd >= 0) {
+                printf("opening %s as directory should have returned an "
+                       "error!\n", argv[1]);
+                exit(1);
+        }
+        if (errno != ENOTDIR) {
+                printf("opening %s as directory, expected -ENOTDIR and got "
+                       "%s\n", argv[1], strerror(errno));
+                exit(1);
+        }
+
+        return 0;
+}
index 0707f1b..ebf0a0c 100755 (executable)
@@ -104,8 +104,8 @@ unmount_client() {
 }
 
 setup() {
-    start_mds ${REFORMAT}
-    start_ost --timeout=$(($TIMEOUT*2)) ${REFORMAT}
+    start_mds --timeout=$TIMEOUT ${REFORMAT}
+    start_ost --timeout=$TIMEOUT ${REFORMAT}
     # XXX we should write our own upcall, when we move this somewhere better.
     mount_client --timeout=${TIMEOUT} \
         --lustre_upcall=$UPCALL
@@ -163,4 +163,7 @@ drop_reply "mlink /mnt/lustre/renamed-again /mnt/lustre/link2"
 drop_request "munlink /mnt/lustre/link1"
 drop_reply "munlink /mnt/lustre/link2"
 
+#bug 1423
+drop_reply "touch /mnt/lustre/renamed"
+
 $CLEANUP
index 11e888b..20981e8 100644 (file)
@@ -61,6 +61,7 @@ int main(int argc, char **argv)
                 default:
                         //fprintf(stderr, "Bad parameters.\n");
                         //Usage_and_abort ();
+                        break;
                 }
         }
 
index fa84420..cb417d2 100755 (executable)
@@ -1,11 +1,13 @@
 #!/bin/sh
 
+DIR=${DIR:-/mnt/lustre/`hostname`}
 #[ -e /proc/sys/portals/debug ] && echo 0 > /proc/sys/portals/debug 
-TGT=/mnt/lustre/client.txt
-SRC=/usr/lib/dbench/client.txt
+mkdir -p $DIR
+TGT=$DIR/client.txt
+SRC=${SRC:-/usr/lib/dbench/client.txt}
 [ ! -e $TGT -a -e $SRC ] && echo "copying $SRC to $TGT" && cp $SRC $TGT
 SRC=/usr/lib/dbench/client_plain.txt
 [ ! -e $TGT -a -e $SRC ] && echo "copying $SRC to $TGT" && cp $SRC $TGT
-cd /mnt/lustre
+cd $DIR
 echo "running 'dbench $@' on $PWD at `date`"
 dbench -c client.txt $@
index a694ed7..46d0072 100644 (file)
@@ -16,6 +16,7 @@ CHECKSTAT=${CHECKSTAT:-"./checkstat -v"}
 CREATETEST=${CREATETEST:-createtest}
 LFIND=${LFIND:-lfind}
 LSTRIPE=${LSTRIPE:-lstripe}
+LCTL=${LCTL:-lctl}
 MCREATE=${MCREATE:-mcreate}
 TOEXCL=${TOEXCL:-toexcl}
 TRUNCATE=${TRUNCATE:-truncate}
@@ -556,11 +557,14 @@ run_test 28 "create/mknod/mkdir with bad file types ============"
 test_29() {
        mkdir $DIR/d29
        touch $DIR/d29/foo
+       log 'first d29'
        ls -l $DIR/d29
        MDCDIR=${MDCDIR:-/proc/fs/lustre/ldlm/ldlm/MDC_*}
        LOCKCOUNTORIG=`cat $MDCDIR/lock_count`
        LOCKUNUSEDCOUNTORIG=`cat $MDCDIR/lock_unused_count`
+       log 'second d29'
        ls -l $DIR/d29
+       log 'done'
        LOCKCOUNTCURRENT=`cat $MDCDIR/lock_count`
        LOCKUNUSEDCOUNTCURRENT=`cat $MDCDIR/lock_unused_count`
        if [ $LOCKCOUNTCURRENT -gt $LOCKCOUNTORIG ]; then
@@ -805,8 +809,21 @@ test_36a() {
 run_test 36a "cvs init ========================================="
 
 test_36b() {
+       # on the LLNL clusters, runas will still pick up root's $TMP settings,
+        # which will not be writable for the runas user, and then you get a CVS
+       # error message with a corrupt path string (CVS bug) and panic.
+       # We're not using much space, so just stick it in /tmp, which is
+       # safe.
+       OLDTMPDIR=$TMPDIR
+       OLDTMP=$TMP
+       TMPDIR=/tmp
+       TMP=/tmp
+
        cd /etc/init.d
        $RUNAS cvs -d $DIR/cvsroot import -m "nomesg"  reposname vtag rtag
+
+       TMPDIR=$OLDTMPDIR
+       TMP=$OLDTMP
 }
 run_test 36b "cvs import ======================================="
 
@@ -838,6 +855,23 @@ test_36f() {
 }
 run_test 36f "cvs commit ======================================="
 
+test_37() {
+       mkdir -p $DIR/dextra
+       echo f > $DIR/dextra/fbugfile
+       mount -t ext2 -o loop /$EXT2_DEV $DIR/dextra
+       ls $DIR/dextra |grep "\<fbugfile\>" && error
+       umount /$EXT2_DEV
+       rm -f DIR/dextra/fbugfile
+}
+run_test 37 "ls a mounted file system to check the old contents ====="
+
+# open(file, O_DIRECTORY) will leak a request and not cleanup (bug 1501)
+test_38() {
+        o_directory $DIR/test38
+}
+run_test 38 "open a regular file with O_DIRECTORY =============="
+        
+
 log "cleanup: ======================================================"
 rm -r $DIR/[Rdfs][1-9]*
 if [ "$I_MOUNTED" = "yes" ]; then
index b59cc6c..c6a5d7d 100644 (file)
@@ -10,6 +10,7 @@
 #include <time.h>
 #include <string.h>
 #include <utime.h>
+#include <errno.h>
 
 void usage(char *prog)
 {
@@ -21,6 +22,8 @@ int main(int argc, char *argv[])
 {
        long before_mknod, after_mknod;
        long before_utime, after_utime;
+       const char *prog = argv[0];
+       const char *filename = argv[1];
        struct stat st;
        int rc;
 
@@ -28,56 +31,57 @@ int main(int argc, char *argv[])
                usage(argv[0]);
 
        before_mknod = time(0);
-       rc = mknod(argv[1], 0700, S_IFREG);
+       rc = mknod(filename, 0700, S_IFREG);
        after_mknod = time(0);
-       if (rc) {
+       if (rc && errno != EEXIST) {
                fprintf(stderr, "%s: mknod(%s) failed: rc %d: %s\n",
-                       argv[0], argv[1], rc, strerror(rc));
+                       prog, filename, errno, strerror(errno));
                return 2;
-       }
+       } else if (!rc) {
+               rc = stat(filename, &st);
+               if (rc) {
+                       fprintf(stderr, "%s: stat(%s) failed: rc %d: %s\n",
+                               prog, filename, errno, strerror(errno));
+                       return 3;
+               }
 
-       rc = stat(argv[1], &st);
-       if (rc) {
-               fprintf(stderr, "%s: stat(%s) failed: rc %d: %s\n",
-                       argv[0], argv[1], rc, strerror(rc));
-               return 3;
-       }
+               if (st.st_mtime < before_mknod || st.st_mtime > after_mknod) {
+                       fprintf(stderr,
+                               "%s: bad mknod times %lu <= %lu <= %lu false\n",
+                               prog, before_mknod, st.st_mtime, after_mknod);
+                       return 4;
+               }
 
-       if (st.st_mtime < before_mknod || st.st_mtime > after_mknod) {
-               fprintf(stderr, "%s: bad mknod times %lu <= %lu <= %lu false\n",
-                       argv[0], before_mknod, st.st_mtime, after_mknod);
-               return 4;
-       }
-
-       printf("%s: good mknod times %lu <= %lu <= %lu\n",
-              argv[0], before_mknod, st.st_mtime, after_mknod);
+               printf("%s: good mknod times %lu <= %lu <= %lu\n",
+                      prog, before_mknod, st.st_mtime, after_mknod);
 
-       sleep(5);
+               sleep(5);
+       }
 
        before_utime = time(0);
-       rc = utime(argv[0], NULL);
+       rc = utime(filename, NULL);
        after_utime = time(0);
        if (rc) {
-               fprintf(stderr, "%s: stat(%s) failed: rc %d: %s\n",
-                       argv[0], argv[1], rc, strerror(rc));
+               fprintf(stderr, "%s: utime(%s) failed: rc %d: %s\n",
+                       prog, filename, errno, strerror(errno));
                return 5;
        }
 
-       rc = stat(argv[1], &st);
+       rc = stat(filename, &st);
        if (rc) {
                fprintf(stderr, "%s: second stat(%s) failed: rc %d: %s\n",
-                       argv[0], argv[1], rc, strerror(rc));
+                       prog, filename, errno, strerror(errno));
                return 6;
        }
 
        if (st.st_mtime < before_utime || st.st_mtime > after_utime) {
                fprintf(stderr, "%s: bad utime times %lu <= %lu <= %lu false\n",
-                       argv[0], before_utime, st.st_mtime, after_utime);
+                       prog, before_utime, st.st_mtime, after_utime);
                return 7;
        }
 
        printf("%s: good utime times %lu <= %lu <= %lu\n",
-              argv[0], before_mknod, st.st_mtime, after_mknod);
+              prog, before_utime, st.st_mtime, after_utime);
 
        return 0;
 }
index a237f43..02da299 100644 (file)
@@ -8,7 +8,7 @@ CPPFLAGS = $(HAVE_LIBREADLINE)
 lctl_LDADD := $(LIBREADLINE) -lptlctl
 lload_LDADD := -lptlctl
 sbin_PROGRAMS = lctl lfind lstripe obdio obdbarrier lload wirecheck
-sbin_SCRIPTS = lconf lmc llanalyze
+sbin_SCRIPTS = lconf lmc llanalyze llstat.pl llobdstat.pl lactive load_ldap.sh
 wirecheck_SOURCES = wirecheck.c
 lctl_SOURCES = parser.c obd.c lctl.c parser.h obdctl.h
 lload_SOURCES = lload.c 
index e691423..a5e8580 100644 (file)
 import sys, getopt, types
 import string, os
 import ldap
+PYMOD_DIR = "/usr/lib/lustre/python"
+
+def development_mode():
+    base = os.path.dirname(sys.argv[0])
+    if os.access(base+"/Makefile.am", os.R_OK):
+        return 1
+    return 0
+
+if not development_mode():
+    sys.path.append(PYMOD_DIR)
+
 import Lustre
 
 lactive_options = [
index 44e8337..15e5a2c 100755 (executable)
@@ -409,11 +409,11 @@ class LCTLInterface:
         self.run(cmds)
 
     # Recover a device
-    def recover(self, dev_uuid, new_conn):
+    def recover(self, dev_name, new_conn):
         cmds = """
-    device %%%s
+    device $%s
     probe
-    recover %s""" %(dev_uuid, new_conn)
+    recover %s""" %(dev_name, new_conn)
         self.run(cmds)
                 
     # add a route to a range
@@ -797,24 +797,8 @@ def get_local_address(net_type, wildcard):
     return local
         
 
-def is_prepared(uuid):
-    """Return true if a device exists for the uuid"""
-    if config.lctl_dump:
-        return 0
-    if config.noexec and config.cleanup:
-        return 1
-    try:
-        # expect this format:
-        # 1 UP ldlm ldlm ldlm_UUID 2
-        out = lctl.device_list()
-        for s in out:
-            if uuid == string.split(s)[4]:
-                return 1
-    except CommandError, e:
-        e.dump()
-    return 0
-
-def is_prepared_name(name):
+# XXX: instead of device_list, ask for $name and see what we get
+def is_prepared(name):
     """Return true if a device exists for the name"""
     if config.lctl_dump:
         return 0
@@ -834,7 +818,7 @@ def is_prepared_name(name):
 def is_network_prepared():
     """If the LDLM device exists, then assume that all networking
        has been configured"""
-    return is_prepared('ldlm_UUID')
+    return is_prepared('ldlm')
     
 def fs_is_mounted(path):
     """Return true if path is a mounted lustre filesystem"""
@@ -1111,21 +1095,21 @@ class LDLM(Module):
         self.add_lustre_module('ldlm', 'ldlm') 
 
     def prepare(self):
-        if is_prepared(self.uuid):
+        if is_prepared(self.name):
             return
         self.info()
-        lctl.newdev(attach="ldlm %s %s" % (self.name, self.uuid))
+        lctl.newdev(attach="ldlm %s %s" % ('ldlm', 'ldlm_UUID'))
 
     def safe_to_clean(self):
         out = lctl.device_list()
         return len(out) <= 1
 
     def cleanup(self):
-        if is_prepared(self.uuid):
+        if is_prepared(self.name):
             Module.cleanup(self)
 
 class LOV(Module):
-    def __init__(self,db):
+    def __init__(self, db, uuid):
         Module.__init__(self, 'LOV', db)
         self.add_lustre_module('mdc', 'mdc')
         self.add_lustre_module('lov', 'lov')
@@ -1138,17 +1122,19 @@ class LOV(Module):
         self.devlist = self.db.get_refs('obd')
         self.stripe_cnt = self.db.get_val_int('stripecount', len(self.devlist))
         self.osclist = []
-        self.mdc_uuid = ''
+        self.client_uuid = generate_client_uuid(self.name)
+        self.mdc_name = ''
+        self.mdc = get_mdc(db, self.client_uuid, self.name, self.mds_uuid)
         for obd_uuid in self.devlist:
             obd = self.db.lookup(obd_uuid)
-            osc = get_osc(obd, self.name)
+            osc = get_osc(obd, self.client_uuid, self.name)
             if osc:
                 self.osclist.append(osc)
             else:
                 panic('osc not found:', obd_uuid)
             
     def prepare(self):
-        if is_prepared(self.uuid):
+        if is_prepared(self.name):
             return
         for osc in self.osclist:
             try:
@@ -1158,18 +1144,20 @@ class LOV(Module):
             except CommandError, e:
                 print "Error preparing OSC %s (inactive)\n" % osc.uuid
                 raise e
-        self.mdc_uuid = prepare_mdc(self.db, self.name, self.mds_uuid)
+        self.mdc.prepare()
+        self.mdc_name = self.mdc.name
         self.info(self.mds_uuid, self.stripe_cnt, self.stripe_sz,
                   self.stripe_off, self.pattern, self.devlist, self.mds_name)
         lctl.newdev(attach="lov %s %s" % (self.name, self.uuid),
-                    setup ="%s" % (self.mdc_uuid))
+                    setup ="%s" % (self.mdc_name))
 
     def cleanup(self):
-        if is_prepared(self.uuid):
+        if is_prepared(self.name):
             Module.cleanup(self)
         for osc in self.osclist:
             osc.cleanup()
-        cleanup_mdc(self.db, self.name, self.mds_uuid)
+        mdc = get_mdc(self.db, self.client_uuid, self.name, self.mds_uuid)
+        mdc.cleanup()
 
     def load_module(self):
         for osc in self.osclist:
@@ -1189,7 +1177,7 @@ class LOVConfig(Module):
 
         self.lov_uuid = self.db.get_first_ref('lov')
         l = self.db.lookup(self.lov_uuid)
-        self.lov = LOV(l)
+        self.lov = LOV(l, "YOU_SHOULD_NEVER_SEE_THIS_UUID")
         
     def prepare(self):
         lov = self.lov
@@ -1245,7 +1233,7 @@ class MDSDEV(Module):
             Module.load_module(self)
             
     def prepare(self):
-        if is_prepared(self.uuid):
+        if is_prepared(self.name):
             return
         if not self.active:
             debug(self.uuid, "not active")
@@ -1254,7 +1242,7 @@ class MDSDEV(Module):
         run_acceptors()
         blkdev = block_dev(self.devpath, self.size, self.fstype, self.format,
                            self.journal_size)
-        if