Whamcloud - gitweb
branch: HEAD
author ericm <ericm>
Mon, 7 Jul 2008 19:12:54 +0000 (19:12 +0000)
committer ericm <ericm>
Mon, 7 Jul 2008 19:12:54 +0000 (19:12 +0000)
port Adaptive Timeouts from b1_6 to HEAD.
b=14071
r=nathan
r=adilger

77 files changed:
lustre/ChangeLog
lustre/cmm/mdc_device.c
lustre/fid/fid_request.c
lustre/fld/fld_request.c
lustre/include/linux/lustre_fsfilt.h
lustre/include/lprocfs_status.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/include/lustre_export.h
lustre/include/lustre_import.h
lustre/include/lustre_lib.h
lustre/include/lustre_net.h
lustre/include/lustre_sec.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/liblustre/llite_lib.c
lustre/liblustre/super.c
lustre/llite/llite_lib.c
lustre/lvfs/lvfs_lib.c
lustre/mdc/lproc_mdc.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mds/mds_lov.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mgc/mgc_request.c
lustre/mgs/mgs_handler.c
lustre/mgs/mgs_internal.h
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/obdclass/linux/linux-module.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/obd_config.c
lustre/obdclass/obd_mount.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_io_26.c
lustre/osc/lproc_osc.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/ost/lproc_ost.c
lustre/ost/ost_handler.c
lustre/ost/ost_internal.h
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/gss/gss_bulk.c
lustre/ptlrpc/gss/gss_cli_upcall.c
lustre/ptlrpc/gss/gss_keyring.c
lustre/ptlrpc/gss/gss_svc_upcall.c
lustre/ptlrpc/gss/sec_gss.c
lustre/ptlrpc/import.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/recov_thread.c
lustre/ptlrpc/sec.c
lustre/ptlrpc/sec_bulk.c
lustre/ptlrpc/sec_null.c
lustre/ptlrpc/sec_plain.c
lustre/ptlrpc/service.c
lustre/ptlrpc/wiretest.c
lustre/tests/recovery-small.sh
lustre/tests/replay-single.sh
lustre/tests/sanity-gss.sh
lustre/tests/sanity.sh
lustre/tests/test-framework.sh
lustre/utils/gss/.cvsignore
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 55c27af..e024b38 100644 (file)
@@ -599,10 +599,11 @@ Bugzilla   : 12836
 Description: lfs find on -1 stripe looping in lsm_lmm_verify_common()
 Details    : Avoid lov_verify_lmm_common() on directory with -1 stripe count.
 
-Severity   : major
-Bugzilla   : 12932
-Description: obd_health_check_timeout too short
-Details    : set obd_health_check_timeout as 1.5x of obd_timeout
+Severity   : enhancement
+Bugzilla   : 3055
+Description: Adaptive timeouts
+Details    : RPC timeouts adapt to changing server load and network
+            conditions to reduce resend attempts and improve recovery time.
 
 Severity   : normal
 Bugzilla   : 12192
index 3328740..146720c 100644 (file)
@@ -137,7 +137,8 @@ static int mdc_obd_add(const struct lu_env *env,
                                          OBD_CONNECT_OSS_CAPA | 
                                          OBD_CONNECT_IBITS |
                                          OBD_CONNECT_MDS_MDS |
-                                         OBD_CONNECT_FID;
+                                         OBD_CONNECT_FID |
+                                         OBD_CONNECT_AT;
                 rc = obd_connect(env, conn, mdc, &mdc->obd_uuid, ocd, NULL);
                 OBD_FREE_PTR(ocd);
                 if (rc) {
index 3910422..d4ecfd9 100644 (file)
@@ -85,6 +85,7 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
                         SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
         }
+        ptlrpc_at_set_req_timeout(req);
 
         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
         rc = ptlrpc_queue_wait(req);
index 23bbcf6..f0d5191 100644 (file)
@@ -456,6 +456,7 @@ static int fld_client_rpc(struct obd_export *exp,
 
         ptlrpc_request_set_replen(req);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
+        ptlrpc_at_set_req_timeout(req);
 
         if (fld_op != FLD_LOOKUP)
                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
index 93c47ee..a8710eb 100644 (file)
@@ -164,14 +164,14 @@ static inline lvfs_sbdev_type fsfilt_journal_sbdev(struct obd_device *obd,
 #define FSFILT_OP_JOIN          11
 #define FSFILT_OP_NOOP          15
 
-#define __fsfilt_check_slow(obd, start, timeout, msg)                     \
+#define __fsfilt_check_slow(obd, start, msg)                            \
 do {                                                                    \
         if (time_before(jiffies, start + 15 * HZ))                      \
                 break;                                                  \
         else if (time_before(jiffies, start + 30 * HZ))                 \
                 CDEBUG(D_VFSTRACE, "%s: slow %s %lus\n", obd->obd_name, \
                        msg, (jiffies-start) / HZ);                      \
-        else if (time_before(jiffies, start + timeout / 2 * HZ))        \
+        else if (time_before(jiffies, start + DISK_TIMEOUT * HZ))       \
                 CWARN("%s: slow %s %lus\n", obd->obd_name, msg,         \
                       (jiffies - start) / HZ);                          \
         else                                                            \
@@ -179,10 +179,10 @@ do {                                                                    \
                        (jiffies - start) / HZ);                         \
 } while (0)
 
-#define fsfilt_check_slow(obd, start, timeout, msg)    \
-do {                                                   \
-        __fsfilt_check_slow(obd, start, timeout, msg); \
-        start = jiffies;                               \
+#define fsfilt_check_slow(obd, start, msg)              \
+do {                                                    \
+        __fsfilt_check_slow(obd, start, msg);           \
+        start = jiffies;                                \
 } while (0)
 
 static inline void *fsfilt_start_log(struct obd_device *obd,
@@ -208,7 +208,7 @@ static inline void *fsfilt_start_log(struct obd_device *obd,
                         LBUG();
                 }
         }
-        fsfilt_check_slow(obd, now, obd_timeout, "journal start");
+        fsfilt_check_slow(obd, now, "journal start");
         return handle;
 }
 
@@ -243,7 +243,7 @@ static inline void *fsfilt_brw_start_log(struct obd_device *obd, int objcount,
                         LBUG();
                 }
         }
-        fsfilt_check_slow(obd, now, obd_timeout, "journal start");
+        fsfilt_check_slow(obd, now, "journal start");
 
         return handle;
 }
@@ -263,7 +263,7 @@ static inline int fsfilt_extend(struct obd_device *obd, struct inode *inode,
         int rc = obd->obd_fsops->fs_extend(inode, nblocks, handle);
         CDEBUG(D_INFO, "extending handle %p with %u blocks\n", handle, nblocks);
 
-        fsfilt_check_slow(obd, now, obd_timeout, "journal extend");
+        fsfilt_check_slow(obd, now, "journal extend");
 
         return rc;
 }
@@ -275,7 +275,7 @@ static inline int fsfilt_commit(struct obd_device *obd, struct inode *inode,
         int rc = obd->obd_fsops->fs_commit(inode, handle, force_sync);
         CDEBUG(D_INFO, "committing handle %p\n", handle);
 
-        fsfilt_check_slow(obd, now, obd_timeout, "journal start");
+        fsfilt_check_slow(obd, now, "journal start");
 
         return rc;
 }
@@ -288,7 +288,7 @@ static inline int fsfilt_commit_async(struct obd_device *obd,
         int rc = obd->obd_fsops->fs_commit_async(inode, handle, wait_handle);
 
         CDEBUG(D_INFO, "committing handle %p (async)\n", *wait_handle);
-        fsfilt_check_slow(obd, now, obd_timeout, "journal start");
+        fsfilt_check_slow(obd, now, "journal start");
 
         return rc;
 }
@@ -299,7 +299,7 @@ static inline int fsfilt_commit_wait(struct obd_device *obd,
         unsigned long now = jiffies;
         int rc = obd->obd_fsops->fs_commit_wait(inode, handle);
         CDEBUG(D_INFO, "waiting for completion %p\n", handle);
-        fsfilt_check_slow(obd, now, obd_timeout, "journal start");
+        fsfilt_check_slow(obd, now, "journal start");
         return rc;
 }
 
@@ -309,7 +309,7 @@ static inline int fsfilt_setattr(struct obd_device *obd, struct dentry *dentry,
         unsigned long now = jiffies;
         int rc;
         rc = obd->obd_fsops->fs_setattr(dentry, handle, iattr, do_trunc);
-        fsfilt_check_slow(obd, now, obd_timeout, "setattr");
+        fsfilt_check_slow(obd, now, "setattr");
         return rc;
 }
 
index f6f0b40..4331fd6 100644 (file)
@@ -283,6 +283,23 @@ struct obd_device;
 struct file;
 struct obd_histogram;
 
+/* Days / hours / mins / seconds format */
+struct dhms {
+        int d,h,m,s;
+};
+static inline void s2dhms(struct dhms *ts, time_t secs)
+{
+        ts->d = secs / 86400;
+        secs = secs % 86400;
+        ts->h = secs / 3600;
+        secs = secs % 3600;
+        ts->m = secs / 60;
+        ts->s = secs % 60;
+}
+#define DHMS_FMT "%dd%dh%02dm%02ds"
+#define DHMS_VARS(x) (x)->d, (x)->h, (x)->m, (x)->s
+
+
 #ifdef LPROCFS
 
 static inline int lprocfs_stats_lock(struct lprocfs_stats *stats, int type)
@@ -436,6 +453,13 @@ extern int lprocfs_rd_num_exports(char *page, char **start, off_t off,
                                   int count, int *eof, void *data);
 extern int lprocfs_rd_numrefs(char *page, char **start, off_t off,
                               int count, int *eof, void *data);
+struct adaptive_timeout;
+extern int lprocfs_at_hist_helper(char *page, int count, int rc,
+                                  struct adaptive_timeout *at);
+extern int lprocfs_rd_timeouts(char *page, char **start, off_t off,
+                               int count, int *eof, void *data);
+extern int lprocfs_wr_timeouts(struct file *file, const char *buffer,
+                               unsigned long count, void *data);
 extern int lprocfs_wr_evict_client(struct file *file, const char *buffer,
                                    unsigned long count, void *data);
 extern int lprocfs_wr_ping(struct file *file, const char *buffer,
@@ -543,6 +567,10 @@ struct file_operations name##_fops = {                                     \
 #define LPROC_SEQ_FOPS_RO(name)         __LPROC_SEQ_FOPS(name, NULL)
 #define LPROC_SEQ_FOPS(name)            __LPROC_SEQ_FOPS(name, name##_seq_write)
 
+/* lproc_ptlrpc.c */
+struct ptlrpc_request;
+extern void target_print_req(void *seq_file, struct ptlrpc_request *req);
+
 /* lprocfs_status.c: read recovery max time bz13079 */
 int lprocfs_obd_rd_recovery_maxtime(char *page, char **start, off_t off,
                                     int count, int *eof, void *data);
@@ -650,6 +678,16 @@ static inline int lprocfs_rd_num_exports(char *page, char **start, off_t off,
 static inline int lprocfs_rd_numrefs(char *page, char **start, off_t off,
                                      int count, int *eof, void *data)
 { return 0; }
+struct adaptive_timeout;
+static inline int lprocfs_at_hist_helper(char *page, int count, int rc,
+                                         struct adaptive_timeout *at)
+{ return 0; }
+static inline int lprocfs_rd_timeouts(char *page, char **start, off_t off,
+                                      int count, int *eof, void *data)
+{ return 0; }
+static inline int lprocfs_wr_timeouts(struct file *file, const char *buffer,
+                                      unsigned long count, void *data)
+{ return 0; }
 static inline int lprocfs_wr_evict_client(struct file *file, const char *buffer,
                                           unsigned long count, void *data)
 { return 0; }
@@ -706,6 +744,9 @@ __u64 lprocfs_stats_collector(struct lprocfs_stats *stats, int idx,
 #define LPROC_SEQ_FOPS_RO(name)
 #define LPROC_SEQ_FOPS(name)
 
+/* lproc_ptlrpc.c */
+#define target_print_req NULL
+
 #endif /* LPROCFS */
 
 #endif /* LPROCFS_SNMP_H */
index 016d8e8..82270b7 100644 (file)
@@ -449,6 +449,9 @@ static inline void lustre_handle_copy(struct lustre_handle *tgt,
         tgt->cookie = src->cookie;
 }
 
+/* flags for lm_flags */
+#define MSGHDR_AT_SUPPORT               0x1
+
 #define lustre_msg lustre_msg_v2
 /* we depend on this structure to be 8-byte aligned */
 /* this type is only endian-adjusted in lustre_unpack_msg() */
@@ -457,8 +460,8 @@ struct lustre_msg_v2 {
         __u32 lm_secflvr;
         __u32 lm_magic;
         __u32 lm_repsize;
-        __u32 lm_timeout;
-        __u32 lm_padding_1;
+        __u32 lm_cksum;
+        __u32 lm_flags;
         __u32 lm_padding_2;
         __u32 lm_padding_3;
         __u32 lm_buflens[0];
@@ -478,8 +481,8 @@ struct ptlrpc_body {
         __u32 pb_flags;
         __u32 pb_op_flags;
         __u32 pb_conn_cnt;
-        __u32 pb_padding_1;
-        __u32 pb_padding_2;
+        __u32 pb_timeout;  /* for req, the deadline, for rep, the service est */
+        __u32 pb_service_time; /* for rep, actual service time */
         __u32 pb_limit;
         __u64 pb_slv;
 };
@@ -511,12 +514,16 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define MSG_OP_FLAG_SHIFT  16
 
 /* Flags that apply to all requests are in the bottom 16 bits */
-#define MSG_GEN_FLAG_MASK      0x0000ffff
-#define MSG_LAST_REPLAY        1
-#define MSG_RESENT             2
-#define MSG_REPLAY             4
-#define MSG_REQ_REPLAY_DONE    8
-#define MSG_LOCK_REPLAY_DONE  16
+#define MSG_GEN_FLAG_MASK     0x0000ffff
+#define MSG_LAST_REPLAY           0x0001
+#define MSG_RESENT                0x0002
+#define MSG_REPLAY                0x0004
+/* #define MSG_AT_SUPPORT         0x0008
+ * This was used in early prototypes of adaptive timeouts, and while there
+ * shouldn't be any users of that code there also isn't a need for using this
+ * bit. Defer usage until at least 1.10 to avoid potential conflict. */
+#define MSG_REQ_REPLAY_DONE       0x0010
+#define MSG_LOCK_REPLAY_DONE      0x0020
 
 /*
  * Flags for all connect opcodes (MDS_CONNECT, OST_CONNECT)
@@ -581,16 +588,16 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
                                 OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \
                                 OBD_CONNECT_MDS_MDS | OBD_CONNECT_CANCELSET | \
                                 OBD_CONNECT_FID | \
-                                LRU_RESIZE_CONNECT_FLAG)
+                                LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_AT)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
                                 OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
                                 OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64 | \
                                 OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET | \
-                                OBD_CONNECT_CKSUM | \
-                                LRU_RESIZE_CONNECT_FLAG)
+                                OBD_CONNECT_CKSUM | LRU_RESIZE_CONNECT_FLAG | \
+                                OBD_CONNECT_AT)
 #define ECHO_CONNECT_SUPPORTED (0)
-#define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION)
+#define MGS_CONNECT_SUPPORTED  (OBD_CONNECT_VERSION | OBD_CONNECT_AT)
 
 #define MAX_QUOTA_COUNT32 (0xffffffffULL)
 
index e6cac14..06b5a7b 100644 (file)
@@ -429,6 +429,8 @@ struct ldlm_namespace {
          * Backward link to obd, required for ldlm pool to store new SLV. 
          */
         struct obd_device     *ns_obd;
+
+        struct adaptive_timeout ns_at_estimate;/* estimated lock callback time*/
 };
 
 static inline int ns_is_client(struct ldlm_namespace *ns)
index 4fdbca3..917e0da 100644 (file)
@@ -95,6 +95,7 @@ struct obd_export {
         struct ldlm_export_data   exp_ldlm_data;
         struct list_head          exp_outstanding_replies;
         time_t                    exp_last_request_time;
+        struct list_head          exp_req_replay_queue;
         spinlock_t                exp_lock; /* protects flags int below */
         /* ^ protects exp_outstanding_replies too */
         __u64                     exp_connect_flags;
index 7a5c761..c3ab5fd 100644 (file)
@@ -8,6 +8,22 @@
 #include <lustre_handles.h>
 #include <lustre/lustre_idl.h>
 
+
+/* Adaptive Timeout stuff */
+#define D_ADAPTTO D_OTHER
+#define AT_BINS 4                  /* "bin" means "N seconds of history" */
+#define AT_FLG_NOHIST 0x1          /* use last reported value only */
+
+struct adaptive_timeout {
+        time_t       at_binstart;         /* bin start time */
+        unsigned int at_hist[AT_BINS];    /* timeout history bins */
+        unsigned int at_flags;
+        unsigned int at_current;          /* current timeout value */
+        unsigned int at_worst_ever;       /* worst-ever timeout value */
+        time_t       at_worst_time;       /* worst-ever timeout timestamp */
+        spinlock_t   at_lock;
+};
+
 enum lustre_imp_state {
         LUSTRE_IMP_CLOSED     = 1,
         LUSTRE_IMP_NEW        = 2,
@@ -48,6 +64,13 @@ struct obd_import_conn {
         __u64                     oic_last_attempt; /* jiffies, 64-bit */
 };
 
+#define IMP_AT_MAX_PORTALS 8
+struct imp_at {
+        int                     iat_portal[IMP_AT_MAX_PORTALS];
+        struct adaptive_timeout iat_net_latency;
+        struct adaptive_timeout iat_service_estimate[IMP_AT_MAX_PORTALS];
+};
+
 struct obd_import {
         struct portals_handle     imp_handle;
         atomic_t                  imp_refcount;
@@ -111,8 +134,12 @@ struct obd_import {
         int                       imp_connect_error;
 
         __u32                     imp_msg_magic;
+        __u32                     imp_msghdr_flags;       /* adjusted based on server capability */
 
-        struct ptlrpc_request_pool *imp_rq_pool; /* emergency request pool */
+        struct ptlrpc_request_pool *imp_rq_pool;          /* emergency request pool */
+
+        struct imp_at             imp_at;                 /* adaptive timeout data */
+        time_t                    imp_last_reply_time;    /* for health check */
 };
 
 typedef void (*obd_import_callback)(struct obd_import *imp, void *closure,
@@ -131,6 +158,23 @@ void class_unobserve_import(struct obd_import *imp, obd_import_callback cb,
 void class_notify_import_observers(struct obd_import *imp, int event,
                                    void *event_arg);
 
+/* import.c */
+static inline void at_init(struct adaptive_timeout *at, int val, int flags) {
+        memset(at, 0, sizeof(*at));
+        at->at_current = val;
+        at->at_worst_ever = val;
+        at->at_worst_time = cfs_time_current_sec();
+        at->at_flags = flags;
+        spin_lock_init(&at->at_lock);
+}
+static inline int at_get(struct adaptive_timeout *at) {
+        return at->at_current;
+}
+int at_add(struct adaptive_timeout *at, unsigned int val);
+int import_at_get_index(struct obd_import *imp, int portal);
+extern unsigned int at_max;
+#define AT_OFF (at_max == 0)
+
 /* genops.c */
 struct obd_export;
 extern struct obd_import *class_exp2cliimp(struct obd_export *);
index 98bcf81..abce43e 100644 (file)
@@ -73,18 +73,15 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req);
 #define target_handle_qc_callback(req) (0)
 #endif
 
-void target_cancel_recovery_timer(struct obd_device *obd);
-
-#define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 / 2) /* *waves hands* */
 #define OBD_RECOVERY_MAX_TIME (obd_timeout * 18) /* b13079 */
-void target_start_recovery_timer(struct obd_device *obd);
+
+void target_cancel_recovery_timer(struct obd_device *obd);
 int target_start_recovery_thread(struct obd_device *obd, 
-                                  svc_handler_t handler);
+                                 svc_handler_t handler);
 void target_stop_recovery_thread(struct obd_device *obd);
 void target_cleanup_recovery(struct obd_device *obd);
 int target_queue_recovery_request(struct ptlrpc_request *req,
                                   struct obd_device *obd);
-int target_queue_final_reply(struct ptlrpc_request *req, int rc);
 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id);
 
 /* client.c */
index d3bb4c6..0ca4cc5 100644 (file)
@@ -293,11 +293,12 @@ struct lu_env;
 struct ptlrpc_request {
         int rq_type; /* one of PTL_RPC_MSG_* */
         struct list_head rq_list;
+        struct list_head rq_timed_list;         /* server-side early replies */
         struct list_head rq_history_list;       /* server-side history */
         __u64            rq_history_seq;        /* history sequence # */
         int rq_status;
         spinlock_t rq_lock;
-        /* client-side flags */
+        /* client-side flags are serialized by rq_lock */
         unsigned long rq_intr:1, rq_replied:1, rq_err:1,
                 rq_timedout:1, rq_resend:1, rq_restart:1,
                 /*
@@ -313,9 +314,15 @@ struct ptlrpc_request {
                 /* this is the last request in the sequence. */
                 rq_sequence:1,
                 rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1,
-                rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1;
+                rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1,
+                rq_early:1, rq_must_unlink:1,
+                /* server-side flags */
+                rq_packed_final:1,  /* packed final reply */
+                rq_sent_final:1;    /* stop sending early replies */
+
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
-        atomic_t rq_refcount;   /* client-side refcount for SENT race */
+        atomic_t rq_refcount;   /* client-side refcount for SENT race,
+                                   server-side refcount for multiple replies */
 
         struct ptlrpc_thread *rq_svc_thread; /* initial thread servicing req */
 
@@ -327,7 +334,6 @@ struct ptlrpc_request {
         int rq_reqlen;
         struct lustre_msg *rq_reqmsg;
 
-        int rq_timeout;         /* time to wait for reply (seconds) */
         int rq_replen;
         struct lustre_msg *rq_repmsg;
         __u64 rq_transno;
@@ -364,12 +370,16 @@ struct ptlrpc_request {
         /* (server side), pointed directly into req buffer */
         struct ptlrpc_user_desc *rq_user_desc;
 
+        /* early replies go to offset 0, regular replies go after that */
+        unsigned int             rq_reply_off;
+
         /* various buffer pointers */
         struct lustre_msg       *rq_reqbuf;      /* req wrapper */
         int                      rq_reqbuf_len;  /* req wrapper buf len */
         int                      rq_reqdata_len; /* req wrapper msg len */
-        struct lustre_msg       *rq_repbuf;      /* rep wrapper */
-        int                      rq_repbuf_len;  /* rep wrapper buf len */
+        char                    *rq_repbuf;      /* rep buffer */
+        int                      rq_repbuf_len;  /* rep buffer len */
+        struct lustre_msg       *rq_repdata;     /* rep wrapper msg */
         int                      rq_repdata_len; /* rep wrapper msg len */
         struct lustre_msg       *rq_clrbuf;      /* only in priv mode */
         int                      rq_clrbuf_len;  /* only in priv mode */
@@ -381,6 +391,8 @@ struct ptlrpc_request {
         int rq_import_generation;
         enum lustre_imp_state rq_send_state;
 
+        int rq_early_count;           /* how many early replies (for stats) */
+
         /* client+server request */
         lnet_handle_md_t     rq_req_md_h;
         struct ptlrpc_cb_id  rq_req_cbid;
@@ -407,10 +419,17 @@ struct ptlrpc_request {
         void (*rq_commit_cb)(struct ptlrpc_request *);
         void  *rq_cb_data;
 
-        struct ptlrpc_bulk_desc *rq_bulk;       /* client side bulk */
-        time_t rq_sent;                         /* when request sent, seconds,
-                                                 * or time when request should
-                                                 * be sent */
+        struct ptlrpc_bulk_desc *rq_bulk;/* client side bulk */
+
+        /* client outgoing req */
+        time_t rq_sent;                  /* when request/reply sent (secs), or
+                                          * time when request should be sent */
+
+        volatile time_t rq_deadline;     /* when request must finish. volatile
+               so that servers' early reply updates to the deadline aren't
+               kept in per-cpu cache */
+        int    rq_timeout;               /* service time estimate (secs) */
+
         /* Multi-rpc bits */
         struct list_head rq_set_chain;
         struct ptlrpc_request_set *rq_set;
@@ -604,13 +623,22 @@ struct ptlrpc_service {
         int              srv_n_difficult_replies; /* # 'difficult' replies */
         int              srv_n_active_reqs;     /* # reqs being served */
         cfs_duration_t   srv_rqbd_timeout;      /* timeout before re-posting reqs, in tick */
-        int              srv_watchdog_timeout; /* soft watchdog timeout, in ms */
+        int              srv_watchdog_factor;   /* soft watchdog timeout multiplier */
         unsigned         srv_cpu_affinity:1;    /* bind threads to CPUs */
+        unsigned         srv_at_check:1;        /* check early replies */
+        cfs_time_t       srv_at_checktime;      /* debug */
 
         __u32            srv_req_portal;
         __u32            srv_rep_portal;
 
-        int               srv_n_queued_reqs;    /* # reqs waiting to be served */
+        /* AT stuff */
+        struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */
+        spinlock_t        srv_at_lock;
+        struct list_head  srv_at_list;          /* reqs waiting for replies */
+        cfs_timer_t       srv_at_timer;         /* early reply timer */
+
+        int               srv_n_queued_reqs;    /* # reqs in either of the queues below */
+        struct list_head  srv_req_in_queue;     /* incoming reqs */
         struct list_head  srv_request_queue;    /* reqs waiting for service */
 
         struct list_head  srv_request_history;  /* request history */
@@ -707,11 +735,14 @@ static inline int ptlrpc_bulk_active (struct ptlrpc_bulk_desc *desc)
         return (rc);
 }
 
-int ptlrpc_send_reply(struct ptlrpc_request *req, int);
+#define PTLRPC_REPLY_MAYBE_DIFFICULT 0x01
+#define PTLRPC_REPLY_EARLY           0x02
+int ptlrpc_send_reply(struct ptlrpc_request *req, int flags);
 int ptlrpc_reply(struct ptlrpc_request *req);
 int ptlrpc_send_error(struct ptlrpc_request *req, int difficult);
 int ptlrpc_error(struct ptlrpc_request *req);
 void ptlrpc_resend_req(struct ptlrpc_request *request);
+int ptlrpc_at_get_net_latency(struct ptlrpc_request *req);
 int ptl_send_rpc(struct ptlrpc_request *request, int noreply);
 int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
 
@@ -722,23 +753,12 @@ void ptlrpc_cleanup_client(struct obd_import *imp);
 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
 
 static inline int
-ptlrpc_client_receiving_reply (struct ptlrpc_request *req)
+ptlrpc_client_recv_or_unlink (struct ptlrpc_request *req)
 {
         int           rc;
 
         spin_lock(&req->rq_lock);
-        rc = req->rq_receiving_reply;
-        spin_unlock(&req->rq_lock);
-        return (rc);
-}
-
-static inline int
-ptlrpc_client_replied (struct ptlrpc_request *req)
-{
-        int           rc;
-
-        spin_lock(&req->rq_lock);
-        rc = req->rq_replied;
+        rc = req->rq_receiving_reply || req->rq_must_unlink;
         spin_unlock(&req->rq_lock);
         return (rc);
 }
@@ -776,6 +796,7 @@ void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
 struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
                                                 void (*populate_pool)(struct ptlrpc_request_pool *, int));
+void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req);
 struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
                                             const struct req_format *format);
 struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
@@ -821,7 +842,7 @@ struct ptlrpc_service_conf {
         int psc_max_reply_size;
         int psc_req_portal;
         int psc_rep_portal;
-        int psc_watchdog_timeout; /* in ms */
+        int psc_watchdog_factor;
         int psc_min_threads;
         int psc_max_threads;
         __u32 psc_ctx_tags;
@@ -841,7 +862,7 @@ struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
 struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                                        int max_reply_size,
                                        int req_portal, int rep_portal,
-                                       int watchdog_timeout, /* in ms */
+                                       int watchdog_factor,
                                        svc_handler_t, char *name,
                                        cfs_proc_dir_entry_t *proc_entry,
                                        svcreq_printfn_t,
@@ -881,7 +902,10 @@ int lustre_pack_request(struct ptlrpc_request *, __u32 magic, int count,
 int lustre_pack_reply(struct ptlrpc_request *, int count, int *lens,
                       char **bufs);
 int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
-                         int *lens, char **bufs);
+                         int *lens, char **bufs, int flags);
+#define LPRFL_EARLY_REPLY 1
+int lustre_pack_reply_flags(struct ptlrpc_request *, int count, int *lens,
+                            char **bufs, int flags);
 int lustre_shrink_msg(struct lustre_msg *msg, int segment,
                       unsigned int newlen, int move_data);
 void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
@@ -889,6 +913,7 @@ int lustre_msg_hdr_size(__u32 magic, int count);
 int lustre_msg_size(__u32 magic, int count, int *lengths);
 int lustre_msg_size_v2(int count, int *lengths);
 int lustre_packed_msg_size(struct lustre_msg *msg);
+int lustre_msg_early_size(void);
 int lustre_unpack_msg(struct lustre_msg *m, int len);
 void *lustre_msg_buf_v2(struct lustre_msg_v2 *m, int n, int min_size);
 void *lustre_msg_buf(struct lustre_msg *m, int n, int minlen);
@@ -901,6 +926,8 @@ void *lustre_swab_reqbuf(struct ptlrpc_request *req, int n, int minlen,
                          void *swabber);
 void *lustre_swab_repbuf(struct ptlrpc_request *req, int n, int minlen,
                          void *swabber);
+__u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
+void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags);
 __u32 lustre_msg_get_flags(struct lustre_msg *msg);
 void lustre_msg_add_flags(struct lustre_msg *msg, int flags);
 void lustre_msg_set_flags(struct lustre_msg *msg, int flags);
@@ -922,7 +949,12 @@ void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv);
 void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit);
 int lustre_msg_get_status(struct lustre_msg *msg);
 __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg);
+int lustre_msg_is_v1(struct lustre_msg *msg);
 __u32 lustre_msg_get_magic(struct lustre_msg *msg);
+__u32 lustre_msg_get_timeout(struct lustre_msg *msg);
+__u32 lustre_msg_get_service_time(struct lustre_msg *msg);
+__u32 lustre_msg_get_cksum(struct lustre_msg *msg);
+__u32 lustre_msg_calc_cksum(struct lustre_msg *msg);
 void lustre_msg_set_handle(struct lustre_msg *msg,struct lustre_handle *handle);
 void lustre_msg_set_type(struct lustre_msg *msg, __u32 type);
 void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
@@ -933,6 +965,9 @@ void lustre_msg_set_status(struct lustre_msg *msg, __u32 status);
 void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
 void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, int *sizes);
 void ptlrpc_request_set_replen(struct ptlrpc_request *req);
+void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
+void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
+void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
 
 static inline void
 lustre_shrink_reply(struct ptlrpc_request *req, int segment,
@@ -959,6 +994,16 @@ ptlrpc_rs_decref(struct ptlrpc_reply_state *rs)
                 lustre_free_reply_state(rs);
 }
 
+/* Decref req's reply state and clear both reply pointers; once per req only */
+static inline void ptlrpc_req_drop_rs(struct ptlrpc_request *req)
+{
+        if (req->rq_reply_state == NULL)
+                return; /* shouldn't occur: no reply state left to drop */
+        ptlrpc_rs_decref(req->rq_reply_state);
+        req->rq_reply_state = NULL;
+        req->rq_repmsg = NULL; /* NOTE(review): presumably pointed into rs */
+}
+
 static inline __u32 lustre_request_magic(struct ptlrpc_request *req)
 {
         return lustre_msg_get_magic(req->rq_reqmsg);
index 033a0cf..7c4be3c 100644 (file)
@@ -656,6 +656,9 @@ int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize);
 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
                                int segment, int newsize);
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req);
+int sptlrpc_cli_finish_early_reply(struct ptlrpc_request *req);
+
 void sptlrpc_request_out_callback(struct ptlrpc_request *req);
 
 /*
index 5836004..e5804fe 100644 (file)
@@ -941,8 +941,9 @@ struct obd_device {
         spinlock_t                       obd_uncommitted_replies_lock;
         cfs_timer_t                      obd_recovery_timer;
         time_t                           obd_recovery_start; /* seconds */
-        time_t                           obd_recovery_end; /* seconds */
+        time_t                           obd_recovery_end; /* seconds, for lprocfs_status */
         time_t                           obd_recovery_max_time; /* seconds, bz13079 */
+        int                              obd_recovery_timeout;
         
         /* new recovery stuff from CMD2 */
         struct target_recovery_data      obd_recovery_data;
index 6765019..fd4d87f 100644 (file)
@@ -56,11 +56,10 @@ extern unsigned int obd_fail_val;
 extern unsigned int obd_debug_peer_on_timeout;
 extern unsigned int obd_dump_on_timeout;
 extern unsigned int obd_dump_on_eviction;
+/* obd_timeout should only be used for recovery, not for
+   networking / disk / timings affected by load (use Adaptive Timeouts) */
 extern unsigned int obd_timeout;          /* seconds */
-#define PING_INTERVAL max(obd_timeout / 4, 1U)
-#define RECONNECT_INTERVAL max(obd_timeout / 10, 10U)
-extern unsigned int ldlm_timeout;
-extern unsigned int obd_health_check_timeout;
+extern unsigned int ldlm_timeout;         /* seconds */
 extern unsigned int obd_sync_filter;
 extern unsigned int obd_max_dirty_pages;
 extern atomic_t obd_dirty_pages;
@@ -69,17 +68,47 @@ extern int obd_race_state;
 extern unsigned int obd_alloc_fail_rate;
 
 int __obd_fail_check_set(__u32 id, __u32 value, int set);
+int __obd_fail_timeout_set(__u32 id, __u32 value, int ms, int set);
 
 /* lvfs.c */
 int obd_alloc_fail(const void *ptr, const char *name, const char *type,
                    size_t size, const char *file, int line);
 
 /* Timeout definitions */
-#define LDLM_TIMEOUT_DEFAULT 20
 #define OBD_TIMEOUT_DEFAULT 100
-#define HEALTH_CHECK_COEF 3 / 2
-#define HEALTH_CHECK_TIMEOUT_DEFAULT (OBD_TIMEOUT_DEFAULT * HEALTH_CHECK_COEF)
-#define HEALTH_CHECK_TIMEOUT (obd_timeout * HEALTH_CHECK_COEF)
+#define LDLM_TIMEOUT_DEFAULT 20
+/* Time to wait for all clients to reconnect during recovery */
+/* Should be very conservative; must catch the first reconnect after reboot */
+#define OBD_RECOVERY_FACTOR (3) /* times obd_timeout */
+/* Change recovery-small 26b time if you change this */
+#define PING_INTERVAL max(obd_timeout / 4, 1U)
+/* Client may skip 1 ping; we must wait at least 2.5. But for multiple
+ * failover targets the client only pings one server at a time, and pings
+ * can be lost on a loaded network. Since eviction has serious consequences,
+ * and there's no urgent need to evict a client just because it's idle, we
+ * should be very conservative here. */
+#define PING_EVICT_TIMEOUT (PING_INTERVAL * 6)
+#define DISK_TIMEOUT 50          /* Beyond this we warn about disk speed */
+#define CONNECTION_SWITCH_MIN 5U /* Connection switching rate limiter */
+ /* Max connect interval for nonresponsive servers; ~50s to avoid building up
+    connect requests in the LND queues, but within obd_timeout so we don't
+    miss the recovery window */
+#define CONNECTION_SWITCH_MAX min(50U, max(CONNECTION_SWITCH_MIN,obd_timeout))
+#define CONNECTION_SWITCH_INC 5  /* Connection timeout backoff */
+#ifndef CRAY_XT3
+/* In general this should be low to have quick detection of a system
+   running on a backup server. (If it's too low, import_select_connection
+   will increase the timeout anyhow.)  */
+#define INITIAL_CONNECT_TIMEOUT max(CONNECTION_SWITCH_MIN,obd_timeout/20)
+#else
+/* ...but for very large systems (e.g. CRAY) we need to keep the initial
+   connect timeout high (bz 10803), because they will nearly ALWAYS be doing
+   the connects for the first time (clients "reboot" after every process, so
+   there is no chance to generate adaptive timeout data). */
+#define INITIAL_CONNECT_TIMEOUT max(CONNECTION_SWITCH_MIN,obd_timeout/2)
+#endif
+#define LONG_UNLINK 300          /* Unlink should happen before now */
+
 
 #define OBD_FAIL_MDS                     0x100
 #define OBD_FAIL_MDS_HANDLE_UNPACK       0x101
@@ -165,7 +194,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_OST_BRW_READ_BULK       0x20f
 #define OBD_FAIL_OST_SYNC_NET            0x210
 #define OBD_FAIL_OST_ALL_REPLY_NET       0x211
-#define OBD_FAIL_OST_ALL_REQUESTS_NET    0x212
+#define OBD_FAIL_OST_ALL_REQUEST_NET     0x212
 #define OBD_FAIL_OST_LDLM_REPLY_NET      0x213
 #define OBD_FAIL_OST_BRW_PAUSE_BULK      0x214
 #define OBD_FAIL_OST_ENOSPC              0x215
@@ -182,6 +211,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_OST_BRW_WRITE_BULK2     0x220
 #define OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
 #define OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
+#define OBD_FAIL_OST_PAUSE_CREATE        0x223
+#define OBD_FAIL_OST_BRW_PAUSE_PACK      0x224
 #define OBD_FAIL_OST_CONNECT_NET2        0x225
 
 #define OBD_FAIL_LDLM                    0x300
@@ -202,9 +233,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LDLM_GLIMPSE            0x30f
 #define OBD_FAIL_LDLM_CANCEL_RACE        0x310
 #define OBD_FAIL_LDLM_CANCEL_EVICT_RACE  0x311
-/* 
 #define OBD_FAIL_LDLM_PAUSE_CANCEL       0x312
-*/
 #define OBD_FAIL_LDLM_CLOSE_THREAD       0x313
 #define OBD_FAIL_LDLM_CANCEL_BL_CB_RACE  0x314
 
@@ -231,6 +260,9 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_PTLRPC_DROP_RPC         0x505
 #define OBD_FAIL_PTLRPC_DELAY_SEND       0x506
 #define OBD_FAIL_PTLRPC_DELAY_RECOV      0x507
+#define OBD_FAIL_PTLRPC_CLIENT_BULK_CB   0x508
+#define OBD_FAIL_PTLRPC_PAUSE_REQ        0x50a
+#define OBD_FAIL_PTLRPC_PAUSE_REP        0x50c
 
 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601
@@ -245,23 +277,25 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_TGT_DELAY_RECONNECT     0x704
 #define OBD_FAIL_TGT_DELAY_PRECREATE     0x705
 #define OBD_FAIL_TGT_TOOMANY_THREADS     0x706
+#define OBD_FAIL_TGT_REPLAY_DROP         0x707
 
 #define OBD_FAIL_MDC_REVALIDATE_PAUSE    0x800
 #define OBD_FAIL_MDC_ENQUEUE_PAUSE       0x801
+#define OBD_FAIL_MDC_OLD_EXT_FLAGS       0x802
 #define OBD_FAIL_MDC_GETATTR_ENQUEUE     0x803
 
 #define OBD_FAIL_MGS                     0x900
 #define OBD_FAIL_MGS_ALL_REQUEST_NET     0x901
 #define OBD_FAIL_MGS_ALL_REPLY_NET       0x902
-#define OBD_FAIL_MGC_PROCESS_LOG         0x903
-#define OBD_FAIL_MGS_SLOW_REQUEST_NET    0x904
-#define OBD_FAIL_MGS_SLOW_TARGET_REG     0x905
+#define OBD_FAIL_MGC_PAUSE_PROCESS_LOG   0x903
+#define OBD_FAIL_MGS_PAUSE_REQ           0x904
+#define OBD_FAIL_MGS_PAUSE_TARGET_REG    0x905
 
-#define OBD_FAIL_QUOTA_QD_COUNT_32BIT    0xa00
+#define OBD_FAIL_QUOTA_QD_COUNT_32BIT    0xA00
 
-#define OBD_FAIL_LPROC_REMOVE            0xb00
+#define OBD_FAIL_LPROC_REMOVE            0xB00
 
-#define OBD_FAIL_GENERAL_ALLOC           0xc00
+#define OBD_FAIL_GENERAL_ALLOC           0xC00
 
 #define OBD_FAIL_SEQ                     0x1000
 #define OBD_FAIL_SEQ_QUERY_NET           0x1001
@@ -273,6 +307,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_SEC_CTX_INIT_NET        0x1201
 #define OBD_FAIL_SEC_CTX_INIT_CONT_NET   0x1202
 #define OBD_FAIL_SEC_CTX_FINI_NET        0x1203
+#define OBD_FAIL_SEC_CTX_HDL_PAUSE       0x1204
 
 /* Failure injection control */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
@@ -321,28 +356,28 @@ static inline int obd_fail_check_set(__u32 id, __u32 value, int set)
         obd_fail_check_set(id, value, OBD_FAIL_LOC_RESET)
 
 
-static inline int obd_fail_timeout_set(__u32 id, __u32 value, int secs, int set)
+static inline int obd_fail_timeout_set(__u32 id, __u32 value, int ms, int set)
 {
-        int ret = 0;
-        if (unlikely(OBD_FAIL_PRECHECK(id) &&
-            (ret = __obd_fail_check_set(id, value, set)))) {
-                CERROR("obd_fail_timeout id %x sleeping for %d secs\n",
-                       id, secs);
-                set_current_state(TASK_UNINTERRUPTIBLE);
-                cfs_schedule_timeout(CFS_TASK_UNINT,  cfs_time_seconds(secs));
-                set_current_state(TASK_RUNNING);
-                CERROR("obd_fail_timeout id %x awake\n", id);
-        }
-        return ret;
+        if (unlikely(OBD_FAIL_PRECHECK(id)))
+                return __obd_fail_timeout_set(id, value, ms, set);
+        else
+                return 0;
 }
 
-/* If id hit obd_fail_loc, sleep secs */
+/* If id hit obd_fail_loc, sleep secs seconds (_MS variant: ms millisecs) */
 #define OBD_FAIL_TIMEOUT(id, secs) \
-        obd_fail_timeout_set(id, 0, secs, OBD_FAIL_LOC_NOSET)
+        obd_fail_timeout_set(id, 0, (secs) * 1000, OBD_FAIL_LOC_NOSET)
+
+#define OBD_FAIL_TIMEOUT_MS(id, ms) \
+        obd_fail_timeout_set(id, 0, ms, OBD_FAIL_LOC_NOSET)
 
-/* If id hit obd_fail_loc, obd_fail_loc |= value and sleep secs */
+/* If id hit obd_fail_loc, obd_fail_loc |= value and
+ * sleep secs seconds (_MS variant: ms milliseconds) */
 #define OBD_FAIL_TIMEOUT_ORSET(id, value, secs) \
-        obd_fail_timeout_set(id, value, secs, OBD_FAIL_LOC_ORSET)
+        obd_fail_timeout_set(id, value, (secs) * 1000, OBD_FAIL_LOC_ORSET)
+
+#define OBD_FAIL_TIMEOUT_MS_ORSET(id, value, ms) \
+        obd_fail_timeout_set(id, value, ms, OBD_FAIL_LOC_ORSET)
 
 #ifdef __KERNEL__
 static inline void obd_fail_write(int id, struct super_block *sb)
index 4cf0798..21ab977 100644 (file)
@@ -49,6 +49,8 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                           int count, int max, int cancel_flags, int flags);
 int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns, int count, int max, 
                              int flags);
+extern int ldlm_enqueue_min;
+int ldlm_get_enq_timeout(struct ldlm_lock *lock);
 
 /* ldlm_resource.c */
 int ldlm_resource_putref_locked(struct ldlm_resource *res);
index 2bb5e64..2c2afb2 100644 (file)
@@ -576,6 +576,10 @@ void target_client_add_cb(struct obd_device *obd, __u64 transno, void *cb_data,
         spin_unlock(&exp->exp_lock);
 }
 EXPORT_SYMBOL(target_client_add_cb);
+static void 
+target_start_and_reset_recovery_timer(struct obd_device *obd,
+                                      struct ptlrpc_request *req,
+                                      int new_client);
 
 int target_handle_connect(struct ptlrpc_request *req)
 {
@@ -766,26 +770,33 @@ int target_handle_connect(struct ptlrpc_request *req)
                                    (time_t)cfs_time_current_sec());
         }
 
-        /* We want to handle EALREADY but *not* -EALREADY from
-         * target_handle_reconnect(), return reconnection state in a flag */
-        if (rc == EALREADY) {
-                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
-                rc = 0;
-        } else if (rc) {
+        if (rc < 0) {
                 GOTO(out, rc);
         }
-        /* Tell the client if we're in recovery. */
-        /* If this is the first client, start the recovery timer */
+
         CWARN("%s: connection from %s@%s %st"LPU64" exp %p cur %ld last %ld\n",
                target->obd_name, cluuid.uuid, libcfs_nid2str(req->rq_peer.nid),
               target->obd_recovering ? "recovering/" : "", data->ocd_transno,
               export, (long)cfs_time_current_sec(),
               export ? (long)export->exp_last_request_time : 0);
 
-
+        /* Tell the client if we're in recovery. */
         if (target->obd_recovering) {
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
-                target_start_recovery_timer(target);
+                /* If this is the first time a client connects,
+                   reset the recovery timer */
+                if (rc == 0)
+                        target_start_and_reset_recovery_timer(target, req, 
+                                                              !export);
+        }
+
+        /* We want to handle EALREADY but *not* -EALREADY from
+         * target_handle_reconnect(), return reconnection state in a flag */
+        if (rc == EALREADY) {
+                lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
+                rc = 0;
+        } else {
+                LASSERT(rc == 0);
         }
 
         /* Tell the client if we support replayable requests */
@@ -957,6 +968,12 @@ dont_check_exports:
         revimp->imp_state = LUSTRE_IMP_FULL;
         revimp->imp_msg_magic = req->rq_reqmsg->lm_magic;
 
+        if ((export->exp_connect_flags & OBD_CONNECT_AT) &&
+            (revimp->imp_msg_magic != LUSTRE_MSG_MAGIC_V1))
+                revimp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
+        else
+                revimp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
+
         rc = sptlrpc_import_sec_adapt(revimp, req->rq_svc_ctx,
                                       req->rq_flvr.sf_rpc);
         if (rc) {
@@ -1046,6 +1063,7 @@ struct ptlrpc_request *ptlrpc_clone_req( struct ptlrpc_request *orig_req)
 
         class_export_get(copy_req->rq_export);
         CFS_INIT_LIST_HEAD(&copy_req->rq_list);
+        CFS_INIT_LIST_HEAD(&copy_req->rq_replay_list);
         sptlrpc_svc_ctx_addref(copy_req);
 
         if (copy_req->rq_reply_state) {
@@ -1059,13 +1077,11 @@ struct ptlrpc_request *ptlrpc_clone_req( struct ptlrpc_request *orig_req)
         return copy_req;
 }
 
-void ptlrpc_free_clone( struct ptlrpc_request *req)
+void ptlrpc_free_clone(struct ptlrpc_request *req)
 {
-        if (req->rq_reply_state) {
-                ptlrpc_rs_decref(req->rq_reply_state);
-                req->rq_reply_state = NULL;
-        }
+        LASSERT(list_empty(&req->rq_replay_list));
 
+        ptlrpc_req_drop_rs(req);
         sptlrpc_svc_ctx_decref(req);
         class_export_put(req->rq_export);
         list_del(&req->rq_list);
@@ -1078,6 +1094,48 @@ void ptlrpc_free_clone( struct ptlrpc_request *req)
         OBD_FREE_PTR(req);
 }
 
+static int target_exp_enqueue_req_replay(struct ptlrpc_request *req)
+{
+        __u64                  transno = lustre_msg_get_transno(req->rq_reqmsg);
+        struct obd_export     *exp = req->rq_export;
+        struct ptlrpc_request *reqiter;
+        int                    dup = 0;
+
+        LASSERT(exp);
+        /* scan this export's replay queue for the same transno */
+        spin_lock(&exp->exp_lock);
+        list_for_each_entry(reqiter, &exp->exp_req_replay_queue,
+                            rq_replay_list) {
+                if (lustre_msg_get_transno(reqiter->rq_reqmsg) == transno) {
+                        dup = 1;
+                        break;
+                }
+        }
+
+        if (dup) {
+                /* duplicate: we expect it with RESENT and REPLAY flags */
+                if ((lustre_msg_get_flags(req->rq_reqmsg) &
+                     (MSG_RESENT | MSG_REPLAY)) != (MSG_RESENT | MSG_REPLAY))
+                        CERROR("invalid flags %x of resent replay\n",
+                               lustre_msg_get_flags(req->rq_reqmsg));
+        } else {
+                list_add_tail(&req->rq_replay_list, &exp->exp_req_replay_queue);
+        }
+
+        spin_unlock(&exp->exp_lock);
+        return dup; /* 1: duplicate, req was not queued; 0: req queued */
+}
+
+static void target_exp_dequeue_req_replay(struct ptlrpc_request *req)
+{
+        LASSERT(!list_empty(&req->rq_replay_list)); /* must be queued */
+        LASSERT(req->rq_export);
+        /* unlink from the export's replay list under exp_lock */
+        spin_lock(&req->rq_export->exp_lock);
+        list_del_init(&req->rq_replay_list);
+        spin_unlock(&req->rq_export->exp_lock);
+}
+
 #ifdef __KERNEL__
 static void target_finish_recovery(struct obd_device *obd)
 {
@@ -1109,6 +1167,7 @@ static void abort_req_replay_queue(struct obd_device *obd)
                         DEBUG_REQ(D_ERROR, req,
                                   "failed abort_req_reply; skipping");
                 }
+                target_exp_dequeue_req_replay(req);
                 ptlrpc_free_clone(req);
         }
 }
@@ -1157,6 +1216,7 @@ void target_cleanup_recovery(struct obd_device *obd)
 
         list_for_each_entry_safe(req, n, &obd->obd_req_replay_queue, rq_list) {
                 LASSERT (req->rq_reply_state == 0);
+                target_exp_dequeue_req_replay(req);
                 ptlrpc_free_clone(req);
         }
         list_for_each_entry_safe(req, n, &obd->obd_lock_replay_queue, rq_list){
@@ -1174,7 +1234,11 @@ void target_cleanup_recovery(struct obd_device *obd)
 static void target_recovery_expired(unsigned long castmeharder)
 {
         struct obd_device *obd = (struct obd_device *)castmeharder;
-        CERROR("%s: recovery timed out, aborting\n", obd->obd_name);
+        LCONSOLE_WARN("%s: recovery timed out; %d clients never reconnected "
+                      "after %lds (%d clients did)\n",
+                      obd->obd_name, obd->obd_recoverable_clients,
+                      cfs_time_current_sec()- obd->obd_recovery_start,
+                      obd->obd_connected_clients);
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (obd->obd_recovering)
                 obd->obd_abort_recovery = 1;
@@ -1189,43 +1253,96 @@ void target_cancel_recovery_timer(struct obd_device *obd)
         CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
         cfs_timer_disarm(&obd->obd_recovery_timer);
 }
-
-static void reset_recovery_timer(struct obd_device *obd)
+  
+/* extend = 1 means require at least "duration" seconds left in the timer,
+   extend = 0 means set the total duration (start_recovery_timer) */
+static void reset_recovery_timer(struct obd_device *obd, int duration,
+                                 int extend)
 {
-        time_t timeout_shift = OBD_RECOVERY_TIMEOUT;
+        cfs_time_t now = cfs_time_current_sec();
+        cfs_duration_t left;
+
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (!obd->obd_recovering) {
+        if (!obd->obd_recovering || obd->obd_abort_recovery) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 return;
         }
-        if (cfs_time_current_sec() + OBD_RECOVERY_TIMEOUT > 
-            obd->obd_recovery_start + obd->obd_recovery_max_time)
-                timeout_shift = obd->obd_recovery_start + 
-                        obd->obd_recovery_max_time - cfs_time_current_sec();
-        cfs_timer_arm(&obd->obd_recovery_timer, cfs_time_shift(timeout_shift));
+
+        left = cfs_time_sub(obd->obd_recovery_end, now);
+
+        if (extend && (duration > left))
+                obd->obd_recovery_timeout += duration - left;
+        else if (!extend && (duration > obd->obd_recovery_timeout))
+                /* Track the client's largest expected replay time */
+                obd->obd_recovery_timeout = duration;
+#ifdef CRAY_XT3
+        /*
+         * If the total recovery time already exceeds
+         * obd_recovery_max_time, then CRAY XT3 will
+         * abort the recovery
+         */
+        if(obd->obd_recovery_timeout > obd->obd_recovery_max_time)
+                obd->obd_recovery_timeout = obd->obd_recovery_max_time;
+#endif
+        obd->obd_recovery_end = obd->obd_recovery_start + 
+                                obd->obd_recovery_timeout;
+        if (!cfs_timer_is_armed(&obd->obd_recovery_timer) ||
+            cfs_time_before(now, obd->obd_recovery_end)) {
+                left = cfs_time_sub(obd->obd_recovery_end, now);
+                cfs_timer_arm(&obd->obd_recovery_timer, cfs_time_shift(left));
+        }
         spin_unlock_bh(&obd->obd_processing_task_lock);
-        CDEBUG(D_HA, "%s: timer will expire in %u seconds\n", obd->obd_name,
-               (unsigned int)timeout_shift);
-        /* Only used for lprocfs_status */
-        obd->obd_recovery_end = cfs_time_current_sec() + timeout_shift;
+        CDEBUG(D_HA, "%s: recovery timer will expire in %u seconds\n",
+               obd->obd_name, (unsigned)left);
 }
 
+static void resume_recovery_timer(struct obd_device *obd)
+{
+        LASSERT(!cfs_timer_is_armed(&obd->obd_recovery_timer));
+        /* extend = 1: require at least this much time left in the timer; */
+        /* to be safe, make it at least OBD_RECOVERY_FACTOR * obd_timeout */
+        reset_recovery_timer(obd, OBD_RECOVERY_FACTOR * obd_timeout, 1);
+}
 
-/* Only start it the first time called */
-void target_start_recovery_timer(struct obd_device *obd)
+static void check_and_start_recovery_timer(struct obd_device *obd)
 {
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_recovery_handler
-            || timer_pending((struct timer_list *)&obd->obd_recovery_timer)) {
+        if (cfs_timer_is_armed(&obd->obd_recovery_timer)) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 return;
         }
-        CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
-              OBD_RECOVERY_TIMEOUT);
-        cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd);
+        CWARN("%s: starting recovery timer\n", obd->obd_name);
+        obd->obd_recovery_start = cfs_time_current_sec();
+        /* minimum */
+        obd->obd_recovery_timeout = OBD_RECOVERY_FACTOR * obd_timeout;
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
-        reset_recovery_timer(obd);
+        reset_recovery_timer(obd, obd->obd_recovery_timeout, 0);
+}
+
+/* Reset the timer with each new client connection */
+/*
+ * This timer is actually a reconnect timer: it makes sure the total
+ * recovery window is at least as long as the reconnect attempt
+ * timing. The initial recovery timeout is therefore set to
+ * OBD_RECOVERY_FACTOR * obd_timeout. If the timeout coming from a
+ * client is bigger than this, the recovery timeout is extended so
+ * that the client can still reconnect. In this process, the timeout
+ * from a new client should be ignored.
+ */
+
+static void
+target_start_and_reset_recovery_timer(struct obd_device *obd,
+                                      struct ptlrpc_request *req,
+                                      int new_client)
+{
+        int req_timeout = OBD_RECOVERY_FACTOR * 
+                          lustre_msg_get_timeout(req->rq_reqmsg);
+        /* starts the timer only if it is not already armed */
+        check_and_start_recovery_timer(obd);
+        /* a larger timeout from a client that is not new grows the window */
+        if (req_timeout > obd->obd_recovery_timeout && !new_client)
+                reset_recovery_timer(obd, req_timeout, 0);
+}
 
 #ifdef __KERNEL__
@@ -1317,6 +1434,7 @@ static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd)
         } else if (!list_empty(&obd->obd_req_replay_queue)) {
                 req = list_entry(obd->obd_req_replay_queue.next,
                                  struct ptlrpc_request, rq_list);
+                target_exp_dequeue_req_replay(req);
                 list_del_init(&req->rq_list);
                 obd->obd_requests_queued_for_recovery--;
         } else {
@@ -1441,7 +1559,9 @@ static int handle_recovery_req(struct ptlrpc_thread *thread,
         /* don't reset timer for final stage */
         if (!req_replay_done(req->rq_export) ||
             !lock_replay_done(req->rq_export))
-                reset_recovery_timer(class_exp2obd(req->rq_export));
+                reset_recovery_timer(class_exp2obd(req->rq_export),
+                    OBD_RECOVERY_FACTOR * (AT_OFF ? obd_timeout :
+                    at_get(&req->rq_rqbd->rqbd_service->srv_at_estimate)), 1);
         ptlrpc_free_clone(req);
         RETURN(0);
 }
@@ -1504,6 +1624,7 @@ static int target_recovery_thread(void *arg)
         CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n",
               atomic_read(&obd->obd_req_replay_clients),
               obd->obd_next_recovery_transno);
+        resume_recovery_timer(obd);
         while ((req = target_next_replay_req(obd))) {
                 LASSERT(trd->trd_processing_task == current->pid);
                 DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s",
@@ -1528,9 +1649,11 @@ static int target_recovery_thread(void *arg)
                 class_disconnect_stale_exports(obd, req_replay_done);
                 abort_req_replay_queue(obd);
         }
+
         /* The second stage: replay locks */
         CDEBUG(D_INFO, "2: lock replay stage - %d clients\n",
                atomic_read(&obd->obd_lock_replay_clients));
+        resume_recovery_timer(obd);
         while ((req = target_next_replay_lock(obd))) {
                 LASSERT(trd->trd_processing_task == current->pid);
                 DEBUG_REQ(D_HA|D_WARNING, req, "processing lock from %s: ",
@@ -1636,12 +1759,13 @@ void target_recovery_init(struct obd_device *obd, svc_handler_t handler)
               "last_transno "LPU64"\n", obd->obd_name,
               obd->obd_max_recoverable_clients, obd->obd_last_committed);
         obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
-        target_start_recovery_thread(obd, handler);
-        obd->obd_recovery_start = cfs_time_current_sec();
-        /* Only used for lprocfs_status */
-        obd->obd_recovery_end = obd->obd_recovery_start + OBD_RECOVERY_TIMEOUT;
+        obd->obd_recovery_start = 0;
+        obd->obd_recovery_end = 0;
+        obd->obd_recovery_timeout = OBD_RECOVERY_FACTOR * obd_timeout;
         /* bz13079: this should be set to desired value for ost but not for mds */
         obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME;
+        cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd);
+        target_start_recovery_thread(obd, handler);
 }
 EXPORT_SYMBOL(target_recovery_init);
 
@@ -1779,13 +1903,8 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         }
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
-        /* A resent, replayed request that is still on the queue; just drop it.
-           The queued request will handle this. */
-        if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT|MSG_REPLAY)) ==
-            (MSG_RESENT | MSG_REPLAY)) {
-                DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
+        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))
                 RETURN(0);
-        }
 
         req = ptlrpc_clone_req(req);
         if (req == NULL)
@@ -1800,6 +1919,13 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         }
         LASSERT(req->rq_export->exp_req_replay_needed);
 
+        if (target_exp_enqueue_req_replay(req)) {
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
+                ptlrpc_free_clone(req);
+                RETURN(0);
+        }
+
         /* XXX O(n^2) */
         list_for_each(tmp, &obd->obd_req_replay_queue) {
                 struct ptlrpc_request *reqiter =
@@ -1810,6 +1936,16 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                         inserted = 1;
                         break;
                 }
+
+                if (unlikely(lustre_msg_get_transno(reqiter->rq_reqmsg) ==
+                             transno)) {
+                        DEBUG_REQ(D_ERROR, req, "dropping replay: transno "
+                                  "has been claimed by another client");
+                        spin_unlock_bh(&obd->obd_processing_task_lock);
+                        target_exp_dequeue_req_replay(req);
+                        ptlrpc_free_clone(req);
+                        RETURN(0);
+                }
         }
 
         if (!inserted)
@@ -1819,7 +1955,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         wake_up(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
         RETURN(0);
-
 }
 
 struct obd_device * target_req2obd(struct ptlrpc_request *req)
@@ -1880,7 +2015,7 @@ int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
                 DEBUG_REQ(D_NET, req, "sending reply");
         }
 
-        return (ptlrpc_send_reply(req, 1));
+        return (ptlrpc_send_reply(req, PTLRPC_REPLY_MAYBE_DIFFICULT));
 }
 
 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
@@ -1992,7 +2127,8 @@ void target_committed_to_req(struct ptlrpc_request *req)
                 lustre_msg_set_last_committed(req->rq_repmsg,
                                               obd->obd_last_committed);
         else
-                DEBUG_REQ(D_IOCTL, req, "not sending last_committed update");
+                DEBUG_REQ(D_IOCTL, req, "not sending last_committed update (%d/"
+                          "%d)", obd->obd_no_transno, req->rq_repmsg == NULL);
 
         CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n",
                obd->obd_last_committed, req->rq_transno, req->rq_xid);
index 7b3b159..3962278 100644 (file)
@@ -60,10 +60,10 @@ inline cfs_time_t round_timeout(cfs_time_t timeout)
         return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
 }
 
-/* timeout for initial callback (AST) reply */
-static inline unsigned int ldlm_get_rq_timeout(unsigned int ldlm_timeout,
-                                               unsigned int obd_timeout)
+/* timeout for initial callback (AST) reply (bz10399) */
+static inline unsigned int ldlm_get_rq_timeout(void)
 {
+        /* Non-AT value */
         unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
 
         return timeout < 1 ? 1 : timeout;
@@ -263,11 +263,11 @@ repeat:
                         goto repeat;
                 }
 
-                LDLM_ERROR(lock, "lock callback timer expired: evicting client "
-                           "%s@%s nid %s\n",
-                           lock->l_export->exp_client_uuid.uuid,
-                           lock->l_export->exp_connection->c_remote_uuid.uuid,
-                           libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));
+                LDLM_ERROR(lock, "lock callback timer expired after %lds: "
+                           "evicting client at %s ",
+                           cfs_time_current_sec()- lock->l_enqueued_time.tv_sec,
+                           libcfs_nid2str(
+                                   lock->l_export->exp_connection->c_peer.nid));
 
                 last = lock;
 
@@ -307,21 +307,25 @@ repeat:
  */
 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
+        int timeout;
         cfs_time_t timeout_rounded;
 
         if (!list_empty(&lock->l_pending_chain))
                 return 0;
 
-        lock->l_callback_timeout =cfs_time_add(cfs_time_current(),
-                                               cfs_time_seconds(obd_timeout)/2);
+        timeout = ldlm_get_enq_timeout(lock);
+
+        lock->l_callback_timeout = cfs_time_shift(timeout);
 
         timeout_rounded = round_timeout(lock->l_callback_timeout);
 
-        if (cfs_time_before(timeout_rounded, cfs_timer_deadline(&waiting_locks_timer)) ||
+        if (cfs_time_before(timeout_rounded,
+                            cfs_timer_deadline(&waiting_locks_timer)) ||
             !cfs_timer_is_armed(&waiting_locks_timer)) {
                 cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
-
         }
+        /* if the new lock has a shorter timeout than something earlier on
+           the list, we'll wait the longer amount of time; no big deal. */
         list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
         return 1;
 }
@@ -649,7 +653,9 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         }
 
         req->rq_send_state = LUSTRE_IMP_FULL;
-        req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);
+        /* ptlrpc_prep_req already set timeout */
+        if (AT_OFF)
+                req->rq_timeout = ldlm_get_rq_timeout();
 
         if (lock->l_export && lock->l_export->exp_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
@@ -678,7 +684,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         total_enqueue_wait = cfs_timeval_sub(&granted_time,
                                              &lock->l_enqueued_time, NULL);
 
-        if (total_enqueue_wait / 1000000 > obd_timeout)
+        if (total_enqueue_wait / ONE_MILLION > obd_timeout)
+                /* non-fatal with AT - change to LDLM_DEBUG? */
                 LDLM_ERROR(lock, "enqueue wait took %luus from "CFS_TIME_T,
                            total_enqueue_wait, lock->l_enqueued_time.tv_sec);
 
@@ -720,9 +727,17 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                    total_enqueue_wait);
 
+        /* Server-side enqueue wait time estimate, used in
+            __ldlm_add_waiting_lock to set future enqueue timers */
+        at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
+               total_enqueue_wait / ONE_MILLION);
+
         ptlrpc_request_set_replen(req);
+
         req->rq_send_state = LUSTRE_IMP_FULL;
-        req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);
+        /* ptlrpc_prep_req already set timeout */
+        if (AT_OFF)
+                req->rq_timeout = ldlm_get_rq_timeout();
 
         /* We only send real blocking ASTs after the lock is granted */
         lock_res_and_lock(lock);
@@ -786,7 +801,9 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 
 
         req->rq_send_state = LUSTRE_IMP_FULL;
-        req->rq_timeout = ldlm_get_rq_timeout(ldlm_timeout, obd_timeout);
+        /* ptlrpc_prep_req already set timeout */
+        if (AT_OFF)
+                req->rq_timeout = ldlm_get_rq_timeout();
 
         if (lock->l_export && lock->l_export->exp_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
@@ -1084,7 +1101,7 @@ existing_lock:
         EXIT;
  out:
         req->rq_status = err;
-        if (req->rq_reply_state == NULL) {
+        if (!req->rq_packed_final) {
                 err = lustre_pack_reply(req, 1, NULL, NULL);
                 if (rc == 0)
                         rc = err;
@@ -1216,7 +1233,7 @@ int ldlm_handle_convert(struct ptlrpc_request *req)
         return rc;
 }
 
-/* Cancel all the locks, which handles are packed into ldlm_request */
+/* Cancel all the locks whose handles are packed into ldlm_request */
 int ldlm_request_cancel(struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req, int first)
 {
@@ -1471,7 +1488,7 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
                 return 0;
 
         req->rq_status = rc;
-        if (req->rq_reply_state == NULL) {
+        if (!req->rq_packed_final) {
                 rc = lustre_pack_reply(req, 1, NULL, NULL);
                 if (rc)
                         return rc;
@@ -2045,7 +2062,7 @@ static int ldlm_setup(void)
         ldlm_state->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                 LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
-                                LDLM_CB_REPLY_PORTAL, ldlm_timeout * 900,
+                                LDLM_CB_REPLY_PORTAL, 1800,
                                 ldlm_callback_handler, "ldlm_cbd",
                                 ldlm_svc_proc_dir, NULL,
                                 ldlm_min_threads, ldlm_max_threads,
@@ -2060,7 +2077,7 @@ static int ldlm_setup(void)
         ldlm_state->ldlm_cancel_service =
                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                 LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
-                                LDLM_CANCEL_REPLY_PORTAL, ldlm_timeout * 6000,
+                                LDLM_CANCEL_REPLY_PORTAL, 6000,
                                 ldlm_cancel_handler, "ldlm_canceld",
                                 ldlm_svc_proc_dir, NULL,
                                 ldlm_min_threads, ldlm_max_threads,
index f2049ec..4e68bee 100644 (file)
 
 #include "ldlm_internal.h"
 
+int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
+CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
+                "lock enqueue timeout minimum");
+
 static void interrupted_completion_wait(void *data)
 {
 }
@@ -65,7 +69,8 @@ int ldlm_expired_completion_wait(void *data)
                           CFS_DURATION_T"s ago); not entering recovery in "
                            "server code, just going back to sleep",
                           lock->l_enqueued_time.tv_sec,
-                           cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
+                           cfs_time_current_sec() -
+                           lock->l_enqueued_time.tv_sec);
                 if (cfs_time_after(cfs_time_current(), next_dump)) {
                         last_dump = next_dump;
                         next_dump = cfs_time_shift(300);
@@ -89,6 +94,20 @@ int ldlm_expired_completion_wait(void *data)
         RETURN(0);
 }
 
+/* Enqueue timeout, in seconds.  A single node derives both its server
+   side and its client side values from this one estimate. */
+int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+{
+        int est = at_get(&lock->l_resource->lr_namespace->ns_at_estimate);
+        if (AT_OFF)
+                return obd_timeout / 2;
+        /* These timeouts are not updated once set, so be conservative and
+           pad the estimate to 150%.  Some kind of "early reply" mechanism
+           for lock callbacks would be nice too... */
+        est += est >> 1;
+        return max(est, ldlm_enqueue_min);
+}
+
 static int is_granted_or_cancelled(struct ldlm_lock *lock)
 {
         int ret = 0;
@@ -110,6 +129,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct obd_device *obd;
         struct obd_import *imp = NULL;
         struct l_wait_info lwi;
+        __u32 timeout;
         int rc = 0;
         ENTRY;
 
@@ -134,8 +154,14 @@ noreproc:
         obd = class_exp2obd(lock->l_conn_export);
 
         /* if this is a local lock, then there is no import */
-        if (obd != NULL)
+        if (obd != NULL) {
                 imp = obd->u.cli.cl_import;
+        }
+
+        /* Wait a long time for enqueue - server may have to call back a
+           lock from another client.  Server will evict the other client if it
+           doesn't respond reasonably, and then give us the lock. */
+        timeout = ldlm_get_enq_timeout(lock) * 2;
 
         lwd.lwd_lock = lock;
 
@@ -143,7 +169,7 @@ noreproc:
                 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
                 lwi = LWI_INTR(interrupted_completion_wait, &lwd);
         } else {
-                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
+                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
                                        ldlm_expired_completion_wait,
                                        interrupted_completion_wait, &lwd);
         }
@@ -168,7 +194,13 @@ noreproc:
                 RETURN(rc);
         }
 
-        LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
+        LDLM_DEBUG(lock, "client-side enqueue waking up: granted after %lds",
+                   cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
+
+        /* Update our time estimate */
+        at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
+               cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
+
         RETURN(0);
 }
 
@@ -921,6 +953,8 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         LASSERT(exp != NULL);
         LASSERT(count > 0);
 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, obd_fail_val);
+
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
@@ -955,9 +989,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                 req->rq_no_resend = 1;
                 req->rq_no_delay = 1;
 
-                /* XXX FIXME bug 249 */
                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+                ptlrpc_at_set_req_timeout(req);
 
                 ldlm_cancel_pack(req, cancels, count);
 
index 84b72fd..c3a3aa1 100644 (file)
@@ -367,6 +367,8 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name,
                 GOTO(out_proc, rc);
         }
 
+        at_init(&ns->ns_at_estimate, ldlm_enqueue_min, 0);
+
         ldlm_namespace_register(ns, client);
         RETURN(ns);
 out_proc:
index 9aa5498..f023382 100644 (file)
@@ -157,7 +157,8 @@ int liblustre_process_log(struct config_llog_instance *cfg,
         if (ocd == NULL)
                 GOTO(out_cleanup, rc = -ENOMEM);
 
-        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID;
+        ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID |
+                                 OBD_CONNECT_AT;
 #ifdef LIBLUSTRE_POSIX_ACL
         ocd->ocd_connect_flags |= OBD_CONNECT_ACL;
 #endif
@@ -291,7 +292,7 @@ int _sysio_lustre_init(void)
                         obd_timeout);
         }
 
-       /* debug peer on timeout? */
+        /* debug peer on timeout? */
         envstr = getenv("LIBLUSTRE_DEBUG_PEER_ON_TIMEOUT");
         if (envstr != NULL) {
                 obd_debug_peer_on_timeout = 
index fae94f2..5789360 100644 (file)
@@ -2076,7 +2076,7 @@ llu_fsswop_mount(const char *source,
                            sizeof(async), &async, NULL);
 
         ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION |
-                                OBD_CONNECT_FID;
+                                OBD_CONNECT_FID | OBD_CONNECT_AT;
 #ifdef LIBLUSTRE_POSIX_ACL
         ocd.ocd_connect_flags |= OBD_CONNECT_ACL;
 #endif
@@ -2113,7 +2113,7 @@ llu_fsswop_mount(const char *source,
 
         ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL |
                                 OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK |
-                                OBD_CONNECT_FID;
+                                OBD_CONNECT_FID | OBD_CONNECT_AT;
         ocd.ocd_version = LUSTRE_VERSION_CODE;
         err = obd_connect(NULL, &dt_conn, obd, &sbi->ll_sb_uuid, &ocd, NULL);
         if (err) {
index 6c346db..941a42a 100644 (file)
@@ -206,7 +206,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                                   OBD_CONNECT_JOIN     | OBD_CONNECT_ATTRFID  |
                                   OBD_CONNECT_VERSION  | OBD_CONNECT_MDS_CAPA |
                                   OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET|
-                                  OBD_CONNECT_FID;
+                                  OBD_CONNECT_FID      | OBD_CONNECT_AT;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
@@ -371,7 +371,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
         data->ocd_connect_flags = OBD_CONNECT_GRANT     | OBD_CONNECT_VERSION  |
                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
                                   OBD_CONNECT_CANCELSET | OBD_CONNECT_FID      |
-                                  OBD_CONNECT_SRVLOCK   | OBD_CONNECT_TRUNCLOCK;
+                                  OBD_CONNECT_SRVLOCK   | OBD_CONNECT_TRUNCLOCK|
+                                  OBD_CONNECT_AT;
         if (sbi->ll_flags & LL_SBI_OSS_CAPA)
                 data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA;
 
index 5725e25..ebe6c97 100644 (file)
@@ -124,6 +124,24 @@ int __obd_fail_check_set(__u32 id, __u32 value, int set)
 }
 EXPORT_SYMBOL(__obd_fail_check_set);
 
+int __obd_fail_timeout_set(__u32 id, __u32 value, int ms, int set)
+{
+        int hit = __obd_fail_check_set(id, value, set);
+
+        /* fail point not triggered: return without sleeping */
+        if (!hit)
+                return 0;
+
+        CERROR("obd_fail_timeout id %x sleeping for %dms\n", id, ms);
+        set_current_state(TASK_UNINTERRUPTIBLE);
+        /* uninterruptible sleep; ms is the requested delay in milliseconds */
+        cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(ms) / 1000);
+        set_current_state(TASK_RUNNING);
+        CERROR("obd_fail_timeout id %x awake\n", id);
+        return hit;
+}
+EXPORT_SYMBOL(__obd_fail_timeout_set);
+
 #ifdef LPROCFS
 void lprocfs_counter_add(struct lprocfs_stats *stats, int idx,
                                        long amount)
index bea58ac..dafe84e 100644 (file)
@@ -74,11 +74,12 @@ static struct lprocfs_vars lprocfs_mdc_obd_vars[] = {
         { "kbytesavail",     lprocfs_rd_kbytesavail, 0, 0 },
         { "filestotal",      lprocfs_rd_filestotal,  0, 0 },
         { "filesfree",       lprocfs_rd_filesfree,   0, 0 },
-        //{ "filegroups",      lprocfs_rd_filegroups,  0, 0 },
+        /*{ "filegroups",      lprocfs_rd_filegroups,  0, 0 },*/
         { "mds_server_uuid", lprocfs_rd_server_uuid, 0, 0 },
         { "mds_conn_uuid",   lprocfs_rd_conn_uuid,   0, 0 },
         { "max_rpcs_in_flight", mdc_rd_max_rpcs_in_flight,
                                 mdc_wr_max_rpcs_in_flight, 0 },
+        { "timeouts",        lprocfs_rd_timeouts,    0, 0 },
         { 0 }
 };
 
index 5307cb1..399ddd4 100644 (file)
@@ -143,7 +143,8 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
         }
 
         if (op_data->op_attr.ia_valid & ATTR_FROM_OPEN) {
-                req->rq_request_portal = MDS_SETATTR_PORTAL; //XXX FIXME bug 249
+                req->rq_request_portal = MDS_SETATTR_PORTAL;
+                ptlrpc_at_set_req_timeout(req);
                 rpc_lock = obd->u.cli.cl_setattr_lock;
         } else {
                 rpc_lock = obd->u.cli.cl_rpc_lock;
index 14ddcc9..2e3e349 100644 (file)
@@ -810,8 +810,8 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data,
         /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a
          * portal whose threads are not taking any DLM locks and are therefore
          * always progressing */
-        /* XXX FIXME bug 249 */
         req->rq_request_portal = MDS_READPAGE_PORTAL;
+        ptlrpc_at_set_req_timeout(req);
 
         /* Ensure that this close's handle is fixed up during replay. */
         if (likely(mod != NULL))
@@ -942,6 +942,7 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
         }
 
         req->rq_request_portal = MDS_READPAGE_PORTAL;
+        ptlrpc_at_set_req_timeout(req);
 
         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
         if (desc == NULL)
@@ -983,8 +984,9 @@ int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 RETURN(rc);
         }
 
-        /* XXX FIXME bug 249 */
         req->rq_request_portal = MDS_READPAGE_PORTAL;
+        ptlrpc_at_set_req_timeout(req);
+
         desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL);
         if (desc == NULL) {
                 ptlrpc_request_free(req);
index e5a074b..506b648 100644 (file)
@@ -476,9 +476,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         OBD_ALLOC(data, sizeof(*data));
         if (data == NULL)
                 RETURN(-ENOMEM);
-        data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX |
+        data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
-                                  OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID;
+                                  OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
+                                  OBD_CONNECT_AT;
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
 #endif
index 686addd..8bc9c12 100644 (file)
@@ -319,15 +319,19 @@ static int mdt_getstatus(struct mdt_thread_info *info)
 
 static int mdt_statfs(struct mdt_thread_info *info)
 {
-        struct md_device  *next  = info->mti_mdt->mdt_child;
-        struct obd_statfs *osfs;
-        int                rc;
+        struct md_device      *next  = info->mti_mdt->mdt_child;
+        struct ptlrpc_service *svc;
+        struct obd_statfs     *osfs;
+        int                    rc;
 
         ENTRY;
 
+        svc = info->mti_pill->rc_req->rq_rqbd->rqbd_service;
+
         /* This will trigger a watchdog timeout */
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
-                         (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
+                         (MDT_SERVICE_WATCHDOG_FACTOR *
+                          at_get(&svc->srv_at_estimate) / 1000) + 1);
 
         rc = mdt_check_ucred(info);
         if (rc)
@@ -1123,6 +1127,7 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
         int                      tmpcount;
         int                      tmpsize;
+        int                      timeout;
         int                      i;
         int                      rc;
         ENTRY;
@@ -1146,7 +1151,11 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
                 GOTO(abort_bulk, rc = 0);
 
-        *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
+        timeout = (int) req->rq_deadline - cfs_time_current_sec();
+        if (timeout < 0)
+                CERROR("Req deadline already passed %lu (now: %lu)\n",
+                       req->rq_deadline, cfs_time_current_sec());
+        *lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
         LASSERT (rc == 0 || rc == -ETIMEDOUT);
 
@@ -1707,6 +1716,8 @@ static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
                         sptlrpc_svc_ctx_invalidate(req);
         }
 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, obd_fail_val);
+
         return rc;
 }
 
@@ -3337,21 +3348,21 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
         procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
 
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_REQUEST_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_REQUEST_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
                 /*
                  * We'd like to have a mechanism to set this on a per-device
                  * basis, but alas...
                  */
-                .psc_min_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                       MDT_MAX_THREADS),
-                .psc_max_threads   = MDT_MAX_THREADS,
-                .psc_ctx_tags      = LCT_MD_THREAD
+                .psc_min_threads    = min(max(mdt_num_threads, MDT_MIN_THREADS),
+                                          MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
 
         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
@@ -3360,7 +3371,8 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
 
         m->mdt_regular_service =
                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
-                                     procfs_entry, NULL, LUSTRE_MDT_NAME);
+                                     procfs_entry, target_print_req,
+                                     LUSTRE_MDT_NAME);
         if (m->mdt_regular_service == NULL)
                 RETURN(-ENOMEM);
 
@@ -3373,22 +3385,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * ideally.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_READPAGE_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                       MDT_MAX_THREADS),
-                .psc_max_threads   = MDT_MAX_THREADS,
-                .psc_ctx_tags      = LCT_MD_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_READPAGE_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads    = min(max(mdt_num_threads, MDT_MIN_THREADS),
+                                          MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
         m->mdt_readpage_service =
                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
                                      LUSTRE_MDT_NAME "_readpage",
-                                     procfs_entry, NULL, "mdt_rdpg");
+                                     procfs_entry, target_print_req,"mdt_rdpg");
 
         if (m->mdt_readpage_service == NULL) {
                 CERROR("failed to start readpage service\n");
@@ -3401,23 +3413,23 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * setattr service configuration.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_SETATTR_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_SETATTR_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
                 .psc_min_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                       MDT_MAX_THREADS),
-                .psc_max_threads   = MDT_MAX_THREADS,
-                .psc_ctx_tags      = LCT_MD_THREAD
+                                         MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
 
         m->mdt_setattr_service =
                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
                                      LUSTRE_MDT_NAME "_setattr",
-                                     procfs_entry, NULL, "mdt_attr");
+                                     procfs_entry, target_print_req,"mdt_attr");
 
         if (!m->mdt_setattr_service) {
                 CERROR("failed to start setattr service\n");
@@ -3432,22 +3444,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * sequence controller service configuration
          */
         conf = (typeof(conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = SEQ_CONTROLLER_PORTAL,
-                .psc_rep_portal = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads = SEQ_NUM_THREADS,
-                .psc_max_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = SEQ_MAXREQSIZE,
+                .psc_max_reply_size  = SEQ_MAXREPSIZE,
+                .psc_req_portal      = SEQ_CONTROLLER_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = SEQ_NUM_THREADS,
+                .psc_max_threads     = SEQ_NUM_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
         };
 
         m->mdt_mdsc_service =
                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
                                      LUSTRE_MDT_NAME"_mdsc",
-                                     procfs_entry, NULL, "mdt_mdsc");
+                                     procfs_entry, target_print_req,"mdt_mdsc");
         if (!m->mdt_mdsc_service) {
                 CERROR("failed to start seq controller service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3461,22 +3473,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * metadata sequence server service configuration
          */
         conf = (typeof(conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = SEQ_METADATA_PORTAL,
-                .psc_rep_portal = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads = SEQ_NUM_THREADS,
-                .psc_max_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = SEQ_MAXREQSIZE,
+                .psc_max_reply_size  = SEQ_MAXREPSIZE,
+                .psc_req_portal      = SEQ_METADATA_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = SEQ_NUM_THREADS,
+                .psc_max_threads     = SEQ_NUM_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
         };
 
         m->mdt_mdss_service =
                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
                                      LUSTRE_MDT_NAME"_mdss",
-                                     procfs_entry, NULL, "mdt_mdss");
+                                     procfs_entry, target_print_req,"mdt_mdss");
         if (!m->mdt_mdss_service) {
                 CERROR("failed to start metadata seq server service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3493,22 +3505,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * controller which manages space.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs = MDS_NBUFS,
-                .psc_bufsize = MDS_BUFSIZE,
-                .psc_max_req_size = SEQ_MAXREQSIZE,
-                .psc_max_reply_size = SEQ_MAXREPSIZE,
-                .psc_req_portal = SEQ_DATA_PORTAL,
-                .psc_rep_portal = OSC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads = SEQ_NUM_THREADS,
-                .psc_max_threads = SEQ_NUM_THREADS,
-                .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = SEQ_MAXREQSIZE,
+                .psc_max_reply_size  = SEQ_MAXREPSIZE,
+                .psc_req_portal      = SEQ_DATA_PORTAL,
+                .psc_rep_portal      = OSC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = SEQ_NUM_THREADS,
+                .psc_max_threads     = SEQ_NUM_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD|LCT_DT_THREAD
         };
 
         m->mdt_dtss_service =
                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
                                      LUSTRE_MDT_NAME"_dtss",
-                                     procfs_entry, NULL, "mdt_dtss");
+                                     procfs_entry, target_print_req,"mdt_dtss");
         if (!m->mdt_dtss_service) {
                 CERROR("failed to start data seq server service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3520,22 +3532,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
 
         /* FLD service start */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = FLD_MAXREQSIZE,
-                .psc_max_reply_size   = FLD_MAXREPSIZE,
-                .psc_req_portal       = FLD_REQUEST_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads      = FLD_NUM_THREADS,
-                .psc_max_threads      = FLD_NUM_THREADS,
-                .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = FLD_MAXREQSIZE,
+                .psc_max_reply_size  = FLD_MAXREPSIZE,
+                .psc_req_portal      = FLD_REQUEST_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads     = FLD_NUM_THREADS,
+                .psc_max_threads     = FLD_NUM_THREADS,
+                .psc_ctx_tags        = LCT_DT_THREAD|LCT_MD_THREAD
         };
 
         m->mdt_fld_service =
                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
                                      LUSTRE_MDT_NAME"_fld",
-                                     procfs_entry, NULL, "mdt_fld");
+                                     procfs_entry, target_print_req, "mdt_fld");
         if (!m->mdt_fld_service) {
                 CERROR("failed to start fld service\n");
                 GOTO(err_mdt_svc, rc = -ENOMEM);
@@ -3550,21 +3562,22 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m)
          * mds-mds requests be not blocked during recovery.
          */
         conf = (typeof(conf)) {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = MDS_MDS_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_min_threads      = min(max(mdt_num_threads, MDT_MIN_THREADS),
-                                            MDT_MAX_THREADS),
-                .psc_max_threads      = MDT_MAX_THREADS,
-                .psc_ctx_tags         = LCT_MD_THREAD
+                .psc_nbufs           = MDS_NBUFS,
+                .psc_bufsize         = MDS_BUFSIZE,
+                .psc_max_req_size    = MDS_MAXREQSIZE,
+                .psc_max_reply_size  = MDS_MAXREPSIZE,
+                .psc_req_portal      = MDS_MDS_PORTAL,
+                .psc_rep_portal      = MDC_REPLY_PORTAL,
+                .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+                .psc_min_threads    = min(max(mdt_num_threads, MDT_MIN_THREADS),
+                                          MDT_MAX_THREADS),
+                .psc_max_threads     = MDT_MAX_THREADS,
+                .psc_ctx_tags        = LCT_MD_THREAD
         };
-        m->mdt_xmds_service = ptlrpc_init_svc_conf(&conf, mdt_xmds_handle,
-                                                  LUSTRE_MDT_NAME "_mds",
-                                                  procfs_entry, NULL, "mdt_xmds");
+        m->mdt_xmds_service =
+                ptlrpc_init_svc_conf(&conf, mdt_xmds_handle,
+                                     LUSTRE_MDT_NAME "_mds",
+                                     procfs_entry, target_print_req,"mdt_xmds");
 
         if (m->mdt_xmds_service == NULL) {
                 CERROR("failed to start readpage service\n");
index b0d0cf3..2d07448 100644 (file)
@@ -163,8 +163,7 @@ struct mdt_device {
         struct lprocfs_stats      *mdt_stats;
 };
 
-/*XXX copied from mds_internal.h */
-#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
+#define MDT_SERVICE_WATCHDOG_FACTOR     (2000)
 #define MDT_ROCOMPAT_SUPP       (OBD_ROCOMPAT_LOVOBJID)
 #define MDT_INCOMPAT_SUPP       (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
 
index 1f0411f..e7e05b8 100644 (file)
@@ -1081,7 +1081,7 @@ static int mgc_process_log(struct obd_device *mgc,
         if (cld->cld_stopping)
                 RETURN(0);
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_MGC_PROCESS_LOG, 20);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_MGC_PAUSE_PROCESS_LOG, 20);
 
         lsi = s2lsi(cld->cld_cfg.cfg_sb);
 
index 3c3c186..79d6f0f 100644 (file)
@@ -205,9 +205,9 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         mgs->mgs_service =
                 ptlrpc_init_svc(MGS_NBUFS, MGS_BUFSIZE, MGS_MAXREQSIZE,
                                 MGS_MAXREPSIZE, MGS_REQUEST_PORTAL,
-                                MGC_REPLY_PORTAL, MGS_SERVICE_WATCHDOG_TIMEOUT,
+                                MGC_REPLY_PORTAL, 2000,
                                 mgs_handle, LUSTRE_MGS_NAME,
-                                obd->obd_proc_entry, NULL,
+                                obd->obd_proc_entry, target_print_req,
                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
                                 "ll_mgs", LCT_MD_THREAD);
 
@@ -397,7 +397,7 @@ static int mgs_handle_target_reg(struct ptlrpc_request *req)
                                    obd->obd_name, lockrc);
         }
 
-        OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_TARGET_REG, 10);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_PAUSE_TARGET_REG, 10);
 
         /* Log writing contention is handled by the fsdb_sem */
 
@@ -546,7 +546,9 @@ int mgs_handle(struct ptlrpc_request *req)
         ENTRY;
 
         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-        OBD_FAIL_TIMEOUT(OBD_FAIL_MGS_SLOW_REQUEST_NET, 2);
+        OBD_FAIL_TIMEOUT_MS(OBD_FAIL_MGS_PAUSE_REQ, obd_fail_val);
+        if (OBD_FAIL_CHECK(OBD_FAIL_MGS_ALL_REQUEST_NET))
+                RETURN(0);
 
         LASSERT(current->journal_info == NULL);
         opc = lustre_msg_get_opc(req->rq_reqmsg);
index e0833f9..1c31c24 100644 (file)
@@ -15,9 +15,6 @@
 #include <lustre_log.h>
 #include <lustre_export.h>
 
-/* in ms */
-#define MGS_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
-
 /* mgs_llog.c */
 int class_dentry_readdir(struct obd_device *obd, struct dentry *dir,
                          struct vfsmount *inmnt, 
index 9de3728..62ac5d3 100644 (file)
@@ -65,9 +65,8 @@ __u64 obd_pages;
 unsigned int obd_debug_peer_on_timeout;
 unsigned int obd_dump_on_timeout;
 unsigned int obd_dump_on_eviction;
-unsigned int obd_timeout = OBD_TIMEOUT_DEFAULT; /* seconds */
+unsigned int obd_timeout = OBD_TIMEOUT_DEFAULT;   /* seconds */
 unsigned int ldlm_timeout = LDLM_TIMEOUT_DEFAULT; /* seconds */
-unsigned int obd_health_check_timeout = HEALTH_CHECK_TIMEOUT_DEFAULT; /* seconds */
 unsigned int obd_max_dirty_pages = 256;
 atomic_t obd_dirty_pages;
 
@@ -387,7 +386,6 @@ EXPORT_SYMBOL(obd_dump_on_timeout);
 EXPORT_SYMBOL(obd_dump_on_eviction);
 EXPORT_SYMBOL(obd_timeout);
 EXPORT_SYMBOL(ldlm_timeout);
-EXPORT_SYMBOL(obd_health_check_timeout);
 EXPORT_SYMBOL(obd_max_dirty_pages);
 EXPORT_SYMBOL(obd_dirty_pages);
 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
index 9e4a7f5..a630808 100644 (file)
@@ -680,6 +680,7 @@ void class_export_destroy(struct obd_export *exp)
                 ptlrpc_put_connection_superhack(exp->exp_connection);
 
         LASSERT(list_empty(&exp->exp_outstanding_replies));
+        LASSERT(list_empty(&exp->exp_req_replay_queue));
         obd_destroy_export(exp);
  
         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
@@ -705,6 +706,7 @@ struct obd_export *class_new_export(struct obd_device *obd,
         atomic_set(&export->exp_rpc_count, 0);
         export->exp_obd = obd;
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
+        CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
         /* XXX this should be in LDLM init */
         CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
         spin_lock_init(&export->exp_ldlm_data.led_lock);
@@ -837,6 +839,18 @@ void class_import_destroy(struct obd_import *import)
         EXIT;
 }
 
+/*
+ * Initialize the adaptive-timeout state of an import.
+ *
+ * The network latency estimator is set up with at_init(..., 0, 0); each of
+ * the IMP_AT_MAX_PORTALS per-portal service estimators is set up with
+ * INITIAL_CONNECT_TIMEOUT and the AT_FLG_NOHIST flag.
+ */
+static void init_imp_at(struct imp_at *at) {
+        int i;
+        at_init(&at->iat_net_latency, 0, 0);
+        for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
+                /* max service estimates are tracked on the server side, so
+                   don't use the AT history here, just use the last reported
+                   val. (But keep hist for proc histogram, worst_ever) */
+                at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
+                        AT_FLG_NOHIST);
+        }
+}
+
 struct obd_import *class_new_import(struct obd_device *obd)
 {
         struct obd_import *imp;
@@ -863,6 +877,7 @@ struct obd_import *class_new_import(struct obd_device *obd)
         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
         class_handle_hash(&imp->imp_handle, import_handle_addref);
+        init_imp_at(&imp->imp_at);
 
         /* the default magic is V2, will be used in connect RPC, and
          * then adjusted according to the flags in request/reply. */
@@ -1023,10 +1038,12 @@ static void class_disconnect_export_list(struct list_head *list, int flags)
                 fake_exp->exp_flags = flags;
                 spin_unlock(&fake_exp->exp_lock);
 
+                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
+                       "last request at %ld\n",
+                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
+                       exp, exp->exp_last_request_time);
                 rc = obd_disconnect(fake_exp);
                 class_export_put(exp);
-                CDEBUG(D_HA, "disconnecting export %s (%p): rc %d\n",
-                       exp->exp_client_uuid.uuid, exp, rc);
         }
         EXIT;
 }
index 2215132..df8a15d 100644 (file)
@@ -286,27 +286,6 @@ static int obd_proc_read_health(char *page, char **start, off_t off,
         return rc;
 }
 
-static int obd_proc_rd_health_timeout(char *page, char **start, off_t off,
-                                      int count, int *eof, void *data)
-{
-        *eof = 1;
-        return snprintf(page, count, "%d\n", obd_health_check_timeout);
-}
-
-static int obd_proc_wr_health_timeout(struct file *file, const char *buffer,
-                                      unsigned long count, void *data)
-{
-        int val, rc;
-
-        rc = lprocfs_write_helper(buffer, count, &val);
-        if (rc)
-                return rc;
-
-        obd_health_check_timeout = val;
-
-        return count;
-}
-
 /* Root for /proc/fs/lustre */
 struct proc_dir_entry *proc_lustre_root = NULL;
 
@@ -314,8 +293,6 @@ struct lprocfs_vars lprocfs_base[] = {
         { "version", obd_proc_read_version, NULL, NULL },
         { "pinger", obd_proc_read_pinger, NULL, NULL },
         { "health_check", obd_proc_read_health, NULL, NULL },
-        { "health_check_timeout", obd_proc_rd_health_timeout,
-           obd_proc_wr_health_timeout, NULL },
         { 0 }
 };
 #else
index c9fa3db..d34eba0 100644 (file)
@@ -600,6 +600,70 @@ int lprocfs_rd_conn_uuid(char *page, char **start, off_t off, int count,
         return rc;
 }
 
+/*
+ * Append the history bins of an adaptive timeout to a proc page.
+ *
+ * Each of the AT_BINS entries of at->at_hist[] is printed as "%3u ",
+ * followed by a single newline.
+ *
+ * page:  start of the output buffer
+ * count: total size of the buffer, measured from page
+ * rc:    offset in page at which to start writing (bytes already used)
+ * at:    adaptive timeout whose histogram is printed
+ *
+ * Returns the updated offset.  snprintf() reports the length it would have
+ * written, so after a truncated write the offset can reach or pass count;
+ * once that happens nothing more is appended, so that snprintf() is never
+ * handed a zero or negative size (a negative int would be converted to a
+ * huge size_t).
+ */
+int lprocfs_at_hist_helper(char *page, int count, int rc,
+                           struct adaptive_timeout *at)
+{
+        int i;
+
+        for (i = 0; i < AT_BINS; i++) {
+                /* buffer already full: stop rather than overrun it */
+                if (rc >= count)
+                        return rc;
+                rc += snprintf(page + rc, count - rc, "%3u ", at->at_hist[i]);
+        }
+        if (rc < count)
+                rc += snprintf(page + rc, count - rc, "\n");
+        return rc;
+}
+
+/*
+ * Proc read handler that reports the adaptive-timeout state of a client
+ * import.  See also ptlrpc_lprocfs_rd_timeouts.
+ *
+ * Output written to page, in order:
+ *   - "last reply": imp_last_reply_time and how long ago that was
+ *   - "network":    current and worst-ever net latency estimate, plus its
+ *                   history bins
+ *   - "portal N":   the same for each service estimate slot, stopping at
+ *                   the first slot whose iat_portal[] entry is 0
+ *
+ * page:  output buffer
+ * start: not used by this handler
+ * off:   not used by this handler
+ * count: size of page
+ * eof:   always set to 1
+ * data:  the struct obd_device whose u.cli.cl_import is reported
+ *
+ * Returns the accumulated snprintf() length.
+ *
+ * NOTE(review): the snprintf() calls pass count - rc without checking that
+ * rc is still below count; if the output were ever truncated that size
+ * could become zero or negative -- confirm the page is always large enough.
+ */
+int lprocfs_rd_timeouts(char *page, char **start, off_t off, int count,
+                        int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+        struct obd_import *imp;
+        unsigned int cur, worst;
+        time_t now, worstt;
+        struct dhms ts;
+        int i, rc = 0;
+
+        LASSERT(obd != NULL);
+        /* presumably guards access to the client import; paired with
+         * LPROCFS_CLIMP_EXIT below -- TODO confirm against the macro */
+        LPROCFS_CLIMP_CHECK(obd);
+        imp = obd->u.cli.cl_import;
+        *eof = 1;
+
+        now = cfs_time_current_sec();
+
+        /* Some network health info for kicks */
+        s2dhms(&ts, now - imp->imp_last_reply_time);
+        rc += snprintf(page + rc, count - rc,
+                       "%-10s : %ld, "DHMS_FMT" ago\n",
+                       "last reply", imp->imp_last_reply_time, DHMS_VARS(&ts));
+
+
+        /* network latency: current estimate, worst value and when it was
+         * recorded, then the history bins on the same line */
+        cur = at_get(&imp->imp_at.iat_net_latency);
+        worst = imp->imp_at.iat_net_latency.at_worst_ever;
+        worstt = imp->imp_at.iat_net_latency.at_worst_time;
+        s2dhms(&ts, now - worstt);
+        rc += snprintf(page + rc, count - rc,
+                       "%-10s : cur %3u  worst %3u (at %ld, "DHMS_FMT" ago) ",
+                       "network", cur, worst, worstt, DHMS_VARS(&ts));
+        rc = lprocfs_at_hist_helper(page, count, rc,
+                                    &imp->imp_at.iat_net_latency);
+
+        /* one line per service estimate slot, in the same format */
+        for(i = 0; i < IMP_AT_MAX_PORTALS; i++) {
+                /* a zero portal number ends the list */
+                if (imp->imp_at.iat_portal[i] == 0)
+                        break;
+                cur = at_get(&imp->imp_at.iat_service_estimate[i]);
+                worst = imp->imp_at.iat_service_estimate[i].at_worst_ever;
+                worstt = imp->imp_at.iat_service_estimate[i].at_worst_time;
+                s2dhms(&ts, now - worstt);
+                rc += snprintf(page + rc, count - rc,
+                               "portal %-2d  : cur %3u  worst %3u (at %ld, "
+                               DHMS_FMT" ago) ", imp->imp_at.iat_portal[i],
+                               cur, worst, worstt, DHMS_VARS(&ts));
+                rc = lprocfs_at_hist_helper(page, count, rc,
+                                          &imp->imp_at.iat_service_estimate[i]);
+        }
+
+        LPROCFS_CLIMP_EXIT(obd);
+        return rc;
+}
+
 static const char *obd_connect_names[] = {
         "read_only",
         "lov_index",
@@ -1692,7 +1756,6 @@ int lprocfs_obd_rd_recovery_status(char *page, char **start, off_t off,
 
         if (lprocfs_obd_snprintf(&page, size, &len, "status: ") <= 0)
                 goto out;
-
         if (obd->obd_max_recoverable_clients == 0) {
                 if (lprocfs_obd_snprintf(&page, size, &len, "INACTIVE\n") <= 0)
                         goto out;
@@ -1704,59 +1767,55 @@ int lprocfs_obd_rd_recovery_status(char *page, char **start, off_t off,
         if (obd->obd_recovering == 0) {
                 if (lprocfs_obd_snprintf(&page, size, &len, "COMPLETE\n") <= 0)
                         goto out;
-
-                if (lprocfs_obd_snprintf(&page, size, &len, "recovery_start: %lu\n",
-                    obd->obd_recovery_start) <= 0)
+                if (lprocfs_obd_snprintf(&page, size, &len,
+                                         "recovery_start: %lu\n",
+                                         obd->obd_recovery_start) <= 0)
                         goto out;
-
-                if (lprocfs_obd_snprintf(&page, size, &len, "recovery_end: %lu\n",
-                    obd->obd_recovery_end) <= 0)
+                if (lprocfs_obd_snprintf(&page, size, &len,
+                                         "recovery_duration: %lu\n",
+                                         obd->obd_recovery_end -
+                                         obd->obd_recovery_start) <= 0)
                         goto out;
-
-                /* Number of clients have have completed recovery */
-                if (lprocfs_obd_snprintf(&page, size, &len, "recovered_clients: %d\n",
-                    obd->obd_max_recoverable_clients - obd->obd_recoverable_clients) <= 0)
+                /* Number of clients that have completed recovery */
+                if (lprocfs_obd_snprintf(&page, size, &len,
+                                         "completed_clients: %d/%d\n",
+                                         obd->obd_max_recoverable_clients -
+                                         obd->obd_recoverable_clients,
+                                         obd->obd_max_recoverable_clients) <= 0)
                         goto out;
-
-                if (lprocfs_obd_snprintf(&page, size, &len, "unrecovered_clients: %d\n",
-                    obd->obd_recoverable_clients) <= 0)
+                if (lprocfs_obd_snprintf(&page, size, &len,
+                                         "replayed_requests: %d\n",
+                                         obd->obd_replayed_requests) <= 0)
                         goto out;
-
-                if (lprocfs_obd_snprintf(&page, size, &len, "last_transno: "LPD64"\n",
-                    obd->obd_next_recovery_transno - 1) <= 0)
+                if (lprocfs_obd_snprintf(&page, size, &len,
+                                         "last_transno: "LPD64"\n",
+                                         obd->obd_next_recovery_transno - 1)<=0)
                         goto out;
-
-                lprocfs_obd_snprintf(&page, size, &len, "replayed_requests: %d\n", obd->obd_replayed_requests);
                 goto fclose;
         }
 
         if (lprocfs_obd_snprintf(&page, size, &len, "RECOVERING\n") <= 0)
                 goto out;
-
         if (lprocfs_obd_snprintf(&page, size, &len, "recovery_start: %lu\n",
-            obd->obd_recovery_start) <= 0)
+                                 obd->obd_recovery_start) <= 0)
                 goto out;
-
-        if (lprocfs_obd_snprintf(&page, size, &len, "time remaining: %lu\n",
-                                 cfs_time_current_sec() >= obd->obd_recovery_end ? 0 :
-                                 obd->obd_recovery_end - cfs_time_current_sec()) <= 0)
+        if (lprocfs_obd_snprintf(&page, size, &len, "time_remaining: %lu\n",
+                           cfs_time_current_sec() >= obd->obd_recovery_end ? 0 :
+                           obd->obd_recovery_end - cfs_time_current_sec()) <= 0)
                 goto out;
-
-        if(lprocfs_obd_snprintf(&page, size, &len, "connected_clients: %d/%d\n",
-                                obd->obd_connected_clients,
-                                obd->obd_max_recoverable_clients) <= 0)
+        if (lprocfs_obd_snprintf(&page, size, &len,"connected_clients: %d/%d\n",
+                                 obd->obd_connected_clients,
+                                 obd->obd_max_recoverable_clients) <= 0)
                 goto out;
-
-        /* Number of clients have have completed recovery */
-        if (lprocfs_obd_snprintf(&page, size, &len, "completed_clients: %d/%d\n",
-                                 obd->obd_max_recoverable_clients - obd->obd_recoverable_clients,
+        /* Number of clients that have completed recovery */
+        if (lprocfs_obd_snprintf(&page, size, &len,"completed_clients: %d/%d\n",
+                                 obd->obd_max_recoverable_clients -
+                                 obd->obd_recoverable_clients,
                                  obd->obd_max_recoverable_clients) <= 0)
                 goto out;
-
-        if (lprocfs_obd_snprintf(&page, size, &len, "replayed_requests: %d/??\n",
+        if (lprocfs_obd_snprintf(&page, size, &len,"replayed_requests: %d/??\n",
                                  obd->obd_replayed_requests) <= 0)
                 goto out;
-
         if (lprocfs_obd_snprintf(&page, size, &len, "queued_requests: %d\n",
                                  obd->obd_requests_queued_for_recovery) <= 0)
                 goto out;
@@ -1831,7 +1890,8 @@ EXPORT_SYMBOL(lprocfs_rd_server_uuid);
 EXPORT_SYMBOL(lprocfs_rd_conn_uuid);
 EXPORT_SYMBOL(lprocfs_rd_num_exports);
 EXPORT_SYMBOL(lprocfs_rd_numrefs);
-
+EXPORT_SYMBOL(lprocfs_at_hist_helper);
+EXPORT_SYMBOL(lprocfs_rd_timeouts);
 EXPORT_SYMBOL(lprocfs_rd_blksize);
 EXPORT_SYMBOL(lprocfs_rd_kbytestotal);
 EXPORT_SYMBOL(lprocfs_rd_kbytesfree);
index fbfb086..f0bdd83 100644 (file)
@@ -756,7 +756,6 @@ int class_process_config(struct lustre_cfg *lcfg)
                 CDEBUG(D_IOCTL, "changing lustre timeout from %d to %d\n",
                        obd_timeout, lcfg->lcfg_num);
                 obd_timeout = max(lcfg->lcfg_num, 1U);
-                obd_health_check_timeout = HEALTH_CHECK_TIMEOUT;
                 GOTO(out, err = 0);
         }
         case LCFG_SET_UPCALL: {
index 58dc3bf..32d9654 100644 (file)
@@ -543,7 +543,7 @@ DECLARE_MUTEX(mgc_start_lock);
 static int lustre_start_mgc(struct super_block *sb)
 {
         struct lustre_handle mgc_conn = {0, };
-        struct obd_connect_data ocd = { 0 };
+        struct obd_connect_data *data = NULL;
         struct lustre_sb_info *lsi = s2lsi(sb);
         struct obd_device *obd;
         struct obd_export *exp;
@@ -723,11 +723,14 @@ static int lustre_start_mgc(struct super_block *sb)
                 /* nonfatal */
                 CWARN("can't set %s %d\n", KEY_INIT_RECOV_BACKUP, rc);
         /* We connect to the MGS at setup, and don't disconnect until cleanup */
-
-        ocd.ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID;
-        ocd.ocd_version = LUSTRE_VERSION_CODE;
-
-        rc = obd_connect(NULL, &mgc_conn, obd, &(obd->obd_uuid), &ocd, NULL);
+        OBD_ALLOC_PTR(data);
+        if (data == NULL)
+                GOTO(out, rc = -ENOMEM);
+        data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID |
+                                  OBD_CONNECT_AT;
+        data->ocd_version = LUSTRE_VERSION_CODE;
+        rc = obd_connect(NULL, &mgc_conn, obd, &(obd->obd_uuid), data, NULL);
+        OBD_FREE_PTR(data);
         if (rc) {
                 CERROR("connect failed %d\n", rc);
                 GOTO(out, rc);
index 69dccff..f740f46 100644 (file)
@@ -1417,7 +1417,7 @@ struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
                 return ERR_PTR(-ENOENT);
 
         rc = filter_lock_dentry(obd, dparent);
-        fsfilt_check_slow(obd, now, obd_timeout, "parent lock");
+        fsfilt_check_slow(obd, now, "parent lock");
         return rc ? ERR_PTR(rc) : dparent;
 }
 
@@ -2040,18 +2040,17 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
 
         if (obd->obd_recovering) {
                 LCONSOLE_WARN("OST %s now serving %s (%s%s%s), but will be in "
-                              "recovery until %d %s reconnect, or if no clients"
-                              " reconnect for %d:%.02d; during that time new "
-                              "clients will not be allowed to connect. "
+                              "recovery for at least %d:%.02d, or until %d "
+                              "client%s reconnect. During this time new clients"
+                              " will not be allowed to connect. "
                               "Recovery progress can be monitored by watching "
                               "/proc/fs/lustre/obdfilter/%s/recovery_status.\n",
                               obd->obd_name, lustre_cfg_string(lcfg, 1),
                               label ?: "", label ? "/" : "", str,
+                              obd->obd_recovery_timeout / 60,
+                              obd->obd_recovery_timeout % 60,
                               obd->obd_max_recoverable_clients,
-                              (obd->obd_max_recoverable_clients == 1)
-                              ? "client" : "clients",
-                              (int)(OBD_RECOVERY_TIMEOUT) / 60,
-                              (int)(OBD_RECOVERY_TIMEOUT) % 60,
+                              (obd->obd_max_recoverable_clients == 1) ? "":"s",
                               obd->obd_name);
         } else {
                 LCONSOLE_INFO("OST %s now serving %s (%s%s%s) with recovery "
@@ -3553,7 +3552,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
         struct filter_obd *filter;
         struct obd_statfs *osfs;
         int err = 0, rc = 0, recreate_obj = 0, i;
-        unsigned long enough_time = jiffies + min(obd_timeout * HZ / 4, 10U*HZ);
+        cfs_time_t enough_time = cfs_time_shift(DISK_TIMEOUT/2);
         obd_id next_id;
         void *handle = NULL;
         ENTRY;
index 6f2ac82..135a3d2 100644 (file)
@@ -29,8 +29,6 @@
 #define FILTER_GRANT_CHUNK (2ULL * PTLRPC_MAX_BRW_SIZE)
 #define GRANT_FOR_LLOG(obd) 16
 
-#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
-
 extern struct file_operations filter_per_export_stats_fops;
 extern struct file_operations filter_per_nid_stats_fops;
 
@@ -60,6 +58,7 @@ struct filter_mod_data {
 #else
 #define FILTER_FMD_MAX_NUM_DEFAULT  32
 #endif
+/* Client cache seconds */
 #define FILTER_FMD_MAX_AGE_DEFAULT ((obd_timeout + 10) * HZ)
 
 struct filter_mod_data *filter_fmd_find(struct obd_export *exp,
index 380e4f6..1fbf7a7 100644 (file)
@@ -310,7 +310,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         inode = dentry->d_inode;
 
         obdo_to_inode(inode, oa, OBD_MD_FLATIME);
-        fsfilt_check_slow(obd, now, obd_timeout, "preprw_read setup");
+        fsfilt_check_slow(obd, now, "preprw_read setup");
 
         for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
              i++, rnb++, lnb++) {
@@ -343,7 +343,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
         }
 
-        fsfilt_check_slow(obd, now, obd_timeout, "start_page_read");
+        fsfilt_check_slow(obd, now, "start_page_read");
 
         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
                               exp, NULL, NULL, NULL);
@@ -548,7 +548,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         fso.fso_dentry = dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
 
-        fsfilt_check_slow(exp->exp_obd, now, obd_timeout, "preprw_write setup");
+        fsfilt_check_slow(exp->exp_obd, now, "preprw_write setup");
 
         /* Don't update inode timestamps if this write is older than a
          * setattr which modifies the timestamps. b=10150 */
@@ -648,7 +648,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
                               NULL, NULL, NULL);
 
-        fsfilt_check_slow(exp->exp_obd, now, obd_timeout, "start_page_write");
+        fsfilt_check_slow(exp->exp_obd, now, "start_page_write");
 
         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats)
                 lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
index c06c60b..5e9f45b 100644 (file)
@@ -708,7 +708,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         DQUOT_INIT(inode);
 
         LOCK_INODE_MUTEX(inode);
-        fsfilt_check_slow(obd, now, obd_timeout, "i_mutex");
+        fsfilt_check_slow(obd, now, "i_mutex");
         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                            oti);
         if (IS_ERR(oti->oti_handle)) {
@@ -721,7 +721,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
         }
         /* have to call fsfilt_commit() from this point on */
 
-        fsfilt_check_slow(obd, now, obd_timeout, "brw_start");
+        fsfilt_check_slow(obd, now, "brw_start");
 
         i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
 
@@ -773,7 +773,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         lquota_getflag(filter_quota_interface_ref, obd, oa);
 
-        fsfilt_check_slow(obd, now, obd_timeout, "direct_io");
+        fsfilt_check_slow(obd, now, "direct_io");
 
         err = fsfilt_commit_wait(obd, inode, wait_handle);
         if (err) {
@@ -786,7 +786,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
                          "oti_transno "LPU64" last_committed "LPU64"\n",
                          oti->oti_transno, obd->obd_last_committed);
 
-        fsfilt_check_slow(obd, now, obd_timeout, "commitrw commit");
+        fsfilt_check_slow(obd, now, "commitrw commit");
 
 cleanup:
         filter_grant_commit(exp, niocount, res);
index 8712ef6..1d7c0a4 100644 (file)
@@ -412,6 +412,7 @@ static struct lprocfs_vars lprocfs_osc_obd_vars[] = {
         { "checksums",       osc_rd_checksum, osc_wr_checksum, 0 },
         { "checksum_type",   osc_rd_checksum_type, osc_wd_checksum_type, 0 },
         { "resend_count",    osc_rd_resend_count, osc_wr_resend_count, 0},
+        { "timeouts",        lprocfs_rd_timeouts,      0, 0 },
         { 0 }
 };
 
index 99dd705..81eef48 100644 (file)
@@ -163,7 +163,8 @@ static int oscc_internal_create(struct osc_creator *oscc)
                 RETURN(-ENOMEM);
         }
 
-        request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
+        request->rq_request_portal = OST_CREATE_PORTAL;
+        ptlrpc_at_set_req_timeout(request);
         body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
 
         spin_lock(&oscc->oscc_lock);
@@ -378,8 +379,8 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         CDEBUG(D_HA,"%s: oscc recovery in progress, waiting\n",
                                oscc->oscc_obd->obd_name);
 
-                        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(obd_timeout/4)),
-                                          NULL, NULL);
+                        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(
+                                obd_timeout / 4)), NULL, NULL);
                         rc = l_wait_event(oscc->oscc_waitq,
                                           !oscc_recovering(oscc), &lwi);
                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
index f82849b..17c7e1d 100644 (file)
@@ -537,6 +537,7 @@ static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
                 RETURN(rc);
         }
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
+        ptlrpc_at_set_req_timeout(req);
         osc_pack_req_body(req, oinfo);
 
         /* overload the size and blocks fields in the oa with start/end */
@@ -703,6 +704,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
 
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
         req->rq_interpret_reply = osc_destroy_interpret;
+        ptlrpc_at_set_req_timeout(req);
 
         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
                 memcpy(obdo_logcookie(oa), oti->oti_logcookies,
@@ -1062,6 +1064,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 RETURN(rc);
         }
         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
+        ptlrpc_at_set_req_timeout(req);
 
         if (opc == OST_WRITE)
                 desc = ptlrpc_prep_bulk_imp(req, page_count,
@@ -3326,7 +3329,9 @@ static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
                 RETURN(rc);
         }
         ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
+        req->rq_request_portal = OST_CREATE_PORTAL;
+        ptlrpc_at_set_req_timeout(req);
+
         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
                 /* procfs requests not want stat in wait for avoid deadlock */
                 req->rq_no_resend = 1;
@@ -3379,7 +3384,8 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                 RETURN(rc);
         }
         ptlrpc_request_set_replen(req);
-        req->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
+        req->rq_request_portal = OST_CREATE_PORTAL;
+        ptlrpc_at_set_req_timeout(req);
 
         if (flags & OBD_STATFS_NODELAY) {
                 /* procfs requests not want stat in wait for avoid deadlock */
index e07405a..909abf8 100644 (file)
@@ -46,37 +46,4 @@ void lprocfs_ost_init_vars(struct lprocfs_static_vars *lvars)
     lvars->obd_vars     = lprocfs_ost_obd_vars;
 }
 
-void
-ost_print_req(void *seq_file, struct ptlrpc_request *req)
-{
-        /* Called holding srv_lock with irqs disabled.
-         * Print specific req contents and a newline.
-         * CAVEAT EMPTOR: check request message length before printing!!!
-         * You might have received any old crap so you must be just as
-         * careful here as the service's request parser!!! */
-        struct seq_file *sf = seq_file;
-
-        switch (req->rq_phase) {
-        case RQ_PHASE_NEW:
-                /* still awaiting a service thread's attention, or rejected
-                 * because the generic request message didn't unpack */
-                seq_printf(sf, "<not swabbed>\n");
-                break;
-                
-        case RQ_PHASE_INTERPRET:
-                /* being handled, so basic msg swabbed, and opc is valid
-                 * but racing with ost_handle() */
-                seq_printf(sf, "opc %d\n", lustre_msg_get_opc(req->rq_reqmsg));
-                break;
-                
-        case RQ_PHASE_COMPLETE:
-                /* been handled by ost_handle() reply state possibly still
-                 * volatile */
-                seq_printf(sf, "opc %d\n", lustre_msg_get_opc(req->rq_reqmsg));
-                break;
-
-        default:
-                LBUG();
-        }
-}
 #endif /* LPROCFS */
index 65b361d..45c5102 100644 (file)
@@ -769,13 +769,14 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
          * If getting the lock took more time than
          * client was willing to wait, drop it. b=11330
          */
-        if (cfs_time_current_sec() > req->rq_arrival_time.tv_sec + obd_timeout || 
+        if (cfs_time_current_sec() > req->rq_deadline ||
             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
                 no_reply = 1;
                 CERROR("Dropping timed-out read from %s because locking"
-                       "object "LPX64" took %ld seconds.\n",
+                       "object "LPX64" took %ld seconds (limit was %ld).\n",
                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
-                       cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
+                       cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
+                       req->rq_deadline - req->rq_arrival_time.tv_sec);
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
@@ -850,14 +851,30 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 }
 
                 if (rc == 0) {
-                        lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
-                                                   ost_bulk_timeout, desc);
-                        rc = l_wait_event(desc->bd_waitq,
-                                          !ptlrpc_bulk_active(desc) ||
-                                          exp->exp_failed, &lwi);
-                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                        time_t start = cfs_time_current_sec();
+                        do {
+                                long timeoutl = req->rq_deadline -
+                                        cfs_time_current_sec();
+                                cfs_duration_t timeout = (timeoutl <= 0 || rc) ?
+                                        CFS_TICK : cfs_time_seconds(timeoutl);
+                                lwi = LWI_TIMEOUT_INTERVAL(timeout,
+                                                           cfs_time_seconds(1),
+                                                           ost_bulk_timeout,
+                                                           desc);
+                                rc = l_wait_event(desc->bd_waitq,
+                                                  !ptlrpc_bulk_active(desc) ||
+                                                  exp->exp_failed, &lwi);
+                                LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                                /* Wait again if we changed deadline */
+                        } while ((rc == -ETIMEDOUT) &&
+                                 (req->rq_deadline > cfs_time_current_sec()));
+
                         if (rc == -ETIMEDOUT) {
-                                DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
+                                DEBUG_REQ(D_ERROR, req,
+                                          "timeout on bulk PUT after %ld%+lds",
+                                          req->rq_deadline - start,
+                                          cfs_time_current_sec() -
+                                          req->rq_deadline);
                                 ptlrpc_abort_bulk(desc);
                         } else if (exp->exp_failed) {
                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
@@ -907,11 +924,8 @@ out:
                 req->rq_status = rc;
                 ptlrpc_error(req);
         } else {
-                if (req->rq_reply_state != NULL) {
-                        /* reply out callback would free */
-                        ptlrpc_rs_decref(req->rq_reply_state);
-                        req->rq_reply_state = NULL;
-                }
+                /* reply out callback would free */
+                ptlrpc_req_drop_rs(req);
                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
                       "client will retry\n",
                       exp->exp_obd->obd_name,
@@ -1023,6 +1037,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         rc = lustre_pack_reply(req, 3, size, NULL);
         if (rc != 0)
                 GOTO(out, rc);
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
                              niocount * sizeof(*rcs));
 
@@ -1056,13 +1071,14 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
          * If getting the lock took more time than
          * client was willing to wait, drop it. b=11330
          */
-        if (cfs_time_current_sec() > req->rq_arrival_time.tv_sec + obd_timeout || 
+        if (cfs_time_current_sec() > req->rq_deadline ||
             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
                 no_reply = 1;
-                CERROR("Dropping timed-out write from %s because locking"
-                       "object "LPX64" took %ld seconds.\n",
+                CERROR("Dropping timed-out write from %s because locking "
+                       "object "LPX64" took %ld seconds (limit was %ld).\n",
                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
-                       cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
+                       cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
+                       req->rq_deadline - req->rq_arrival_time.tv_sec);
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
@@ -1102,13 +1118,28 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         else
                 rc = ptlrpc_start_bulk_transfer (desc);
         if (rc == 0) {
-                lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 2, HZ,
-                                           ost_bulk_timeout, desc);
-                rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
-                                  desc->bd_export->exp_failed, &lwi);
-                LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                time_t start = cfs_time_current_sec();
+                do {
+                        long timeoutl = req->rq_deadline -
+                                cfs_time_current_sec();
+                        cfs_duration_t timeout = (timeoutl <= 0 || rc) ?
+                                CFS_TICK : cfs_time_seconds(timeoutl);
+                        lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1),
+                                                   ost_bulk_timeout, desc);
+                        rc = l_wait_event(desc->bd_waitq,
+                                          !ptlrpc_bulk_active(desc) ||
+                                          desc->bd_export->exp_failed, &lwi);
+                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                        /* Wait again if we changed deadline */
+                } while ((rc == -ETIMEDOUT) &&
+                         (req->rq_deadline > cfs_time_current_sec()));
+
                 if (rc == -ETIMEDOUT) {
-                        DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
+                        DEBUG_REQ(D_ERROR, req,
+                                  "timeout on bulk GET after %ld%+lds",
+                                  req->rq_deadline - start,
+                                  cfs_time_current_sec() -
+                                  req->rq_deadline);
                         ptlrpc_abort_bulk(desc);
                 } else if (desc->bd_export->exp_failed) {
                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
@@ -1234,11 +1265,8 @@ out:
                 req->rq_status = rc;
                 ptlrpc_error(req);
         } else {
-                if (req->rq_reply_state != NULL) {
-                        /* reply out callback would free */
-                        ptlrpc_rs_decref(req->rq_reply_state);
-                        req->rq_reply_state = NULL;
-                }
+                /* reply out callback would free */
+                ptlrpc_req_drop_rs(req);
                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
                       "client will retry\n",
                       exp->exp_obd->obd_name,
@@ -1890,12 +1918,11 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         ost->ost_service =
                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
                                 OST_MAXREPSIZE, OST_REQUEST_PORTAL,
-                                OSC_REPLY_PORTAL,
-                                OST_WATCHDOG_TIMEOUT, ost_handle,
-                                LUSTRE_OSS_NAME, obd->obd_proc_entry,
-                                ost_print_req, oss_min_threads,
-                                oss_max_threads, "ll_ost",
-                                LCT_DT_THREAD);
+                                OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
+                                ost_handle, LUSTRE_OSS_NAME,
+                                obd->obd_proc_entry, target_print_req,
+                                oss_min_threads, oss_max_threads,
+                                "ll_ost", LCT_DT_THREAD);
         if (ost->ost_service == NULL) {
                 CERROR("failed to start service\n");
                 GOTO(out_lprocfs, rc = -ENOMEM);
@@ -1908,23 +1935,22 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         if (oss_num_create_threads) {
                 if (oss_num_create_threads > OSS_MAX_CREATE_THREADS)
                         oss_num_create_threads = OSS_MAX_CREATE_THREADS;
-                if (oss_num_create_threads < OSS_DEF_CREATE_THREADS)
-                        oss_num_create_threads = OSS_DEF_CREATE_THREADS;
+                if (oss_num_create_threads < OSS_MIN_CREATE_THREADS)
+                        oss_num_create_threads = OSS_MIN_CREATE_THREADS;
                 oss_min_create_threads = oss_max_create_threads =
                         oss_num_create_threads;
         } else {
-                oss_min_create_threads = OSS_DEF_CREATE_THREADS;
+                oss_min_create_threads = OSS_MIN_CREATE_THREADS;
                 oss_max_create_threads = OSS_MAX_CREATE_THREADS;
         }
 
         ost->ost_create_service =
                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
                                 OST_MAXREPSIZE, OST_CREATE_PORTAL,
-                                OSC_REPLY_PORTAL,
-                                OST_WATCHDOG_TIMEOUT, ost_handle, "ost_create",
-                                obd->obd_proc_entry, ost_print_req,
-                                oss_min_create_threads,
-                                oss_max_create_threads,
+                                OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
+                                ost_handle, "ost_create",
+                                obd->obd_proc_entry, target_print_req,
+                                oss_min_create_threads, oss_max_create_threads,
                                 "ll_ost_creat", LCT_DT_THREAD);
         if (ost->ost_create_service == NULL) {
                 CERROR("failed to start OST create service\n");
@@ -1938,9 +1964,9 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         ost->ost_io_service =
                 ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
                                 OST_MAXREPSIZE, OST_IO_PORTAL,
-                                OSC_REPLY_PORTAL,
-                                OST_WATCHDOG_TIMEOUT, ost_handle, "ost_io",
-                                obd->obd_proc_entry, ost_print_req,
+                                OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
+                                ost_handle, "ost_io",
+                                obd->obd_proc_entry, target_print_req,
                                 oss_min_threads, oss_max_threads,
                                 "ll_ost_io", LCT_DT_THREAD);
         if (ost->ost_io_service == NULL) {
index 64df2d2..55adc5f 100644 (file)
@@ -5,11 +5,7 @@
 #ifndef OST_INTERNAL_H
 #define OST_INTERNAL_H
 
-#ifdef LPROCFS
-extern void ost_print_req(void *seq_file, struct ptlrpc_request *req);
-#else
-# define ost_print_req NULL
-#endif
+#define OSS_SERVICE_WATCHDOG_FACTOR 2000
 
 /*
  * tunables for per-thread page pool (bug 5137)
@@ -37,7 +33,7 @@ struct ost_thread_local_cache {
 
 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r);
 
-#define OSS_DEF_CREATE_THREADS  1UL
+#define OSS_MIN_CREATE_THREADS  2UL
 #define OSS_MAX_CREATE_THREADS 16UL
 
 /* Quota stuff */
index bc9c2c2..8fa9fff 100644 (file)
@@ -193,6 +193,160 @@ void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
         EXIT;
 }
 
+/*
+ * Pick the timeout for this request and publish it in the request message.
+ *
+ * With adaptive timeouts disabled (AT_OFF) the value is obd_timeout, halved
+ * when the import has imp_server_timeout set.  Otherwise it is computed from
+ * the import's service estimate for the request's portal.
+ */
+void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
+{
+        LASSERT(req->rq_import);
+
+        if (!AT_OFF) {
+                struct imp_at *at = &req->rq_import->imp_at;
+                int idx = import_at_get_index(req->rq_import,
+                                              req->rq_request_portal);
+                __u32 serv_est = at_get(&at->iat_service_estimate[idx]);
+
+                /* pad the estimate with an arbitrary margin: 125% + 5 sec */
+                req->rq_timeout = serv_est + (serv_est >> 2) + 5;
+        } else if (req->rq_import->imp_server_timeout) {
+                /* non-AT settings */
+                req->rq_timeout = obd_timeout / 2;
+        } else {
+                req->rq_timeout = obd_timeout;
+        }
+
+        /* Tell the server what this RPC's timeout is via the reqmsg */
+        lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
+}
+
+/*
+ * Adjust max service estimate based on server value.
+ *
+ * Reads the value carried in the reply message's timeout field and adds it
+ * to the service estimate kept on the request's import for the portal the
+ * request was sent to.
+ */
+static void ptlrpc_at_adj_service(struct ptlrpc_request *req)
+{
+        int idx;
+        unsigned int serv_est, oldse;
+        struct imp_at *at;
+
+        /* assert the import is set before going through it to reach imp_at */
+        LASSERT(req->rq_import);
+        at = &req->rq_import->imp_at;
+
+        /* service estimate is returned in the repmsg timeout field,
+           may be 0 on err */
+        serv_est = lustre_msg_get_timeout(req->rq_repmsg);
+
+        idx = import_at_get_index(req->rq_import, req->rq_request_portal);
+        /* max service estimates are tracked on the server side,
+           so just keep minimal history here */
+        oldse = at_add(&at->iat_service_estimate[idx], serv_est);
+        /* NOTE(review): a nonzero at_add() result is treated as "the estimate
+           changed, and this was the old value" - confirm against at_add() */
+        if (oldse != 0)
+                CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d "
+                       "has changed from %d to %d\n",
+                       req->rq_import->imp_obd->obd_name,req->rq_request_portal,
+                       oldse, at_get(&at->iat_service_estimate[idx]));
+}
+
+/*
+ * Expected network latency per remote node (secs).
+ * Reports 0 when adaptive timeouts are off (AT_OFF); otherwise the current
+ * value of the import's iat_net_latency estimate.
+ */
+int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
+{
+        if (AT_OFF)
+                return 0;
+
+        return at_get(&req->rq_import->imp_at.iat_net_latency);
+}
+
+/*
+ * Adjust expected network latency.
+ *
+ * Takes the service time reported in the reply message, subtracts it from
+ * the time elapsed since the request was sent (rq_sent), and adds the result
+ * to the import's network latency estimate.
+ */
+static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req)
+{
+        unsigned int st, nl, oldnl;
+        struct imp_at *at;
+        time_t now = cfs_time_current_sec();
+
+        /* assert the import is set before going through it to reach imp_at */
+        LASSERT(req->rq_import);
+        at = &req->rq_import->imp_at;
+
+        st = lustre_msg_get_service_time(req->rq_repmsg);
+
+        /* Network latency is total time less server processing time;
+           clamped at 0 so an over-reported service time cannot go negative */
+        nl = max_t(int, now - req->rq_sent - st, 0) + 1/*st rounding*/;
+        if (st > now - req->rq_sent + 2 /* rounding */)
+                CERROR("Reported service time %u > total measured time %ld\n",
+                       st, now - req->rq_sent);
+
+        oldnl = at_add(&at->iat_net_latency, nl);
+        /* NOTE(review): a nonzero at_add() result is treated as "the estimate
+           changed, and this was the old value" - confirm against at_add() */
+        if (oldnl != 0)
+                CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) "
+                       "has changed from %d to %d\n",
+                       req->rq_import->imp_obd->obd_name,
+                       obd_uuid2str(
+                               &req->rq_import->imp_connection->c_remote_uuid),
+                       oldnl, at_get(&at->iat_net_latency));
+}
+
+/*
+ * Unpack the reply message of a request: first the generic message, then
+ * the ptlrpc body.  Returns 0 on success; either unpack step failing is
+ * logged and reported as -EPROTO.
+ */
+static int unpack_reply(struct ptlrpc_request *req)
+{
+        int err;
+
+        /* An early reply may already have been swabbed; start from scratch */
+        req->rq_rep_swab_mask = 0;
+
+        err = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
+        if (err != 0) {
+                DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", err);
+                return -EPROTO;
+        }
+
+        err = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
+        if (err != 0) {
+                DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", err);
+                return -EPROTO;
+        }
+
+        return 0;
+}
+
+/*
+ * Handle an early reply message, called with the rq_lock held.
+ * If anything goes wrong just ignore it - same as if it never happened
+ *
+ * rq_lock is dropped while the reply is processed and taken again before
+ * returning, so the caller still holds it afterwards.
+ *
+ * Returns 0 on success, otherwise the error from unwrapping or unpacking
+ * the early reply; in the error case rq_timeout and rq_deadline are left
+ * untouched.
+ */
+static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
+        time_t          olddl;  /* deadline before this early reply */
+        int             rc;
+        ENTRY;
+
+        /* consume the early-reply flag, then release rq_lock for the
+           duration of the processing below */
+        req->rq_early = 0;
+        spin_unlock(&req->rq_lock);
+
+        /* unwrap failed: skip the finish call, go straight to relock */
+        rc = sptlrpc_cli_unwrap_early_reply(req);
+        if (rc)
+                GOTO(out, rc);
+
+        /* unwrap succeeded, so the finish call is still run on failure */
+        rc = unpack_reply(req);
+        if (rc)
+                GOTO(out_cleanup, rc);
+
+        /* Expecting to increase the service time estimate here */
+        ptlrpc_at_adj_service(req);
+        ptlrpc_at_adj_net_latency(req);
+
+        /* Adjust the local timeout for this req */
+        ptlrpc_at_set_req_timeout(req);
+
+        olddl = req->rq_deadline;
+        /* server assumes it now has rq_timeout from when it sent the
+           early reply, so client should give it at least that long. */
+        req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
+                    ptlrpc_at_get_net_latency(req);
+
+        /* logs seconds until the new deadline and the signed change from
+           the old one */
+        DEBUG_REQ(D_ADAPTTO, req,
+                  "Early reply #%d, new deadline in %lds (%+lds)",
+                  req->rq_early_count, req->rq_deadline -
+                  cfs_time_current_sec(), req->rq_deadline - olddl);
+
+out_cleanup:
+        sptlrpc_cli_finish_early_reply(req);
+out:
+        /* retake the lock the caller expects to still hold */
+        spin_lock(&req->rq_lock);
+        RETURN(rc);
+}
+
 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
 {
         struct list_head *l, *tmp;
@@ -348,11 +502,6 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         }
 
         lustre_msg_add_version(request->rq_reqmsg, version);
-
-        if (imp->imp_server_timeout)
-                request->rq_timeout = obd_timeout / 2;
-        else
-                request->rq_timeout = obd_timeout;
         request->rq_send_state = LUSTRE_IMP_FULL;
         request->rq_type = PTL_RPC_MSG_REQUEST;
         request->rq_export = NULL;
@@ -365,12 +514,14 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
 
         request->rq_phase = RQ_PHASE_NEW;
 
-        /* XXX FIXME bug 249 */
         request->rq_request_portal = imp->imp_client->cli_request_portal;
         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
 
+        ptlrpc_at_set_req_timeout(request);
+
         spin_lock_init(&request->rq_lock);
         CFS_INIT_LIST_HEAD(&request->rq_list);
+        CFS_INIT_LIST_HEAD(&request->rq_timed_list);
         CFS_INIT_LIST_HEAD(&request->rq_replay_list);
         CFS_INIT_LIST_HEAD(&request->rq_mod_list);
         CFS_INIT_LIST_HEAD(&request->rq_ctx_chain);
@@ -381,7 +532,6 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         atomic_set(&request->rq_refcount, 1);
 
         lustre_msg_set_opc(request->rq_reqmsg, opcode);
-        lustre_msg_set_flags(request->rq_reqmsg, 0);
 
         RETURN(0);
 out_ctx:
@@ -721,6 +871,12 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
 
         if (req->rq_restart)
                 GOTO(out, rc = 1);
+
+        if (req->rq_early) {
+                ptlrpc_at_recv_early_reply(req);
+                GOTO(out, rc = 0); /* keep waiting */
+        }
+
         EXIT;
  out:
         spin_unlock(&req->rq_lock);
@@ -775,11 +931,6 @@ static int after_reply(struct ptlrpc_request *req)
          * including buflens, status etc is in the sender's byte order. 
          */
 
-        /*
-         * Clear reply swab mask; this is a new reply in sender's byte order. 
-         */
-        req->rq_rep_swab_mask = 0;
-
         rc = sptlrpc_cli_unwrap_reply(req);
         if (rc) {
                 DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
@@ -792,17 +943,9 @@ static int after_reply(struct ptlrpc_request *req)
         if (req->rq_resend)
                 RETURN(0);
 
-        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
-        if (rc) {
-                DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
-                RETURN(-EPROTO);
-        }
-
-        rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
-        if (rc) {
-                DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
-                RETURN(-EPROTO);
-        }
+        rc = unpack_reply(req);
+        if (rc)
+                RETURN(rc);
 
         do_gettimeofday(&work_start);
         timediff = cfs_timeval_sub(&work_start, &req->rq_arrival_time, NULL);
@@ -817,6 +960,10 @@ static int after_reply(struct ptlrpc_request *req)
                 RETURN(-EPROTO);
         }
 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, obd_fail_val);
+        ptlrpc_at_adj_service(req);
+        ptlrpc_at_adj_net_latency(req);
+
         rc = ptlrpc_check_status(req);
         imp->imp_connect_error = rc;
 
@@ -1114,13 +1261,27 @@ check_ctx:
                                 force_timer_recalc = 1;
                         }
 
+                        spin_lock(&req->rq_lock);
+
+                        if (req->rq_early) {
+                                ptlrpc_at_recv_early_reply(req);
+                                spin_unlock(&req->rq_lock);
+                                continue;
+                        }
+
                         /* Still waiting for a reply? */
-                        if (ptlrpc_client_receiving_reply(req))
+                        if (req->rq_receiving_reply) {
+                                spin_unlock(&req->rq_lock);
                                 continue;
+                        }
 
                         /* Did we actually receive a reply? */
-                        if (!ptlrpc_client_replied(req))
+                        if (!req->rq_replied) {
+                                spin_unlock(&req->rq_lock);
                                 continue;
+                        }
+
+                        spin_unlock(&req->rq_lock);
 
                         spin_lock(&imp->imp_lock);
                         list_del_init(&req->rq_list);
@@ -1198,16 +1359,27 @@ check_ctx:
         RETURN(set->set_remaining == 0 || force_timer_recalc);
 }
 
+/* Return 1 if we should give up, else 0 */
 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
 {
         struct obd_import *imp = req->rq_import;
         int rc = 0;
         ENTRY;
 
-        DEBUG_REQ(D_ERROR|D_NETERROR, req, "%s (sent at %lu, "CFS_DURATION_T"s ago)",
+        DEBUG_REQ(D_ERROR|D_NETERROR, req,
+                  "%s (sent at %lu, "CFS_DURATION_T"s ago)",
                   req->rq_net_err ? "network error" : "timeout",
                   (long)req->rq_sent, cfs_time_current_sec() - req->rq_sent);
 
+        if (imp) {
+                LCONSOLE_WARN("Request x"LPU64" sent from %s to NID %s %lus ago"
+                              " has timed out (limit %lus).\n", req->rq_xid,
+                              req->rq_import->imp_obd->obd_name,
+                              libcfs_nid2str(imp->imp_connection->c_peer.nid),
+                              cfs_time_current_sec() - req->rq_sent,
+                              req->rq_deadline - req->rq_sent);
+        }
+
         if (imp != NULL && obd_debug_peer_on_timeout)
                 LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
 
@@ -1238,6 +1410,9 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
         if (req->rq_ctx_init || req->rq_ctx_fini ||
             req->rq_send_state != LUSTRE_IMP_FULL ||
             imp->imp_obd->obd_no_recov) {
+                DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
+                          ptlrpc_import_state_name(req->rq_send_state),
+                          ptlrpc_import_state_name(imp->imp_state));
                 spin_lock(&req->rq_lock);
                 req->rq_status = -ETIMEDOUT;
                 req->rq_err = 1;
@@ -1245,7 +1420,8 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req)
                 RETURN(1);
         }
         
-        /* if request can't be resend we can't wait answer after timeout */
+        /* if a request can't be resent we can't wait for an answer after
+           the timeout */
         if (req->rq_no_resend) {
                 DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
                 rc = 1;
@@ -1271,13 +1447,13 @@ int ptlrpc_expired_set(void *data)
                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
 
                 /* request in-flight? */
-                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting &&
+                if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting &&
                        !req->rq_resend) ||
                       (req->rq_phase == RQ_PHASE_BULK)))
                         continue;
 
                 if (req->rq_timedout ||           /* already dealt with */
-                    req->rq_sent + req->rq_timeout > now) /* not expired */
+                    req->rq_deadline > now)       /* not expired */
                         continue;
 
                 /* deal with this guy */
@@ -1317,13 +1493,14 @@ void ptlrpc_interrupted_set(void *data)
         }
 }
 
+/* get the smallest timeout in the set; this does NOT set a timeout. */
 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
 {
         struct list_head      *tmp;
         time_t                 now = cfs_time_current_sec();
-        time_t                 deadline;
         int                    timeout = 0;
         struct ptlrpc_request *req;
+        int                    deadline;
         ENTRY;
 
         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
@@ -1543,14 +1720,15 @@ void ptlrpc_unregister_reply (struct ptlrpc_request *request)
         struct l_wait_info lwi;
 
         LASSERT(!in_interrupt ());             /* might sleep */
-
-        if (!ptlrpc_client_receiving_reply(request))
+        if (!ptlrpc_client_recv_or_unlink(request))
+                /* Nothing left to do */
                 return;
 
         LNetMDUnlink (request->rq_reply_md_h);
 
         /* We have to l_wait_event() whatever the result, to give liblustre
-         * a chance to run reply_in_callback() */
+         * a chance to run reply_in_callback(), and to make sure we've
+         * unlinked before returning a req to the pool */
 
         if (request->rq_set != NULL)
                 wq = &request->rq_set->set_waitq;
@@ -1560,13 +1738,16 @@ void ptlrpc_unregister_reply (struct ptlrpc_request *request)
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
-                lwi = LWI_TIMEOUT(cfs_time_seconds(300), NULL, NULL);
-                rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
+                lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+                rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
+                                   &lwi);
                 if (rc == 0)
                         return;
 
                 LASSERT (rc == -ETIMEDOUT);
-                DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
+                DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
+                          "rvcng=%d unlnk=%d", request->rq_receiving_reply,
+                          request->rq_must_unlink);
         }
 }
 
@@ -1587,6 +1768,7 @@ void ptlrpc_free_committed(struct obd_import *imp)
             imp->imp_generation == imp->imp_last_generation_checked) {
                 CDEBUG(D_RPCTRACE, "%s: skip recheck: last_committed "LPU64"\n",
                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
+                EXIT;
                 return;
         }
 
@@ -1686,6 +1868,10 @@ static int expired_request(void *data)
         if (ptlrpc_check_suspend())
                 RETURN(1);
 
+        /* deadline may have changed with an early reply */
+        if (req->rq_deadline > cfs_time_current_sec())
+                RETURN(1);
+
         RETURN(ptlrpc_expire_one_request(req));
 }
 
@@ -1757,7 +1943,8 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req)
         int brc;
         struct l_wait_info lwi;
         struct obd_import *imp = req->rq_import;
-        cfs_duration_t timeout = 0;
+        cfs_duration_t timeout = CFS_TICK;
+        long timeoutl;
         ENTRY;
 
         LASSERT(req->rq_set == NULL);
@@ -1805,6 +1992,7 @@ restart:
                 list_del_init(&req->rq_list);
 
                 if (req->rq_err) {
+                        /* rq_status was set locally */
                         rc = -EIO;
                 }
                 else if (req->rq_intr) {
@@ -1868,19 +2056,20 @@ restart:
         }
 
         rc = ptl_send_rpc(req, 0);
-        if (rc) {
+        if (rc)
                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
-                timeout = CFS_TICK;
-        } else {
-                timeout = cfs_timeout_cap(cfs_time_seconds(req->rq_timeout));
-                DEBUG_REQ(D_NET, req, 
-                          "-- sleeping for "CFS_DURATION_T" jiffies", timeout);
-        }
+
 repeat:
+        timeoutl = req->rq_deadline - cfs_time_current_sec();
+        timeout = (timeoutl <= 0 || rc) ? CFS_TICK :
+                cfs_time_seconds(timeoutl);
+        DEBUG_REQ(D_NET, req,
+                  "-- sleeping for "CFS_DURATION_T" ticks", timeout);
         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
                                req);
         rc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
-        if (rc == -ETIMEDOUT && ptlrpc_check_and_wait_suspend(req))
+        if (rc == -ETIMEDOUT && ((req->rq_deadline > cfs_time_current_sec()) ||
+                                 ptlrpc_check_and_wait_suspend(req)))
                 goto repeat;
 
 after_send:
@@ -1900,16 +2089,11 @@ after_send:
          * req->rq_receiving_reply is clear and returns. */
         ptlrpc_unregister_reply (req);
 
-        if (req->rq_err)
-                GOTO(out, rc = -EIO);
 
-        /* Resend if we need to, unless we were interrupted. */
-        if (req->rq_resend && !req->rq_intr) {
-                /* ...unless we were specifically told otherwise. */
-                if (req->rq_no_resend)
-                        GOTO(out, rc = -ETIMEDOUT);
-                spin_lock(&imp->imp_lock);
-                goto restart;
+        if (req->rq_err) {
+                DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
+                          rc, req->rq_status);
+                GOTO(out, rc = -EIO);
         }
 
         if (req->rq_intr) {
@@ -1920,6 +2104,15 @@ after_send:
                 GOTO(out, rc = -EINTR);
         }
 
+        /* Resend if we need to */
+        if (req->rq_resend) {
+                /* ...unless we were specifically told otherwise. */
+                if (req->rq_no_resend)
+                        GOTO(out, rc = -ETIMEDOUT);
+                spin_lock(&imp->imp_lock);
+                goto restart;
+        }
+
         if (req->rq_timedout) {                 /* non-recoverable timeout */
                 GOTO(out, rc = -ETIMEDOUT);
         }
@@ -2045,12 +2238,9 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
-
         /* Not handling automatic bulk replay yet (or ever?) */
         LASSERT(req->rq_bulk == NULL);
 
-        DEBUG_REQ(D_HA, req, "REPLAY");
-
         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
         memset(aa, 0, sizeof *aa);
@@ -2059,10 +2249,15 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         aa->praa_old_state = req->rq_send_state;
         req->rq_send_state = LUSTRE_IMP_REPLAY;
         req->rq_phase = RQ_PHASE_NEW;
-        aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
+        if (req->rq_repmsg)
+                aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
         req->rq_status = 0;
-
         req->rq_interpret_reply = ptlrpc_replay_interpret;
+        /* Readjust the timeout for current conditions */
+        ptlrpc_at_set_req_timeout(req);
+
+        DEBUG_REQ(D_HA, req, "REPLAY");
+
         atomic_inc(&req->rq_import->imp_replay_inflight);
         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
 
@@ -2094,6 +2289,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
+                        req->rq_status = -EINTR;
                         ptlrpc_wake_client_req(req);
                 }
                 spin_unlock (&req->rq_lock);
@@ -2108,6 +2304,7 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
                 spin_lock (&req->rq_lock);
                 if (req->rq_import_generation < imp->imp_generation) {
                         req->rq_err = 1;
+                        req->rq_status = -EINTR;
                         ptlrpc_wake_client_req(req);
                 }
                 spin_unlock (&req->rq_lock);
@@ -2144,3 +2341,4 @@ __u64 ptlrpc_sample_next_xid(void)
         return tmp;
 }
 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
+
index d24c92d..72bbce3 100644 (file)
@@ -83,30 +83,74 @@ void reply_in_callback(lnet_event_t *ev)
         struct ptlrpc_request *req = cbid->cbid_arg;
         ENTRY;
 
-        LASSERT (ev->type == LNET_EVENT_PUT ||
-                 ev->type == LNET_EVENT_UNLINK);
-        LASSERT (ev->unlinked);
-        LASSERT (ev->md.start == req->rq_repbuf);
-        LASSERT (ev->offset == 0);
-        LASSERT (ev->mlength <= req->rq_repbuf_len);
-
         DEBUG_REQ((ev->status == 0) ? D_NET : D_ERROR, req,
                   "type %d, status %d", ev->type, ev->status);
 
+        LASSERT (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_UNLINK);
+        LASSERT (ev->md.start == req->rq_repbuf);
+        LASSERT (ev->mlength <= req->rq_repbuf_len);
+        /* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
+           for adaptive timeouts' early reply. */
+        LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);
+
         spin_lock(&req->rq_lock);
 
-        LASSERT (req->rq_receiving_reply);
         req->rq_receiving_reply = 0;
+        req->rq_early = 0;
+
+        if (ev->status)
+                goto out_wake;
+        if (ev->type == LNET_EVENT_UNLINK) {
+                req->rq_must_unlink = 0;
+                DEBUG_REQ(D_RPCTRACE, req, "unlink");
+                goto out_wake;
+        }
 
-        if (ev->type == LNET_EVENT_PUT && ev->status == 0) {
+        if ((ev->offset == 0) &&
+            ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT))) {
+                /* Early reply */
+                DEBUG_REQ(D_ADAPTTO, req,
+                          "Early reply received: mlen=%u offset=%d replen=%d "
+                          "replied=%d unlinked=%d", ev->mlength, ev->offset,
+                          req->rq_replen, req->rq_replied, ev->unlinked);
+
+                req->rq_early_count++; /* number received, client side */
+                if (req->rq_replied) {
+                        /* If we already got the real reply, then we need to
+                         * check if lnet_finalize() unlinked the md.  In that
+                         * case, there will be no further callback of type
+                         * LNET_EVENT_UNLINK.
+                         */
+                        if (ev->unlinked)
+                                req->rq_must_unlink = 0;
+                        else
+                                DEBUG_REQ(D_RPCTRACE, req, "unlinked in reply");
+                        goto out_wake;
+                }
+                req->rq_early = 1;
+                req->rq_reply_off = ev->offset;
+                req->rq_nob_received = ev->mlength;
+                /* And we're still receiving */
+                req->rq_receiving_reply = 1;
+        } else {
+                /* Real reply */
                 req->rq_replied = 1;
+                req->rq_reply_off = ev->offset;
                 req->rq_nob_received = ev->mlength;
+                /* LNetMDUnlink can't be called under the LNET_LOCK,
+                   so we must unlink in ptlrpc_unregister_reply */
+                DEBUG_REQ(D_INFO, req,
+                          "reply in flags=%x mlen=%u offset=%d replen=%d",
+                          lustre_msg_get_flags(req->rq_reqmsg),
+                          ev->mlength, ev->offset, req->rq_replen);
         }
 
+        req->rq_import->imp_last_reply_time = cfs_time_current_sec();
+
+out_wake:
         /* NB don't unlock till after wakeup; req can disappear under us
          * since we don't have our own ref */
         ptlrpc_wake_client_req(req);
-
         spin_unlock(&req->rq_lock);
         EXIT;
 }
@@ -212,6 +256,11 @@ void request_in_callback(lnet_event_t *ev)
 #ifdef CRAY_XT3
         req->rq_uid = ev->uid;
 #endif
+        spin_lock_init(&req->rq_lock);
+        CFS_INIT_LIST_HEAD(&req->rq_timed_list);
+        atomic_set(&req->rq_refcount, 1);
+        if (ev->type == LNET_EVENT_PUT)
+                DEBUG_REQ(D_RPCTRACE, req, "incoming req");
 
         CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));
 
@@ -239,7 +288,7 @@ void request_in_callback(lnet_event_t *ev)
                 rqbd->rqbd_refcount++;
         }
 
-        list_add_tail(&req->rq_list, &service->srv_request_queue);
+        list_add_tail(&req->rq_list, &service->srv_req_in_queue);
         service->srv_n_queued_reqs++;
 
         /* NB everything can disappear under us once the request
index 7766a06..88d0599 100644 (file)
@@ -509,7 +509,7 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
 
         switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
-                vmsg = req->rq_repbuf;
+                vmsg = req->rq_repdata;
                 voff = vmsg->lm_bufcount - 1;
                 LASSERT(vmsg && vmsg->lm_bufcount >= 3);
 
@@ -519,7 +519,7 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                 break;
         case SPTLRPC_SVC_AUTH:
         case SPTLRPC_SVC_INTG:
-                vmsg = req->rq_repbuf;
+                vmsg = req->rq_repdata;
                 voff = vmsg->lm_bufcount - 2;
                 LASSERT(vmsg && vmsg->lm_bufcount >= 4);
 
@@ -528,7 +528,7 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                 LASSERT(rmsg && rmsg->lm_bufcount >= 4);
                 break;
         case SPTLRPC_SVC_PRIV:
-                vmsg = req->rq_repbuf;
+                vmsg = req->rq_repdata;
                 voff = vmsg->lm_bufcount - 1;
                 LASSERT(vmsg && vmsg->lm_bufcount >= 2);
 
index ccc000f..b37502a 100644 (file)
@@ -311,7 +311,8 @@ int gss_do_ctx_init_rpc(__user char *buffer, unsigned long count)
                 goto out_copy;
         }
 
-        lsize = ctx_init_parse_reply(req->rq_repbuf,
+        LASSERT(req->rq_repdata);
+        lsize = ctx_init_parse_reply(req->rq_repdata,
                                      param.reply_buf, param.reply_buf_size);
         if (lsize < 0) {
                 param.status = (int) lsize;
index f2014e5..23a684e 100644 (file)
@@ -65,8 +65,12 @@ static int sec_install_rctx_kr(struct ptlrpc_sec *sec,
 
 /*
  * the timeout is only for the case that upcall child process die abnormally.
- * in any other cases it should finally update kernel key. so we set this
- * timeout value excessive long.
+ * in any other cases it should finally update kernel key.
+ * 
+ * FIXME: we should incorporate the client & server side upcall timeouts
+ * into the framework of Adaptive Timeouts, but we need to figure out how to
+ * make sure that the kernel knows whether the upcall process is in progress
+ * or has died unexpectedly.
  */
 #define KEYRING_UPCALL_TIMEOUT  (obd_timeout + obd_timeout)
 
@@ -833,7 +837,7 @@ void flush_user_ctx_cache_kr(struct ptlrpc_sec *sec,
         for (;;) {
                 key = request_key(&gss_key_type, desc, NULL);
                 if (IS_ERR(key)) {
-                        CWARN("No more key found for current user\n");
+                        CDEBUG(D_SEC, "No more key found for current user\n");
                         break;
                 }
 
index f76c22d..e1b1d7f 100644 (file)
@@ -1330,7 +1330,7 @@ cache_check:
         grctx->src_init = 1;
         grctx->src_reserve_len = size_round4(rsip->out_token.len);
 
-        rc = lustre_pack_reply_v2(req, 1, &replen, NULL);
+        rc = lustre_pack_reply_v2(req, 1, &replen, NULL, 0);
         if (rc) {
                 CERROR("failed to pack reply: %d\n", rc);
                 GOTO(out, rc = SECSVC_DROP);
index f33cddb..d4aef70 100644 (file)
@@ -65,6 +65,7 @@
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_support.h>
+#include <obd_cksum.h>
 #include <lustre/lustre_idl.h>
 #include <lustre_net.h>
 #include <lustre_import.h>
 
 #include <linux/crypto.h>
 
+/*
+ * early replies have a fixed size in privacy and in integrity mode
+ * respectively, so we calculate them only once.
+ */
+static int gss_at_reply_off_integ;
+static int gss_at_reply_off_priv;
+
 
 static inline int msg_last_segidx(struct lustre_msg *msg)
 {
@@ -144,21 +152,23 @@ netobj_t *gss_swab_netobj(struct lustre_msg *msg, int segment)
 /*
  * payload should be obtained from mechanism. but currently since we
  * only support kerberos, we could simply use fixed value.
- * krb5 header:         16
- * krb5 checksum:       20
+ * krb5 "meta" data:
+ *  - krb5 header:      16
+ *  - krb5 checksum:    20
+ *
+ * for privacy mode, payload also include the cipher text which has the same
+ * size as plain text, plus possible confounder, padding both at maximum cipher
+ * block size.
  */
 #define GSS_KRB5_INTEG_MAX_PAYLOAD      (40)
 
 static inline
-int gss_estimate_payload(struct gss_ctx *mechctx, int msgsize, int privacy)
+int gss_mech_payload(struct gss_ctx *mechctx, int msgsize, int privacy)
 {
-        if (privacy) {
-                /* we suppose max cipher block size is 16 bytes. here we
-                 * add 16 for confounder and 16 for padding. */
-                return GSS_KRB5_INTEG_MAX_PAYLOAD + msgsize + 16 + 16 + 16;
-        } else {
+        if (privacy)
+                return GSS_KRB5_INTEG_MAX_PAYLOAD + 16 + 16 + 16 + msgsize;
+        else
                 return GSS_KRB5_INTEG_MAX_PAYLOAD;
-        }
 }
 
 /*
@@ -575,11 +585,10 @@ exit:
  * cred APIs                           *
  ***************************************/
 
-static inline
-int gss_cli_payload(struct ptlrpc_cli_ctx *ctx,
-                    int msgsize, int privacy)
+static inline int gss_cli_payload(struct ptlrpc_cli_ctx *ctx,
+                                  int msgsize, int privacy)
 {
-        return gss_estimate_payload(NULL, msgsize, privacy);
+        return gss_mech_payload(NULL, msgsize, privacy);
 }
 
 int gss_cli_ctx_match(struct ptlrpc_cli_ctx *ctx, struct vfs_cred *vcred)
@@ -731,20 +740,23 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
 {
         struct gss_cli_ctx     *gctx;
         struct gss_header      *ghdr, *reqhdr;
-        struct lustre_msg      *msg = req->rq_repbuf;
+        struct lustre_msg      *msg = req->rq_repdata;
         __u32                   major;
-        int                     rc = 0;
+        int                     pack_bulk, early = 0, rc = 0;
         ENTRY;
 
         LASSERT(req->rq_cli_ctx == ctx);
         LASSERT(msg);
 
-        req->rq_repdata_len = req->rq_nob_received;
         gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
 
+        if ((char *) msg < req->rq_repbuf ||
+            (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
+                early = 1;
+
         /* special case for context negotiation, rq_repmsg/rq_replen actually
-         * are not used currently. */
-        if (req->rq_ctx_init) {
+         * are not used currently. early replies are always treated normally */
+        if (req->rq_ctx_init && !early) {
                 req->rq_repmsg = lustre_msg_buf(msg, 1, 0);
                 req->rq_replen = msg->lm_buflens[1];
                 RETURN(0);
@@ -773,8 +785,9 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
 
         switch (ghdr->gh_proc) {
         case PTLRPC_GSS_PROC_DATA:
-                if (!equi(req->rq_pack_bulk == 1,
-                          ghdr->gh_flags & LUSTRE_GSS_PACK_BULK)) {
+                pack_bulk = ghdr->gh_flags & LUSTRE_GSS_PACK_BULK;
+
+                if (!early && !equi(req->rq_pack_bulk == 1, pack_bulk)) {
                         CERROR("%s bulk flag in reply\n",
                                req->rq_pack_bulk ? "missing" : "unexpected");
                         RETURN(-EPROTO);
@@ -799,11 +812,20 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
                 if (major != GSS_S_COMPLETE)
                         RETURN(-EPERM);
 
-                req->rq_repmsg = lustre_msg_buf(msg, 1, 0);
-                req->rq_replen = msg->lm_buflens[1];
+                if (early && reqhdr->gh_svc == SPTLRPC_SVC_NULL) {
+                        __u32 cksum;
 
-                if (req->rq_pack_bulk) {
-                        /* FIXME */
+                        cksum = crc32_le(!(__u32) 0,
+                                         lustre_msg_buf(msg, 1, 0),
+                                         lustre_msg_buflen(msg, 1));
+                        if (cksum != msg->lm_cksum) {
+                                CWARN("early reply checksum mismatch: "
+                                      "%08x != %08x\n", cksum, msg->lm_cksum);
+                                RETURN(-EPROTO);
+                        }
+                }
+
+                if (pack_bulk) {
                         /* bulk checksum is right after the lustre msg */
                         if (msg->lm_bufcount < 3) {
                                 CERROR("Invalid reply bufcount %u\n",
@@ -812,10 +834,22 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
                         }
 
                         rc = bulk_sec_desc_unpack(msg, 2);
+                        if (rc) {
+                                CERROR("unpack bulk desc: %d\n", rc);
+                                RETURN(rc);
+                        }
                 }
+
+                req->rq_repmsg = lustre_msg_buf(msg, 1, 0);
+                req->rq_replen = msg->lm_buflens[1];
                 break;
         case PTLRPC_GSS_PROC_ERR:
-                rc = gss_cli_ctx_handle_err_notify(ctx, req, ghdr);
+                if (early) {
+                        CERROR("server return error with early reply\n");
+                        rc = -EPROTO;
+                } else {
+                        rc = gss_cli_ctx_handle_err_notify(ctx, req, ghdr);
+                }
                 break;
         default:
                 CERROR("unknown gss proc %d\n", ghdr->gh_proc);
@@ -947,16 +981,22 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
 {
         struct gss_cli_ctx      *gctx;
         struct gss_header       *ghdr;
-        int                      msglen, rc;
+        struct lustre_msg       *msg = req->rq_repdata;
+        int                      msglen, pack_bulk, early = 0, rc;
         __u32                    major;
         ENTRY;
 
-        LASSERT(req->rq_repbuf);
         LASSERT(req->rq_cli_ctx == ctx);
+        LASSERT(req->rq_ctx_init == 0);
+        LASSERT(msg);
 
         gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
 
-        ghdr = gss_swab_header(req->rq_repbuf, 0);
+        if ((char *) msg < req->rq_repbuf ||
+            (char *) msg >= req->rq_repbuf + req->rq_repbuf_len)
+                early = 1;
+
+        ghdr = gss_swab_header(msg, 0);
         if (ghdr == NULL) {
                 CERROR("can't decode gss header\n");
                 RETURN(-EPROTO);
@@ -971,49 +1011,52 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
 
         switch (ghdr->gh_proc) {
         case PTLRPC_GSS_PROC_DATA:
-                if (!equi(req->rq_pack_bulk == 1,
-                          ghdr->gh_flags & LUSTRE_GSS_PACK_BULK)) {
+                pack_bulk = ghdr->gh_flags & LUSTRE_GSS_PACK_BULK;
+
+                if (!early && !equi(req->rq_pack_bulk == 1, pack_bulk)) {
                         CERROR("%s bulk flag in reply\n",
                                req->rq_pack_bulk ? "missing" : "unexpected");
                         RETURN(-EPROTO);
                 }
 
-                if (lustre_msg_swabbed(req->rq_repbuf))
+                if (lustre_msg_swabbed(msg))
                         gss_header_swabber(ghdr);
 
-                major = gss_unseal_msg(gctx->gc_mechctx, req->rq_repbuf,
-                                       &msglen, req->rq_repbuf_len);
+                /* use rq_repdata_len as buffer size, which assume unseal
+                 * doesn't need extra memory space. for precise control, we'd
+                 * better calculate out actual buffer size as
+                 * (repbuf_len - offset - repdata_len) */
+                major = gss_unseal_msg(gctx->gc_mechctx, msg,
+                                       &msglen, req->rq_repdata_len);
                 if (major != GSS_S_COMPLETE) {
                         rc = -EPERM;
                         break;
                 }
 
-                if (lustre_unpack_msg(req->rq_repbuf, msglen)) {
+                if (lustre_unpack_msg(msg, msglen)) {
                         CERROR("Failed to unpack after decryption\n");
                         RETURN(-EPROTO);
                 }
-                req->rq_repdata_len = msglen;
 
-                if (req->rq_repbuf->lm_bufcount < 1) {
+                if (msg->lm_bufcount < 1) {
                         CERROR("Invalid reply buffer: empty\n");
                         RETURN(-EPROTO);
                 }
 
-                if (req->rq_pack_bulk) {
-                        if (req->rq_repbuf->lm_bufcount < 2) {
-                                CERROR("Too few request buffer segments %d\n",
-                                       req->rq_repbuf->lm_bufcount);
+                if (pack_bulk) {
+                        if (msg->lm_bufcount < 2) {
+                                CERROR("bufcount %u: missing bulk sec desc\n",
+                                       msg->lm_bufcount);
                                 RETURN(-EPROTO);
                         }
 
                         /* bulk checksum is the last segment */
-                        if (bulk_sec_desc_unpack(req->rq_repbuf,
-                                                 req->rq_repbuf->lm_bufcount-1))
+                        if (bulk_sec_desc_unpack(msg, msg->lm_bufcount-1))
                                 RETURN(-EPROTO);
                 }
 
-                req->rq_repmsg = lustre_msg_buf(req->rq_repbuf, 0, 0);
-                req->rq_replen = req->rq_repbuf->lm_buflens[0];
+                req->rq_repmsg = lustre_msg_buf(msg, 0, 0);
+                req->rq_replen = msg->lm_buflens[0];
 
                 rc = 0;
                 break;
@@ -1438,8 +1481,9 @@ int gss_alloc_repbuf_intg(struct ptlrpc_sec *sec,
                           struct ptlrpc_request *req,
                           int svc, int msgsize)
 {
-        int                       txtsize;
-        int                       buflens[4], bufcnt = 2;
+        int             txtsize;
+        int             buflens[4], bufcnt = 2;
+        int             alloc_size;
 
         /*
          * on-wire data layout:
@@ -1476,7 +1520,12 @@ int gss_alloc_repbuf_intg(struct ptlrpc_sec *sec,
         else if (svc != SPTLRPC_SVC_NULL)
                 buflens[bufcnt++] = gss_cli_payload(req->rq_cli_ctx, txtsize,0);
 
-        return do_alloc_repbuf(req, lustre_msg_size_v2(bufcnt, buflens));
+        alloc_size = lustre_msg_size_v2(bufcnt, buflens);
+
+        /* add space for early reply */
+        alloc_size += gss_at_reply_off_integ;
+
+        return do_alloc_repbuf(req, alloc_size);
 }
 
 static
@@ -1484,8 +1533,9 @@ int gss_alloc_repbuf_priv(struct ptlrpc_sec *sec,
                           struct ptlrpc_request *req,
                           int msgsize)
 {
-        int                       txtsize;
-        int                       buflens[3], bufcnt;
+        int             txtsize;
+        int             buflens[3], bufcnt;
+        int             alloc_size;
 
         /* Inner (clear) buffers
          *  - lustre message
@@ -1514,7 +1564,12 @@ int gss_alloc_repbuf_priv(struct ptlrpc_sec *sec,
         buflens[1] = gss_cli_payload(req->rq_cli_ctx, buflens[0], 0);
         buflens[2] = gss_cli_payload(req->rq_cli_ctx, txtsize, 1);
 
-        return do_alloc_repbuf(req, lustre_msg_size_v2(bufcnt, buflens));
+        alloc_size = lustre_msg_size_v2(bufcnt, buflens);
+
+        /* add space for early reply */
+        alloc_size += gss_at_reply_off_priv;
+
+        return do_alloc_repbuf(req, alloc_size);
 }
 
 int gss_alloc_repbuf(struct ptlrpc_sec *sec,
@@ -1853,6 +1908,17 @@ int gss_svc_sign(struct ptlrpc_request *req,
                 RETURN(rc);
 
         rs->rs_repdata_len = rc;
+
+        if (likely(req->rq_packed_final)) {
+                req->rq_reply_off = gss_at_reply_off_integ;
+        } else {
+                if (svc == SPTLRPC_SVC_NULL)
+                        rs->rs_repbuf->lm_cksum = crc32_le(!(__u32) 0,
+                                        lustre_msg_buf(rs->rs_repbuf, 1, 0),
+                                        lustre_msg_buflen(rs->rs_repbuf, 1));
+                req->rq_reply_off = 0;
+        }
+
         RETURN(0);
 }
 
@@ -1871,7 +1937,7 @@ int gss_pack_err_notify(struct ptlrpc_request *req, __u32 major, __u32 minor)
         grctx->src_err_notify = 1;
         grctx->src_reserve_len = 0;
 
-        rc = lustre_pack_reply_v2(req, 1, &replen, NULL);
+        rc = lustre_pack_reply_v2(req, 1, &replen, NULL, 0);
         if (rc) {
                 CERROR("could not pack reply, err %d\n", rc);
                 RETURN(rc);
@@ -2366,19 +2432,23 @@ void gss_svc_invalidate_ctx(struct ptlrpc_svc_ctx *svc_ctx)
 }
 
 static inline
-int gss_svc_payload(struct gss_svc_reqctx *grctx, int msgsize, int privacy)
+int gss_svc_payload(struct gss_svc_reqctx *grctx, int early,
+                    int msgsize, int privacy)
 {
-        if (gss_svc_reqctx_is_special(grctx))
+        /* an early reply should be treated normally, but it shares the same
+         * ctx with the original request, so in this case we should ignore
+         * the special ctx's special flags */
+        if (early == 0 && gss_svc_reqctx_is_special(grctx))
                 return grctx->src_reserve_len;
 
-        return gss_estimate_payload(NULL, msgsize, privacy);
+        return gss_mech_payload(NULL, msgsize, privacy);
 }
 
 int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
 {
         struct gss_svc_reqctx       *grctx;
         struct ptlrpc_reply_state   *rs;
-        int                          privacy, svc, bsd_off = 0;
+        int                          early, privacy, svc, bsd_off = 0;
         int                          ibuflens[2], ibufcnt = 0;
         int                          buflens[4], bufcnt;
         int                          txtsize, wmsg_size, rs_size;
@@ -2392,9 +2462,10 @@ int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
         }
 
         svc = RPC_FLVR_SVC(req->rq_flvr.sf_rpc);
+        early = (req->rq_packed_final == 0);
 
         grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
-        if (gss_svc_reqctx_is_special(grctx))
+        if (!early && gss_svc_reqctx_is_special(grctx))
                 privacy = 0;
         else
                 privacy = (svc == SPTLRPC_SVC_PRIV);
@@ -2419,8 +2490,8 @@ int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
                 /* wrapper buffer */
                 bufcnt = 3;
                 buflens[0] = PTLRPC_GSS_HEADER_SIZE;
-                buflens[1] = gss_svc_payload(grctx, buflens[0], 0);
-                buflens[2] = gss_svc_payload(grctx, txtsize, 1);
+                buflens[1] = gss_svc_payload(grctx, early, buflens[0], 0);
+                buflens[2] = gss_svc_payload(grctx, early, txtsize, 1);
         } else {
                 bufcnt = 2;
                 buflens[0] = PTLRPC_GSS_HEADER_SIZE;
@@ -2442,9 +2513,10 @@ int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
                         bufcnt++;
                 }
 
-                if (gss_svc_reqctx_is_special(grctx) ||
+                if ((!early && gss_svc_reqctx_is_special(grctx)) ||
                     svc != SPTLRPC_SVC_NULL)
-                        buflens[bufcnt++] = gss_svc_payload(grctx, txtsize, 0);
+                        buflens[bufcnt++] = gss_svc_payload(grctx, early,
+                                                            txtsize, 0);
         }
 
         wmsg_size = lustre_msg_size_v2(bufcnt, buflens);
@@ -2518,7 +2590,7 @@ int gss_svc_seal(struct ptlrpc_request *req,
         msgobj.data = (__u8 *) rs->rs_repbuf;
 
         /* allocate temporary cipher buffer */
-        cipher_buflen = gss_estimate_payload(gctx->gsc_mechctx, msglen, 1);
+        cipher_buflen = gss_mech_payload(gctx->gsc_mechctx, msglen, 1);
         OBD_ALLOC(cipher_buf, cipher_buflen);
         if (!cipher_buf)
                 RETURN(-ENOMEM);
@@ -2536,12 +2608,14 @@ int gss_svc_seal(struct ptlrpc_request *req,
 
         /* we are about to override data at rs->rs_repbuf, nullify pointers
          * to which to catch further illegal usage. */
-        grctx->src_repbsd = NULL;
-        grctx->src_repbsd_size = 0;
+        if (req->rq_pack_bulk) {
+                grctx->src_repbsd = NULL;
+                grctx->src_repbsd_size = 0;
+        }
 
         /* now the real wire data */
         buflens[0] = PTLRPC_GSS_HEADER_SIZE;
-        buflens[1] = gss_estimate_payload(gctx->gsc_mechctx, buflens[0], 0);
+        buflens[1] = gss_mech_payload(gctx->gsc_mechctx, buflens[0], 0);
         buflens[2] = cipher_obj.len;
 
         LASSERT(lustre_msg_size_v2(3, buflens) <= rs->rs_repbuf_len);
@@ -2579,6 +2653,12 @@ int gss_svc_seal(struct ptlrpc_request *req,
         rs->rs_repdata_len = lustre_shrink_msg(rs->rs_repbuf, 2,
                                                cipher_obj.len, 0);
 
+        /* reply offset */
+        if (likely(req->rq_packed_final))
+                req->rq_reply_off = gss_at_reply_off_priv;
+        else
+                req->rq_reply_off = 0;
+
         /* to catch upper layer's further access */
         rs->rs_msg = NULL;
         req->rq_repmsg = NULL;
@@ -2594,15 +2674,22 @@ int gss_svc_authorize(struct ptlrpc_request *req)
 {
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
         struct gss_svc_reqctx     *grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
-        struct gss_wire_ctx       *gw;
-        int                        rc;
+        struct gss_wire_ctx       *gw = &grctx->src_wirectx;
+        int                        early, rc;
         ENTRY;
 
-        if (gss_svc_reqctx_is_special(grctx))
+        early = (req->rq_packed_final == 0);
+
+        if (!early && gss_svc_reqctx_is_special(grctx)) {
+                LASSERT(rs->rs_repdata_len != 0);
+
+                req->rq_reply_off = gss_at_reply_off_integ;
                 RETURN(0);
+        }
 
-        gw = &grctx->src_wirectx;
-        if (gw->gw_proc != PTLRPC_GSS_PROC_DATA &&
+        /* early reply could happen in many cases */
+        if (!early &&
+            gw->gw_proc != PTLRPC_GSS_PROC_DATA &&
             gw->gw_proc != PTLRPC_GSS_PROC_DESTROY) {
                 CERROR("proc %d not support\n", gw->gw_proc);
                 RETURN(-EINVAL);
@@ -2636,10 +2723,6 @@ void gss_svc_free_rs(struct ptlrpc_reply_state *rs)
         LASSERT(rs->rs_svc_ctx);
         grctx = container_of(rs->rs_svc_ctx, struct gss_svc_reqctx, src_base);
 
-        /* paranoid, maybe not necessary */
-        grctx->src_reqbsd = NULL;
-        grctx->src_repbsd = NULL;
-
         gss_svc_reqctx_decref(grctx);
         rs->rs_svc_ctx = NULL;
 
@@ -2706,6 +2789,23 @@ err_out:
         return -ENOMEM;
 }
 
+static void gss_init_at_reply_offset(void)
+{
+        int buflens[3], clearsize;
+
+        buflens[0] = PTLRPC_GSS_HEADER_SIZE;
+        buflens[1] = lustre_msg_early_size();
+        buflens[2] = gss_cli_payload(NULL, buflens[1], 0);
+        gss_at_reply_off_integ = lustre_msg_size_v2(3, buflens);
+
+        buflens[0] = lustre_msg_early_size();
+        clearsize = lustre_msg_size_v2(1, buflens);
+        buflens[0] = PTLRPC_GSS_HEADER_SIZE;
+        buflens[1] = gss_cli_payload(NULL, clearsize, 0);
+        buflens[2] = gss_cli_payload(NULL, clearsize, 1);
+        gss_at_reply_off_priv = lustre_msg_size_v2(3, buflens);
+}
+
 int __init sptlrpc_gss_init(void)
 {
         int rc;
@@ -2739,6 +2839,8 @@ int __init sptlrpc_gss_init(void)
                 goto out_keyring;
 #endif
 
+        gss_init_at_reply_offset();
+
         return 0;
 
 #ifdef HAVE_GSS_PIPEFS
index 345b2b6..7214a05 100644 (file)
@@ -199,6 +199,8 @@ void ptlrpc_deactivate_import(struct obd_import *imp)
  */
 void ptlrpc_invalidate_import(struct obd_import *imp)
 {
+        struct list_head *tmp, *n;
+        struct ptlrpc_request *req;
         struct l_wait_info lwi;
         int rc;
 
@@ -216,19 +218,19 @@ void ptlrpc_invalidate_import(struct obd_import *imp)
 
         LASSERT(imp->imp_invalid);
 
-        /* wait for all requests to error out and call completion callbacks */
-        lwi = LWI_TIMEOUT_INTERVAL(cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
-                                   HZ, NULL, NULL);
+        /* wait for all requests to error out and call completion callbacks.
+           Cap it at obd_timeout -- these should all have been locally
+           cancelled by ptlrpc_abort_inflight. */
+        lwi = LWI_TIMEOUT_INTERVAL(
+                cfs_timeout_cap(cfs_time_seconds(obd_timeout)),
+                cfs_time_seconds(1), NULL, NULL);
         rc = l_wait_event(imp->imp_recovery_waitq,
                           (atomic_read(&imp->imp_inflight) == 0), &lwi);
 
         if (rc) {
-                struct list_head *tmp, *n;
-                struct ptlrpc_request *req;
-
                 CERROR("%s: rc = %d waiting for callback (%d != 0)\n",
-                       obd2cli_tgt(imp->imp_obd), rc,
-                       atomic_read(&imp->imp_inflight));
+                         obd2cli_tgt(imp->imp_obd), rc,
+                         atomic_read(&imp->imp_inflight));
                 spin_lock(&imp->imp_lock);
                 list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
@@ -325,6 +327,7 @@ static int import_select_connection(struct obd_import *imp)
 {
         struct obd_import_conn *imp_conn = NULL, *conn;
         struct obd_export *dlmexp;
+        int tried_all = 1;
         ENTRY;
 
         spin_lock(&imp->imp_lock);
@@ -341,36 +344,60 @@ static int import_select_connection(struct obd_import *imp)
                        imp->imp_obd->obd_name,
                        libcfs_nid2str(conn->oic_conn->c_peer.nid),
                        conn->oic_last_attempt);
-                /* Throttle the reconnect rate to once per RECONNECT_INTERVAL */
-                if (cfs_time_before_64(conn->oic_last_attempt + 
-                                       RECONNECT_INTERVAL * HZ,
-                                       cfs_time_current_64())) {
-                        /* If we have never tried this connection since the
-                           the last successful attempt, go with this one */
-                        if (cfs_time_beforeq_64(conn->oic_last_attempt,
-                                               imp->imp_last_success_conn)) {
-                                imp_conn = conn;
-                                break;
-                        }
+                /* Don't thrash connections */
+                if (cfs_time_before_64(cfs_time_current_64(),
+                                     conn->oic_last_attempt +
+                                     cfs_time_seconds(CONNECTION_SWITCH_MIN))) {
+                        continue;
+                }
 
-                        /* Both of these connections have already been tried
-                           since the last successful connection; just choose the
-                           least recently used */
-                        if (!imp_conn)
-                                imp_conn = conn;
-                        else if (cfs_time_before_64(conn->oic_last_attempt,
-                                                    imp_conn->oic_last_attempt))
-                                imp_conn = conn;
+                /* If we have not tried this connection since
+                   the last successful attempt, go with this one */
+                if ((conn->oic_last_attempt == 0) ||
+                    cfs_time_beforeq_64(conn->oic_last_attempt,
+                                       imp->imp_last_success_conn)) {
+                        imp_conn = conn;
+                        tried_all = 0;
+                        break;
                 }
+
+                /* If all of the connections have already been tried
+                   since the last successful connection, just choose the
+                   least recently used */
+                if (!imp_conn)
+                        imp_conn = conn;
+                else if (cfs_time_before_64(conn->oic_last_attempt,
+                                            imp_conn->oic_last_attempt))
+                        imp_conn = conn;
         }
 
         /* if not found, simply choose the current one */
         if (!imp_conn) {
                 LASSERT(imp->imp_conn_current);
                 imp_conn = imp->imp_conn_current;
+                tried_all = 0;
         }
         LASSERT(imp_conn->oic_conn);
 
+        /* If we've tried everything, and we're back to the beginning of the
+           list, increase our timeout and try again. It will be reset when
+           we do finally connect. (FIXME: really we should wait for all network
+           state associated with the last connection attempt to drain before
+           trying to reconnect on it.) */
+        if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item) &&
+            !imp->imp_recon_bk /* not retrying */) {
+                if (at_get(&imp->imp_at.iat_net_latency) <
+                    CONNECTION_SWITCH_MAX) {
+                        at_add(&imp->imp_at.iat_net_latency,
+                               at_get(&imp->imp_at.iat_net_latency) +
+                               CONNECTION_SWITCH_INC);
+                }
+                LASSERT(imp_conn->oic_last_attempt);
+                CWARN("%s: tried all connections, increasing latency to %ds\n",
+                      imp->imp_obd->obd_name,
+                      at_get(&imp->imp_at.iat_net_latency));
+        }
+
         imp_conn->oic_last_attempt = cfs_time_current_64();
 
         /* switch connection, don't mind if it's same as the current one */
@@ -509,6 +536,8 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
         /* Reset connect flags to the originally requested flags, in case
          * the server is updated on-the-fly we will get the new features. */
         imp->imp_connect_data.ocd_connect_flags = imp->imp_connect_flags_orig;
+        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
+
         rc = obd_reconnect(NULL, imp->imp_obd->obd_self_export, obd,
                            &obd->obd_uuid, &imp->imp_connect_data);
         if (rc)
@@ -548,15 +577,12 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
                 spin_lock(&imp->imp_lock);
                 imp->imp_replayable = 1;
                 spin_unlock(&imp->imp_lock);
-                /* On an initial connect, we don't know which one of a
-                   failover server pair is up.  Don't wait long. */
-#ifdef CRAY_XT3
-                request->rq_timeout = max((int)(obd_timeout / 2), 5);
-#else
-                request->rq_timeout = max((int)(obd_timeout / 20), 5);
-#endif
                 lustre_msg_add_op_flags(request->rq_reqmsg, 
                                         MSG_CONNECT_INITIAL);
+                if (AT_OFF)
+                        /* AT will use INITIAL_CONNECT_TIMEOUT the first
+                           time, adaptive after that. */
+                        request->rq_timeout = INITIAL_CONNECT_TIMEOUT;
         }
 
         if (set_transno)
@@ -710,6 +736,8 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                 }
 
                 if (imp->imp_invalid) {
+                        CDEBUG(D_HA, "%s: reconnected but import is invalid; "
+                               "marking evicted\n", imp->imp_obd->obd_name);
                         IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
                 } else if (MSG_CONNECT_RECOVERING & msg_flags) {
                         CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
@@ -731,6 +759,8 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                 imp->imp_last_replay_transno = 0;
                 IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY);
         } else {
+                DEBUG_REQ(D_HA, request, "%s: evicting (reconnect/recover flags"
+                          " not set: %x)", imp->imp_obd->obd_name, msg_flags);
                 imp->imp_remote_handle =
                                 *lustre_msg_get_handle(request->rq_repmsg);
                 IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
@@ -885,6 +915,20 @@ finish:
                 imp->imp_obd->obd_namespace->ns_orig_connect_flags = 
                                                         ocd->ocd_connect_flags;
 
+                if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
+                    (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
+                        /* We need a per-message support flag, because
+                           a. we don't know if the incoming connect reply
+                              supports AT or not (in reply_in_callback)
+                              until we unpack it.
+                           b. failovered server means export and flags are gone
+                              (in ptlrpc_send_reply).
+                           Can only be set when we know AT is supported at
+                           both ends */
+                        imp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
+                else
+                        imp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
+
                 LASSERT((cli->cl_max_pages_per_rpc <= PTLRPC_MAX_BRW_PAGES) &&
                         (cli->cl_max_pages_per_rpc > 0));
         }
@@ -1150,12 +1194,19 @@ int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
         if (ptlrpc_import_in_recovery(imp)) {
                 struct l_wait_info lwi;
                 cfs_duration_t timeout;
-                if (imp->imp_server_timeout)
-                        timeout = cfs_time_seconds(obd_timeout / 2);
-                else
-                        timeout = cfs_time_seconds(obd_timeout);
-                
-                timeout = MAX(timeout * HZ, 1);
+
+
+                if (AT_OFF) {
+                        if (imp->imp_server_timeout)
+                                timeout = cfs_time_seconds(obd_timeout / 2);
+                        else
+                                timeout = cfs_time_seconds(obd_timeout);
+                } else {
+                        int idx = import_at_get_index(imp,
+                                imp->imp_client->cli_request_portal);
+                        timeout = cfs_time_seconds(
+                                at_get(&imp->imp_at.iat_service_estimate[idx]));
+                }
                 
                 lwi = LWI_TIMEOUT_INTR(cfs_timeout_cap(timeout), 
                                        back_to_sleep, LWI_ON_SIGNAL_NOOP, NULL);
@@ -1177,11 +1228,19 @@ int ptlrpc_disconnect_import(struct obd_import *imp, int noclose)
                  * it fails.  We can get through the above with a down server
                  * if the client doesn't know the server is gone yet. */
                 req->rq_no_resend = 1;
-#ifdef CRAY_XT3
-                req->rq_timeout = obd_timeout / 3;
+
+#ifndef CRAY_XT3
+                /* We want client umounts to happen quickly, no matter the
+                   server state... */
+                req->rq_timeout = min_t(int, req->rq_timeout,
+                                        INITIAL_CONNECT_TIMEOUT);
 #else
-                req->rq_timeout = 5;
+                /* ... but we always want liblustre clients to nicely
+                   disconnect, so only use the adaptive value. */
+                if (AT_OFF)
+                        req->rq_timeout = obd_timeout / 3;
 #endif
+
                 IMPORT_SET_STATE(imp, LUSTRE_IMP_CONNECTING);
                 req->rq_send_state =  LUSTRE_IMP_CONNECTING;
                 ptlrpc_request_set_replen(req);
@@ -1198,9 +1257,132 @@ out:
                 IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
         memset(&imp->imp_remote_handle, 0, sizeof(imp->imp_remote_handle));
         imp->imp_conn_cnt = 0;
+        /* Try all connections in the future - bz 12758 */
         imp->imp_last_recon = 0;
         spin_unlock(&imp->imp_lock);
 
         RETURN(rc);
 }
 
+
+/* Adaptive Timeout utils */
+extern unsigned int at_min, at_max, at_history;
+
+/* Bin into timeslices using AT_BINS bins.
+   This gives us a max of the last binlimit*AT_BINS secs without the storage,
+   but still smoothing out a return to normalcy from a slow response.
+   (E.g. remember the maximum latency in each minute of the last 4 minutes.) */
+int at_add(struct adaptive_timeout *at, unsigned int val)
+{
+        unsigned int old = at->at_current;
+        time_t now = cfs_time_current_sec();
+        time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
+
+        LASSERT(at);
+#if 0
+        CDEBUG(D_INFO, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
+               val, at, now - at->at_binstart, at->at_current,
+               at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
+#endif
+        if (val == 0)
+                /* 0's don't count, because we never want our timeout to
+                   drop to 0, and because 0 could mean an error */
+                return 0;
+
+        spin_lock(&at->at_lock);
+
+        if (unlikely(at->at_binstart == 0)) {
+                /* Special case to remove default from history */
+                at->at_current = val;
+                at->at_worst_ever = val;
+                at->at_worst_time = now;
+                at->at_hist[0] = val;
+                at->at_binstart = now;
+        } else if (now - at->at_binstart < binlimit ) {
+                /* in bin 0 */
+                at->at_hist[0] = max(val, at->at_hist[0]);
+                at->at_current = max(val, at->at_current);
+        } else {
+                int i, shift;
+                unsigned int maxv = val;
+                /* move bins over */
+                shift = (now - at->at_binstart) / binlimit;
+                LASSERT(shift > 0);
+                for(i = AT_BINS - 1; i >= 0; i--) {
+                        if (i >= shift) {
+                                at->at_hist[i] = at->at_hist[i - shift];
+                                maxv = max(maxv, at->at_hist[i]);
+                        } else {
+                                at->at_hist[i] = 0;
+                        }
+                }
+                at->at_hist[0] = val;
+                at->at_current = maxv;
+                at->at_binstart += shift * binlimit;
+        }
+
+        if (at->at_current > at->at_worst_ever) {
+                at->at_worst_ever = at->at_current;
+                at->at_worst_time = now;
+        }
+
+        if (at->at_flags & AT_FLG_NOHIST)
+                /* Only keep last reported val; keeping the rest of the history
+                   for proc only */
+                at->at_current = val;
+
+        if (at_max > 0)
+                at->at_current =  min(at->at_current, at_max);
+        at->at_current =  max(at->at_current, at_min);
+
+#if 0
+        if (at->at_current != old)
+                CDEBUG(D_ADAPTTO, "AT %p change: old=%u new=%u delta=%d "
+                       "(val=%u) hist %u %u %u %u\n", at,
+                       old, at->at_current, at->at_current - old, val,
+                       at->at_hist[0], at->at_hist[1], at->at_hist[2],
+                       at->at_hist[3]);
+#endif
+
+        /* if we changed, report the old value */
+        old = (at->at_current != old) ? old : 0;
+
+        spin_unlock(&at->at_lock);
+        return old;
+}
+
+/* Find the imp_at index for a given portal; assign if space available */
+int import_at_get_index(struct obd_import *imp, int portal)
+{
+        struct imp_at *at = &imp->imp_at;
+        int i;
+
+        for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
+                if (at->iat_portal[i] == portal)
+                        return i;
+                if (at->iat_portal[i] == 0)
+                        /* unused */
+                        break;
+        }
+
+        /* Not found in list, add it under a lock */
+        spin_lock(&imp->imp_lock);
+
+        /* Check unused under lock */
+        for (; i < IMP_AT_MAX_PORTALS; i++) {
+                if (at->iat_portal[i] == portal)
+                        goto out;
+                if (at->iat_portal[i] == 0)
+                        /* unused */
+                        break;
+        }
+
+        /* Not enough portals? */
+        LASSERT(i < IMP_AT_MAX_PORTALS);
+
+        at->iat_portal[i] = portal;
+out:
+        spin_unlock(&imp->imp_lock);
+        return i;
+}
+
index 29c4cfc..20a5673 100644 (file)
@@ -183,6 +183,8 @@ void ptlrpc_lprocfs_register(struct proc_dir_entry *root, char *dir,
                              svc_counter_config, "req_qdepth", "reqs");
         lprocfs_counter_init(svc_stats, PTLRPC_REQACTIVE_CNTR,
                              svc_counter_config, "req_active", "reqs");
+        lprocfs_counter_init(svc_stats, PTLRPC_TIMEOUT,
+                             svc_counter_config, "req_timeout", "sec");
         lprocfs_counter_init(svc_stats, PTLRPC_REQBUF_AVAIL_CNTR,
                              svc_counter_config, "reqbuf_avail", "bufs");
         for (i = 0; i < EXTRA_LAST_OPC; i++) {
@@ -359,6 +361,36 @@ ptlrpc_lprocfs_svc_req_history_next(struct seq_file *s,
         return srhi;
 }
 
+/* common ost/mdt srv_request_history_print_fn */
+void target_print_req(void *seq_file, struct ptlrpc_request *req)
+{
+        /* Called holding srv_lock with irqs disabled.
+         * Print specific req contents and a newline.
+         * CAVEAT EMPTOR: check request message length before printing!!!
+         * You might have received any old crap so you must be just as
+         * careful here as the service's request parser!!! */
+        struct seq_file *sf = seq_file;
+
+        switch (req->rq_phase) {
+        case RQ_PHASE_NEW:
+                /* still awaiting a service thread's attention, or rejected
+                 * because the generic request message didn't unpack */
+                seq_printf(sf, "<not swabbed>\n");
+                break;
+        case RQ_PHASE_INTERPRET:
+                /* being handled, so basic msg swabbed, and opc is valid
+                 * but racing with mds_handle() */
+        case RQ_PHASE_COMPLETE:
+                /* been handled by mds_handle() reply state possibly still
+                 * volatile */
+                seq_printf(sf, "opc %d\n", lustre_msg_get_opc(req->rq_reqmsg));
+                break;
+        default:
+                DEBUG_REQ(D_ERROR, req, "bad phase %d", req->rq_phase);
+        }
+}
+EXPORT_SYMBOL(target_print_req);
+
 static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter)
 {
         struct ptlrpc_service      *svc = s->private;
@@ -379,11 +411,13 @@ static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter)
                  * must be just as careful as the service's request
                  * parser. Currently I only print stuff here I know is OK
                  * to look at coz it was set up in request_in_callback()!!! */
-                seq_printf(s, LPD64":%s:%s:"LPD64":%d:%s ",
+                seq_printf(s, LPD64":%s:%s:x"LPD64":%d:%s:%ld:%lds(%+lds) ",
                            req->rq_history_seq, libcfs_nid2str(req->rq_self),
                            libcfs_id2str(req->rq_peer), req->rq_xid,
-                           req->rq_reqlen,ptlrpc_rqphase2str(req));
-
+                           req->rq_reqlen, ptlrpc_rqphase2str(req),
+                           req->rq_arrival_time.tv_sec,
+                           req->rq_sent - req->rq_arrival_time.tv_sec,
+                           req->rq_sent - req->rq_deadline);
                 if (svc->srv_request_history_print_fn == NULL)
                         seq_printf(s, "\n");
                 else
@@ -420,6 +454,34 @@ ptlrpc_lprocfs_svc_req_history_open(struct inode *inode, struct file *file)
         return 0;
 }
 
+/* See also lprocfs_rd_timeouts */
+static int ptlrpc_lprocfs_rd_timeouts(char *page, char **start, off_t off,
+                                      int count, int *eof, void *data)
+{
+        struct ptlrpc_service *svc = data;
+        unsigned int cur, worst;
+        time_t worstt;
+        struct dhms ts;
+        int rc = 0;
+
+        *eof = 1;
+        cur = at_get(&svc->srv_at_estimate);
+        worst = svc->srv_at_estimate.at_worst_ever;
+        worstt = svc->srv_at_estimate.at_worst_time;
+        s2dhms(&ts, cfs_time_current_sec() - worstt);
+        if (AT_OFF)
+                rc += snprintf(page + rc, count - rc,
+                              "adaptive timeouts off, using obd_timeout %u\n",
+                              obd_timeout);
+        rc += snprintf(page + rc, count - rc,
+                       "%10s : cur %3u  worst %3u (at %ld, "DHMS_FMT" ago) ",
+                       "service", cur, worst, worstt,
+                       DHMS_VARS(&ts));
+        rc = lprocfs_at_hist_helper(page, count, rc,
+                                    &svc->srv_at_estimate);
+        return rc;
+}
+
 void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry,
                                      struct ptlrpc_service *svc)
 {
@@ -432,6 +494,9 @@ void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry,
                  .write_fptr = ptlrpc_lprocfs_write_req_history_max,
                  .read_fptr  = ptlrpc_lprocfs_read_req_history_max,
                  .data       = svc},
+                {.name       = "timeouts",
+                 .read_fptr  = ptlrpc_lprocfs_rd_timeouts,
+                 .data       = svc},
                 {NULL}
         };
         static struct file_operations req_history_fops = {
index 4f71dcb..d7256c2 100644 (file)
@@ -35,7 +35,8 @@
 
 static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
                          lnet_ack_req_t ack, struct ptlrpc_cb_id *cbid,
-                         struct ptlrpc_connection *conn, int portal, __u64 xid)
+                         struct ptlrpc_connection *conn, int portal, __u64 xid,
+                         unsigned int offset)
 {
         int              rc;
         lnet_md_t         md;
@@ -64,11 +65,11 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
                 RETURN (-ENOMEM);
         }
 
-        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
-               len, portal, xid);
+        CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64", offset %u\n",
+               len, portal, xid, offset);
 
         rc = LNetPut (conn->c_self, *mdh, ack,
-                      conn->c_peer, portal, xid, 0, 0);
+                      conn->c_peer, portal, xid, offset, 0);
         if (unlikely(rc != 0)) {
                 int rc2;
                 /* We're going to get an UNLINK event when I unlink below,
@@ -302,7 +303,47 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
         }
 }
 
-int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
+static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
+{
+        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+        int service_time = max_t(int, cfs_time_current_sec() -
+                                 req->rq_arrival_time.tv_sec, 1);
+
+        if (!(flags & PTLRPC_REPLY_EARLY) &&
+            (req->rq_type != PTL_RPC_MSG_ERR)) {
+                /* early replies and errors don't count toward our service
+                   time estimate */
+                int oldse = at_add(&svc->srv_at_estimate, service_time);
+                if (oldse != 0)
+                        DEBUG_REQ(D_ADAPTTO, req,
+                                  "svc %s changed estimate from %d to %d",
+                                  svc->srv_name, oldse,
+                                  at_get(&svc->srv_at_estimate));
+        }
+        /* Report actual service time for client latency calc */
+        lustre_msg_set_service_time(req->rq_repmsg, service_time);
+        /* Report service time estimate for future client reqs, but report 0
+         * (to be ignored by client) if it's an error reply during recovery.
+         * (bz15815) */
+        if (req->rq_type == PTL_RPC_MSG_ERR &&
+            (req->rq_export == NULL || req->rq_export->exp_obd->obd_recovering))
+                lustre_msg_set_timeout(req->rq_repmsg, 0);
+        else
+                lustre_msg_set_timeout(req->rq_repmsg,
+                                       at_get(&svc->srv_at_estimate));
+
+        if (req->rq_reqmsg &&
+            !(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
+                CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x "
+                       "req_flags=%#x magic=%d:%x/%x len=%d\n",
+                       flags, lustre_msg_get_flags(req->rq_reqmsg),
+                       lustre_msg_is_v1(req->rq_reqmsg),
+                       lustre_msg_get_magic(req->rq_reqmsg),
+                       lustre_msg_get_magic(req->rq_repmsg), req->rq_replen);
+        }
+}
+
+int ptlrpc_send_reply (struct ptlrpc_request *req, int flags)
 {
         struct ptlrpc_service     *svc = req->rq_rqbd->rqbd_service;
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
@@ -319,12 +360,14 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
         LASSERT (req->rq_no_reply == 0);
         LASSERT (req->rq_reqbuf != NULL);
         LASSERT (rs != NULL);
-        LASSERT (may_be_difficult || !rs->rs_difficult);
+        LASSERT ((flags & PTLRPC_REPLY_MAYBE_DIFFICULT) || !rs->rs_difficult);
         LASSERT (req->rq_repmsg != NULL);
         LASSERT (req->rq_repmsg == rs->rs_msg);
         LASSERT (rs->rs_cb_id.cbid_fn == reply_out_callback);
         LASSERT (rs->rs_cb_id.cbid_arg == rs);
 
+        /* There may be no rq_export during failover */
+
         if (unlikely(req->rq_export && req->rq_export->exp_obd &&
                      req->rq_export->exp_obd->obd_fail)) {
                 /* Failed obd's only send ENODEV */
@@ -344,6 +387,8 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
 
         target_pack_pool_reply(req);
 
+        ptlrpc_at_set_reply(req, flags);
+
         if (req->rq_export == NULL || req->rq_export->exp_connection == NULL)
                 conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL);
         else
@@ -360,14 +405,16 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
         if (unlikely(rc))
                 goto out;
 
+        req->rq_sent = cfs_time_current_sec();
+
         rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
                            rs->rs_difficult ? LNET_ACK_REQ : LNET_NOACK_REQ,
-                           &rs->rs_cb_id, conn,
-                           svc->srv_rep_portal, req->rq_xid);
+                           &rs->rs_cb_id, conn, svc->srv_rep_portal,
+                           req->rq_xid, req->rq_reply_off);
 out:
         if (unlikely(rc != 0)) {
                 atomic_dec (&svc->srv_outstanding_replies);
-                ptlrpc_rs_decref(rs);
+                ptlrpc_req_drop_rs(req);
         }
         ptlrpc_put_connection(conn);
         return rc;
@@ -441,6 +488,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         lustre_msg_set_type(request->rq_reqmsg, PTL_RPC_MSG_REQUEST);
         lustre_msg_set_conn_cnt(request->rq_reqmsg,
                                 request->rq_import->imp_conn_cnt);
+        lustre_msghdr_set_flags(request->rq_reqmsg,
+                                request->rq_import->imp_msghdr_flags);
 
         rc = sptlrpc_cli_wrap_request(request);
         if (rc)
@@ -456,10 +505,15 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         if (!noreply) {
                 LASSERT (request->rq_replen != 0);
                 if (request->rq_repbuf == NULL) {
+                        LASSERT(request->rq_repdata == NULL);
+                        LASSERT(request->rq_repmsg == NULL);
                         rc = sptlrpc_cli_alloc_repbuf(request,
                                                       request->rq_replen);
                         if (rc)
                                 GOTO(cleanup_bulk, rc);
+                } else {
+                        request->rq_repdata = NULL;
+                        request->rq_repmsg = NULL;
                 }
 
                 rc = LNetMEAttach(request->rq_reply_portal,/*XXX FIXME bug 249*/
@@ -475,6 +529,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         spin_lock(&request->rq_lock);
         /* If the MD attach succeeds, there _will_ be a reply_in callback */
         request->rq_receiving_reply = !noreply;
+        /* We are responsible for unlinking the reply buffer */
+        request->rq_must_unlink = !noreply;
         /* Clear any flags that may be present from previous sends. */
         request->rq_replied = 0;
         request->rq_err = 0;
@@ -487,13 +543,18 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         if (!noreply) {
                 reply_md.start     = request->rq_repbuf;
                 reply_md.length    = request->rq_repbuf_len;
-                reply_md.threshold = 1;
-                reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT;
+                /* Allow multiple early replies */
+                reply_md.threshold = LNET_MD_THRESH_INF;
+                /* Manage remote for early replies */
+                reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
+                        LNET_MD_MANAGE_REMOTE;
                 reply_md.user_ptr  = &request->rq_reply_cbid;
                 reply_md.eq_handle = ptlrpc_eq_h;
 
-                rc = LNetMDAttach(reply_me_h, reply_md, LNET_UNLINK,
-                                 &request->rq_reply_md_h);
+                /* We must see the unlink callback to unset rq_must_unlink,
+                   so we can't auto-unlink */
+                rc = LNetMDAttach(reply_me_h, reply_md, LNET_RETAIN,
+                                  &request->rq_reply_md_h);
                 if (rc != 0) {
                         CERROR("LNetMDAttach failed: %d\n", rc);
                         LASSERT (rc == -ENOMEM);
@@ -518,15 +579,23 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);
 
-        request->rq_sent = cfs_time_current_sec();
         do_gettimeofday(&request->rq_arrival_time);
+        request->rq_sent = cfs_time_current_sec();
+        /* We give the server rq_timeout secs to process the req, and
+           add the network latency for our local timeout. */
+        request->rq_deadline = request->rq_sent + request->rq_timeout +
+                ptlrpc_at_get_net_latency(request);
+
         ptlrpc_pinger_sending_on_import(request->rq_import);
+
+        DEBUG_REQ(D_INFO, request, "send flg=%x",
+                  lustre_msg_get_flags(request->rq_reqmsg));
         rc = ptl_send_buf(&request->rq_req_md_h,
                           request->rq_reqbuf, request->rq_reqdata_len,
                           LNET_NOACK_REQ, &request->rq_req_cbid,
                           connection,
                           request->rq_request_portal,
-                          request->rq_xid);
+                          request->rq_xid, 0);
         if (rc == 0) {
                 ptlrpc_lprocfs_rpc_sent(request);
                 RETURN(rc);
@@ -535,8 +604,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         ptlrpc_req_finished(request);
         if (noreply)
                 RETURN(rc);
-        else
-                GOTO(cleanup_me, rc);
+
  cleanup_me:
         /* MEUnlink is safe; the PUT didn't even get off the ground, and
          * nobody apart from the PUT's target has the right nid+XID to
index 19f66f5..4e8a146 100644 (file)
@@ -38,6 +38,7 @@
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
+#include <obd_cksum.h>
 
 static inline int lustre_msg_hdr_size_v2(int count)
 {
@@ -84,6 +85,15 @@ int lustre_msg_check_version(struct lustre_msg *msg, __u32 version)
         }
 }
 
+/* Size of an early reply: a V2 message with one buffer; cached after 1st call */
+int lustre_msg_early_size(void) {
+        static int size = 0;    /* 0 means "not computed yet" */
+        if (!size)
+                size = lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, NULL);
+        return size;
+}
+EXPORT_SYMBOL(lustre_msg_early_size);
+
 int lustre_msg_size_v2(int count, int *lengths)
 {
         int size;
@@ -272,7 +282,7 @@ void lustre_put_emerg_rs(struct ptlrpc_reply_state *rs)
 }
 
 int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
-                         int *lens, char **bufs)
+                         int *lens, char **bufs, int flags)
 {
         struct ptlrpc_reply_state *rs;
         int                        msg_len, rc;
@@ -280,6 +290,9 @@ int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
 
         LASSERT(req->rq_reply_state == NULL);
 
+        if ((flags & LPRFL_EARLY_REPLY) == 0)
+                req->rq_packed_final = 1;
+
         msg_len = lustre_msg_size_v2(count, lens);
         rc = sptlrpc_svc_alloc_rs(req, msg_len);
         if (rc)
@@ -296,6 +309,7 @@ int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
         req->rq_replen = msg_len;
         req->rq_reply_state = rs;
         req->rq_repmsg = rs->rs_msg;
+
         lustre_init_msg_v2(rs->rs_msg, count, lens, bufs);
         lustre_msg_add_version(rs->rs_msg, PTLRPC_MSG_VERSION);
         lustre_set_rep_swabbed(req, MSG_PTLRPC_BODY_OFF);
@@ -306,8 +320,8 @@ int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
 }
 EXPORT_SYMBOL(lustre_pack_reply_v2);
 
-int lustre_pack_reply(struct ptlrpc_request *req, int count, int *lens,
-                      char **bufs)
+int lustre_pack_reply_flags(struct ptlrpc_request *req, int count, int *lens,
+                            char **bufs, int flags)
 {
         int rc = 0;
         int size[] = { sizeof(struct ptlrpc_body) };
@@ -323,7 +337,7 @@ int lustre_pack_reply(struct ptlrpc_request *req, int count, int *lens,
         switch (req->rq_reqmsg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
         case LUSTRE_MSG_MAGIC_V2_SWABBED:
-                rc = lustre_pack_reply_v2(req, count, lens, bufs);
+                rc = lustre_pack_reply_v2(req, count, lens, bufs, flags);
                 break;
         default:
                 LASSERTF(0, "incorrect message magic: %08x\n",
@@ -336,6 +350,12 @@ int lustre_pack_reply(struct ptlrpc_request *req, int count, int *lens,
         return rc;
 }
 
+int lustre_pack_reply(struct ptlrpc_request *req, int count, int *lens,
+                      char **bufs)
+{       /* pre-existing entry point, now a wrapper around the _flags variant */
+        return lustre_pack_reply_flags(req, count, lens, bufs, 0); /* no flags */
+}
+
 void *lustre_msg_buf_v2(struct lustre_msg_v2 *m, int n, int min_size)
 {
         int i, offset, buflen, bufcount;
@@ -468,8 +488,8 @@ static int lustre_unpack_msg_v2(struct lustre_msg_v2 *m, int len)
                 __swab32s(&m->lm_bufcount);
                 __swab32s(&m->lm_secflvr);
                 __swab32s(&m->lm_repsize);
-                __swab32s(&m->lm_timeout);
-                CLASSERT(offsetof(typeof(*m), lm_padding_1) != 0);
+                __swab32s(&m->lm_cksum);
+                __swab32s(&m->lm_flags);
                 CLASSERT(offsetof(typeof(*m), lm_padding_2) != 0);
                 CLASSERT(offsetof(typeof(*m), lm_padding_3) != 0);
         }
@@ -729,6 +749,35 @@ void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size,
         return lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
 }
 
+__u32 lustre_msghdr_get_flags(struct lustre_msg *msg)
+{       /* return the lm_flags word of the message header */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+        case LUSTRE_MSG_MAGIC_V1_SWABBED:
+                return 0;       /* V1 messages always report 0 here */
+        case LUSTRE_MSG_MAGIC_V2:
+        case LUSTRE_MSG_MAGIC_V2_SWABBED:
+                /* already in host endian: lustre_unpack_msg_v2() swabs lm_flags */
+                return msg->lm_flags;
+        default:        /* unknown magic asserts */
+                LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+                return 0;
+        }
+}
+
+void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags)
+{       /* store flags in the header's lm_flags word */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+                return;         /* silently ignored for V1 */
+        case LUSTRE_MSG_MAGIC_V2:
+                msg->lm_flags = flags;
+                return;
+        default:        /* also hit by the *_SWABBED magics, unlike the getter */
+                LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+        }
+}
+
 __u32 lustre_msg_get_flags(struct lustre_msg *msg)
 {
         switch (msg->lm_magic) {
@@ -1132,6 +1181,17 @@ __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg)
         }
 }
 
+int lustre_msg_is_v1(struct lustre_msg *msg)
+{       /* 1 if lm_magic is a V1 magic (native or swabbed), otherwise 0 */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+        case LUSTRE_MSG_MAGIC_V1_SWABBED:
+                return 1;
+        default:        /* any other magic, valid or not, counts as "not V1" */
+                return 0;
+        }
+}
+
 __u32 lustre_msg_get_magic(struct lustre_msg *msg)
 {
         switch (msg->lm_magic) {
@@ -1144,6 +1204,88 @@ __u32 lustre_msg_get_magic(struct lustre_msg *msg)
         }
 }
 
+__u32 lustre_msg_get_timeout(struct lustre_msg *msg)
+{       /* return pb_timeout from the ptlrpc_body buffer; 0 on any failure */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+        case LUSTRE_MSG_MAGIC_V1_SWABBED:
+                return 0;       /* V1 messages always report 0 */
+        case LUSTRE_MSG_MAGIC_V2:
+        case LUSTRE_MSG_MAGIC_V2_SWABBED: {
+                struct ptlrpc_body *pb;
+
+                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                if (!pb) {      /* missing body is logged, not asserted */
+                        CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+                        return 0;
+
+                }
+                return pb->pb_timeout;
+        }
+        default:        /* unknown magic is logged, not asserted */
+                CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+                return 0;
+        }
+}
+
+__u32 lustre_msg_get_service_time(struct lustre_msg *msg)
+{       /* return pb_service_time from the ptlrpc_body buffer; 0 on any failure */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+        case LUSTRE_MSG_MAGIC_V1_SWABBED:
+                return 0;       /* V1 messages always report 0 */
+        case LUSTRE_MSG_MAGIC_V2:
+        case LUSTRE_MSG_MAGIC_V2_SWABBED: {
+                struct ptlrpc_body *pb;
+
+                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                if (!pb) {      /* missing body is logged, not asserted */
+                        CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+                        return 0;
+
+                }
+                return pb->pb_service_time;
+        }
+        default:        /* unknown magic is logged, not asserted */
+                CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+                return 0;
+        }
+}
+
+__u32 lustre_msg_get_cksum(struct lustre_msg *msg)
+{       /* return the header's lm_cksum field; 0 for V1 or an unknown magic */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+        case LUSTRE_MSG_MAGIC_V1_SWABBED:
+                return 0;
+        case LUSTRE_MSG_MAGIC_V2:
+        case LUSTRE_MSG_MAGIC_V2_SWABBED:
+                return msg->lm_cksum;   /* swabbed by lustre_unpack_msg_v2() */
+        default:        /* unknown magic is logged, not asserted */
+                CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+                return 0;
+        }
+}
+
+__u32 lustre_msg_calc_cksum(struct lustre_msg *msg)
+{       /* compute (not store) a checksum of the message's ptlrpc_body */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+        case LUSTRE_MSG_MAGIC_V1_SWABBED:
+                return 0;       /* V1 messages always yield 0 */
+        case LUSTRE_MSG_MAGIC_V2:
+        case LUSTRE_MSG_MAGIC_V2_SWABBED: {
+                struct ptlrpc_body *pb;
+                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); /* NOTE(review): asserts, whereas the getters only log */
+                return crc32_le(~(__u32)0, (char *)pb, sizeof(*pb)); /* seed ~0, covers sizeof(*pb) bytes only */
+        }
+        default:        /* unknown magic is logged, not asserted */
+                CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+                return 0;
+        }
+}
+
 void lustre_msg_set_handle(struct lustre_msg *msg, struct lustre_handle *handle)
 {
         switch (msg->lm_magic) {
@@ -1272,6 +1414,56 @@ void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt)
         }
 }
 
+void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout)
+{       /* store timeout in pb_timeout of the message's ptlrpc_body */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+                return;         /* silently ignored for V1 */
+        case LUSTRE_MSG_MAGIC_V2: {
+                struct ptlrpc_body *pb;
+
+                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
+                pb->pb_timeout = timeout;
+                return;
+        }
+        default:        /* also hit by the *_SWABBED magics, unlike the getter */
+                LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+        }
+}
+
+void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time)
+{       /* store service_time in pb_service_time of the message's ptlrpc_body */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+                return;         /* silently ignored for V1 */
+        case LUSTRE_MSG_MAGIC_V2: {
+                struct ptlrpc_body *pb;
+
+                pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+                LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
+                pb->pb_service_time = service_time;
+                return;
+        }
+        default:        /* also hit by the *_SWABBED magics, unlike the getter */
+                LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+        }
+}
+
+void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum)
+{       /* store cksum in the header's lm_cksum field */
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+                return;         /* silently ignored for V1 */
+        case LUSTRE_MSG_MAGIC_V2:
+                msg->lm_cksum = cksum;
+                return;
+        default:        /* also hit by the *_SWABBED magics, unlike the getter */
+                LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+        }
+}
+
+
 void ptlrpc_request_set_replen(struct ptlrpc_request *req)
 {
         int count = req_capsule_filled_sizes(&req->rq_pill, RCL_SERVER);
@@ -1305,8 +1497,8 @@ void lustre_swab_ptlrpc_body(struct ptlrpc_body *b)
         __swab32s (&b->pb_flags);
         __swab32s (&b->pb_op_flags);
         __swab32s (&b->pb_conn_cnt);
-        CLASSERT(offsetof(typeof(*b), pb_padding_1) != 0);
-        CLASSERT(offsetof(typeof(*b), pb_padding_2) != 0);
+        __swab32s (&b->pb_timeout);
+        __swab32s (&b->pb_service_time);
         __swab32s (&b->pb_limit);
         __swab64s (&b->pb_slv);
 }
@@ -1978,8 +2170,9 @@ void _debug_req(struct ptlrpc_request *req, __u32 mask,
         va_start(args, fmt);
         libcfs_debug_vmsg2(data->msg_cdls, data->msg_subsys, mask, data->msg_file,
                            data->msg_fn, data->msg_line, fmt, args,
-                           " req@%p x"LPD64"/t"LPD64"("LPD64") o%d->%s@%s:%d lens"
-                           " %d/%d ref %d fl "REQ_FLAGS_FMT"/%x/%x rc %d/%d\n",
+                           " req@%p x"LPD64"/t"LPD64"("LPD64") o%d->%s@%s:%d/%d"
+                           " lens %d/%d e %d to %d dl %ld ref %d "
+                           "fl "REQ_FLAGS_FMT"/%x/%x rc %d/%d\n",
                            req, req->rq_xid, req->rq_transno,
                            req->rq_reqmsg ? lustre_msg_get_transno(req->rq_reqmsg) : 0,
                            req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : -1,
@@ -1990,10 +2183,10 @@ void _debug_req(struct ptlrpc_request *req, __u32 mask,
                            (char *)req->rq_import->imp_connection->c_remote_uuid.uuid :
                            req->rq_export ?
                            (char *)req->rq_export->exp_connection->c_remote_uuid.uuid : "<?>",
-                           (req->rq_import && req->rq_import->imp_client) ?
-                           req->rq_import->imp_client->cli_request_portal : -1,
-                           req->rq_reqlen, req->rq_replen, atomic_read(&req->rq_refcount),
-                           DEBUG_REQ_FLAGS(req),
+                           req->rq_request_portal, req->rq_reply_portal,
+                           req->rq_reqlen, req->rq_replen,
+                           req->rq_early_count, req->rq_timeout, req->rq_deadline,
+                           atomic_read(&req->rq_refcount), DEBUG_REQ_FLAGS(req),
                            req->rq_reqmsg && req_ptlrpc_body_swabbed(req) ?
                            lustre_msg_get_flags(req->rq_reqmsg) : -1,
                            req->rq_repmsg && rep_ptlrpc_body_swabbed(req) ?
index 60e0555..318ed0e 100644 (file)
@@ -57,7 +57,6 @@ int ptlrpc_ping(struct obd_import *imp)
                   imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
         req->rq_no_resend = req->rq_no_delay = 1;
         ptlrpc_request_set_replen(req);
-        req->rq_timeout = PING_INTERVAL;
         ptlrpcd_add_req(req);
 
         RETURN(0);
@@ -66,9 +65,14 @@ int ptlrpc_ping(struct obd_import *imp)
 void ptlrpc_update_next_ping(struct obd_import *imp)
 {
 #ifdef ENABLE_PINGER
-        imp->imp_next_ping = cfs_time_shift(
-                                (imp->imp_state == LUSTRE_IMP_DISCON ?
-                                 RECONNECT_INTERVAL : PING_INTERVAL));
+        int time = PING_INTERVAL;       /* default delay before the next ping */
+        if (imp->imp_state == LUSTRE_IMP_DISCON) {
+                int dtime = max_t(int, CONNECTION_SWITCH_MIN,
+                                  AT_OFF ? 0 :  /* latency ignored if AT_OFF */
+                                  at_get(&imp->imp_at.iat_net_latency));
+                time = min(time, dtime);        /* cap at PING_INTERVAL */
+        }
+        imp->imp_next_ping = cfs_time_shift(time);
 #endif /* ENABLE_PINGER */
 }
 
@@ -436,7 +440,7 @@ static int ping_evictor_main(void *arg)
                 obd = pet_exp->exp_obd;
                 spin_unlock(&pet_lock);
 
-                expire_time = cfs_time_current_sec() - (3 * obd_timeout / 2);
+                expire_time = cfs_time_current_sec() - PING_EVICT_TIMEOUT;
 
                 CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n",
                        obd->obd_name, expire_time);
index 21c97cb..2449ac9 100644 (file)
@@ -152,6 +152,7 @@ EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
 EXPORT_SYMBOL(ptlrpc_init_rq_pool);
 EXPORT_SYMBOL(ptlrpc_free_rq_pool);
 EXPORT_SYMBOL(ptlrpc_prep_req_pool);
+EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 EXPORT_SYMBOL(ptlrpc_request_alloc);
 EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
 EXPORT_SYMBOL(ptlrpc_request_free);
@@ -202,6 +203,7 @@ EXPORT_SYMBOL(lustre_msg_swabbed);
 EXPORT_SYMBOL(lustre_msg_check_version);
 EXPORT_SYMBOL(lustre_pack_request);
 EXPORT_SYMBOL(lustre_pack_reply);
+EXPORT_SYMBOL(lustre_pack_reply_flags);
 EXPORT_SYMBOL(lustre_shrink_msg);
 EXPORT_SYMBOL(lustre_free_reply_state);
 EXPORT_SYMBOL(lustre_msg_size);
@@ -270,6 +272,7 @@ EXPORT_SYMBOL(lustre_msg_get_limit);
 EXPORT_SYMBOL(lustre_msg_set_slv);
 EXPORT_SYMBOL(lustre_msg_set_limit);
 EXPORT_SYMBOL(lustre_msg_get_conn_cnt);
+EXPORT_SYMBOL(lustre_msg_is_v1);
 EXPORT_SYMBOL(lustre_msg_get_magic);
 EXPORT_SYMBOL(lustre_msg_set_handle);
 EXPORT_SYMBOL(lustre_msg_set_type);
index cb06039..eac5128 100644 (file)
@@ -426,9 +426,10 @@ static int log_commit_thread(void *arg)
                                 break;
                         }
 
-                        /* XXX FIXME bug 249, 5515 */
+                        /* bug 5515 */
                         request->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                         request->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
+                        ptlrpc_at_set_req_timeout(request);
 
                         ptlrpc_request_set_replen(request);
                         mutex_down(&llcd->llcd_ctxt->loc_sem);
index 315d439..a92a5c4 100644 (file)
@@ -110,30 +110,30 @@ struct ptlrpc_sec_policy * sptlrpc_rpcflavor2policy(__u16 flavor)
         if (number >= SPTLRPC_POLICY_MAX)
                 return NULL;
 
-again:
-        read_lock(&policy_lock);
-        policy = policies[number];
-        if (policy && !try_module_get(policy->sp_owner))
-                policy = NULL;
-        if (policy == NULL)
-                flag = atomic_read(&loaded);
-        read_unlock(&policy_lock);
-
-        /* if failure, try to load gss module, once */
-        if (unlikely(policy == NULL) && flag == 0 &&
-            number == SPTLRPC_POLICY_GSS) {
+        while (1) {
+                read_lock(&policy_lock);
+                policy = policies[number];
+                if (policy && !try_module_get(policy->sp_owner))
+                        policy = NULL;
+                if (policy == NULL)
+                        flag = atomic_read(&loaded);
+                read_unlock(&policy_lock);
+
+                if (policy != NULL || flag != 0 ||
+                    number != SPTLRPC_POLICY_GSS)
+                        break;
+
+                /* try to load gss module, once */
                 mutex_down(&load_mutex);
                 if (atomic_read(&loaded) == 0) {
-                        if (request_module("ptlrpc_gss") != 0)
-                                CERROR("Unable to load module ptlrpc_gss\n");
-                        else
+                        if (request_module("ptlrpc_gss") == 0)
                                 CWARN("module ptlrpc_gss loaded on demand\n");
+                        else
+                                CERROR("Unable to load module ptlrpc_gss\n");
 
                         atomic_set(&loaded, 1);
                 }
                 mutex_up(&load_mutex);
-
-                goto again;
         }
 
         return policy;
@@ -147,6 +147,8 @@ __u16 sptlrpc_name2rpcflavor(const char *name)
                 return SPTLRPC_FLVR_PLAIN;
         if (!strcmp(name, "krb5n"))
                 return SPTLRPC_FLVR_KRB5N;
+        if (!strcmp(name, "krb5a"))
+                return SPTLRPC_FLVR_KRB5A;
         if (!strcmp(name, "krb5i"))
                 return SPTLRPC_FLVR_KRB5I;
         if (!strcmp(name, "krb5p"))
@@ -844,10 +846,7 @@ int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
-/*
- * rq_nob_received is the actual received data length
- */
-int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
+static int do_cli_unwrap_reply(struct ptlrpc_request *req)
 {
         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
         int                    rc;
@@ -856,39 +855,34 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
 
         LASSERT(ctx);
         LASSERT(ctx->cc_sec);
-        LASSERT(ctx->cc_ops);
         LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata);
+        LASSERT(req->rq_repmsg == NULL);
 
-        req->rq_repdata_len = req->rq_nob_received;
-
-        if (req->rq_nob_received < sizeof(struct lustre_msg)) {
+        if (req->rq_repdata_len < sizeof(struct lustre_msg)) {
                 CERROR("replied data length %d too small\n",
-                       req->rq_nob_received);
+                       req->rq_repdata_len);
                 RETURN(-EPROTO);
         }
 
+        /* v2 message, check request/reply policy match */
+        rpc_flvr = WIRE_FLVR_RPC(req->rq_repdata->lm_secflvr);
 
-        /*
-         * v2 message, check request/reply policy match
-         */
-        rpc_flvr = WIRE_FLVR_RPC(req->rq_repbuf->lm_secflvr);
-
-        if (req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
+        if (req->rq_repdata->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
                 __swab16s(&rpc_flvr);
 
         if (RPC_FLVR_POLICY(rpc_flvr) !=
-                RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
+            RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
                 CERROR("request policy was %u while reply with %u\n",
-                        RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
-                        RPC_FLVR_POLICY(rpc_flvr));
+                       RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
+                       RPC_FLVR_POLICY(rpc_flvr));
                 RETURN(-EPROTO);
         }
 
         /* do nothing if it's null policy; otherwise unpack the
-         * wrapper message
-         */
+         * wrapper message */
         if (RPC_FLVR_POLICY(rpc_flvr) != SPTLRPC_POLICY_NULL &&
-            lustre_unpack_msg(req->rq_repbuf, req->rq_nob_received))
+            lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len))
                 RETURN(-EPROTO);
 
         switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
@@ -910,6 +904,144 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
+/*
+ * By the time this is called the reply buffer should have been un-posted,
+ * so its contents are not going to change any more.
+ */
+int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
+{
+        LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata == NULL);
+        LASSERT(req->rq_repmsg == NULL);
+        LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
+        /* a real reply is rejected if it was received at offset 0 */
+        if (req->rq_reply_off == 0) {
+                CERROR("real reply with offset 0\n");
+                return -EPROTO;
+        }
+        /* the reply must start at a multiple of 8 bytes */
+        if (req->rq_reply_off % 8 != 0) {
+                CERROR("reply at odd offset %u\n", req->rq_reply_off);
+                return -EPROTO;
+        }
+        /* point rq_repdata at the reply inside rq_repbuf before unwrapping */
+        req->rq_repdata = (struct lustre_msg *)
+                                (req->rq_repbuf + req->rq_reply_off);
+        req->rq_repdata_len = req->rq_nob_received;
+
+        return do_cli_unwrap_reply(req);
+}
+
+/*
+ * When this is called the receive buffer might still be posted, so the reply
+ * data may change at any time, whether or not we are holding rq_lock. We
+ * expect rq_reply_off to be 0 and rq_nob_received to be the early reply size.
+ *
+ * We allocate a separate buffer to hold the early reply data, pointed to by
+ * rq_repdata. rq_repdata_len is the early reply size; the actual buffer size
+ * is that size rounded up to a power of 2.
+ *
+ * The caller _must_ call sptlrpc_cli_finish_early_reply() after this, before
+ * processing another early reply or the real reply, to restore the
+ * ptlrpc_request to its normal state.
+ */
+int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req)
+{
+        struct lustre_msg      *early_buf;
+        int                     early_bufsz, early_size;
+        int                     rc;
+        ENTRY;
+
+        LASSERT(req->rq_repbuf);
+        LASSERT(req->rq_repdata == NULL);
+        LASSERT(req->rq_repmsg == NULL);
+
+        early_size = req->rq_nob_received;
+        if (early_size < sizeof(struct lustre_msg)) {
+                CERROR("early reply length %d too small\n", early_size);
+                RETURN(-EPROTO);
+        }
+
+        early_bufsz = size_roundup_power2(early_size);
+        OBD_ALLOC(early_buf, early_bufsz);
+        if (early_buf == NULL)
+                RETURN(-ENOMEM);
+
+        /* copy data out, do it inside spinlock */
+        spin_lock(&req->rq_lock);
+
+        if (req->rq_replied) {
+                spin_unlock(&req->rq_lock);
+                GOTO(err_free, rc = -EALREADY);
+        }
+
+&nb