Whamcloud - gitweb
Branch b1_8_gate
author    vitaly <vitaly>
Wed, 19 Nov 2008 21:14:46 +0000 (21:14 +0000)
committer vitaly <vitaly>
Wed, 19 Nov 2008 21:14:46 +0000 (21:14 +0000)
b=16129
i=adilger
i=green

- a high-priority request list is added to the service (see the
sketches below);
- once a lock is being canceled, all IO requests under this lock,
including newly arriving ones, are moved to this list;
- PING requests are also added to this list;
- when a lock cancel timeout occurs, the timeout is prolonged if there
is an IO RPC in flight under this lock;
- another request list is added to the export to speed up RPC-to-lock
matching.
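
For context, a minimal sketch of how these pieces fit together, with
placeholder names (my_lock_match, my_check, my_hpreq_ops and
my_hpreq_handler are illustrative, not part of this patch; the real
implementations are ost_rw_hpreq_lock_match(), ost_rw_hpreq_check() and
ost_hpreq_handler() below). A service passes a svc_hpreq_handler_t as
the new last argument of ptlrpc_init_svc(); the service calls this
handler for each incoming request, and the handler may attach a struct
ptlrpc_hpreq_ops so that lock cancels can later find and boost the
request:

/* Placeholder callbacks -- sketch only, not part of this patch. */
static int my_lock_match(struct ptlrpc_request *req, struct ldlm_lock *lock)
{
        /* Return 1 if @lock covers the IO carried by @req. */
        return 0;
}

static int my_check(struct ptlrpc_request *req)
{
        /* Return non-zero to move @req into the high priority queue. */
        return 0;
}

static struct ptlrpc_hpreq_ops my_hpreq_ops = {
        .hpreq_lock_match = my_lock_match,
        .hpreq_check      = my_check,
};

/* Called from ptlrpc_hpreq_init() for each incoming request; setting
 * rq_ops also links the request into the export's exp_queued_rpc list. */
static int my_hpreq_handler(struct ptlrpc_request *req)
{
        req->rq_ops = &my_hpreq_ops;
        return 0;
}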

18 files changed:
libcfs/include/libcfs/libcfs_debug.h
lustre/include/lustre_dlm.h
lustre/include/lustre_export.h
lustre/include/lustre_net.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/mgs/mgs_handler.c
lustre/obdclass/genops.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/service.c
lustre/tests/sanityN.sh
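
The scheduling policy implemented in service.c: the high priority queue
is preferred, but after srv_hpreq_ratio consecutive HP requests
(PTLRPC_SVC_HP_RATIO = 10 by default, tunable via the new
"high_priority_ratio" lprocfs file) one normal request is served, and
normal requests are only taken when ptlrpc_server_allow_normal()
permits. A condensed sketch of ptlrpc_server_request_get(), where
normal_allowed stands for ptlrpc_server_allow_normal(svc, force):

/* Condensed sketch, for illustration only; see service.c below. */
static struct ptlrpc_request *
pick_next_request(struct ptlrpc_service *svc, int normal_allowed)
{
        struct ptlrpc_request *req = NULL;

        if (normal_allowed && !list_empty(&svc->srv_request_queue) &&
            (list_empty(&svc->srv_request_hpq) ||
             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
                req = list_entry(svc->srv_request_queue.next,
                                 struct ptlrpc_request, rq_list);
                svc->srv_hpreq_count = 0;  /* reset after a normal request */
        } else if (!list_empty(&svc->srv_request_hpq)) {
                req = list_entry(svc->srv_request_hpq.next,
                                 struct ptlrpc_request, rq_list);
                svc->srv_hpreq_count++;    /* count consecutive HP requests */
        }
        return req;
}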

libcfs/include/libcfs/libcfs_debug.h
index 8487c39..d2663c8 100644 (file)
@@ -294,6 +294,11 @@ do {                                                                    \
 
 #endif /* !CDEBUG_ENTRY_EXIT */
 
+#define RETURN_EXIT                                                     \
+do {                                                                    \
+        EXIT_NESTING;                                                   \
+        return;                                                         \
+} while (0)
 
 struct libcfs_debug_msg_data {
         cfs_debug_limit_state_t *msg_cdls;

lustre/include/lustre_dlm.h
index aa0e358..3eac378 100644 (file)
@@ -432,6 +432,7 @@ struct ldlm_namespace {
 
         unsigned int           ns_max_unused;
         unsigned int           ns_max_age;
+        unsigned int           ns_timeouts;
          /**
           * Seconds.
           */

lustre/include/lustre_export.h
index b2a5c86..94033ef 100644 (file)
@@ -148,6 +148,7 @@ struct obd_export {
                                   exp_flvr_changed:1,
                                   exp_flvr_adapt:1,
                                   exp_libclient:1; /* liblustre client? */
+        struct list_head          exp_queued_rpc;  /* RPC to be handled */
         /* also protected by exp_lock */
         enum lustre_sec_part      exp_sp_peer;
         struct sptlrpc_flavor     exp_flvr;             /* current */

lustre/include/lustre_net.h
index 9d7f95b..0561fea 100644 (file)
 #define MGS_MAXREPSIZE  (9 * 1024)
 
 /* Absolute limits */
-#define OSS_THREADS_MIN 2
+#define OSS_THREADS_MIN 3       /* difficult replies, HPQ, others */
 #define OSS_THREADS_MAX 512
 #define OST_NBUFS       (64 * num_online_cpus())
 #define OST_BUFSIZE     (8 * 1024)
@@ -317,6 +317,20 @@ struct ptlrpc_request_pool {
 struct lu_context;
 struct lu_env;
 
+struct ldlm_lock;
+
+struct ptlrpc_hpreq_ops {
+        /**
+         * Check if the lock handle of the given lock is the same as
+         * the one taken from the request.
+         */
+        int  (*hpreq_lock_match)(struct ptlrpc_request *, struct ldlm_lock *);
+        /**
+         * Check if the request is a high priority one.
+         */
+        int  (*hpreq_check)(struct ptlrpc_request *);
+};
+
 /**
  * Represents remote procedure call.
  */
@@ -325,6 +339,8 @@ struct ptlrpc_request {
         struct list_head rq_list;
         struct list_head rq_timed_list;         /* server-side early replies */
         struct list_head rq_history_list;       /* server-side history */
+        struct list_head rq_exp_list;           /* server-side per-export list */
+        struct ptlrpc_hpreq_ops *rq_ops;        /* server-side hp handlers */
         __u64            rq_history_seq;        /* history sequence # */
         int rq_status;
         spinlock_t rq_lock;
@@ -348,7 +364,8 @@ struct ptlrpc_request {
                 rq_early:1, rq_must_unlink:1,
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
-                rq_sent_final:1;    /* stop sending early replies */
+                rq_sent_final:1,    /* stop sending early replies */
+                rq_hp:1;            /* high priority RPC */
 
         enum rq_phase rq_phase; /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
@@ -548,9 +565,9 @@ ptlrpc_rqphase2str(struct ptlrpc_request *req)
         FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),                  \
         FLAG(req->rq_no_resend, "N"),                                           \
         FLAG(req->rq_waiting, "W"),                                             \
-        FLAG(req->rq_wait_ctx, "C")
+        FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H")
 
-#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s"
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s"
 
 void _debug_req(struct ptlrpc_request *req, __u32 mask,
                 struct libcfs_debug_msg_data *data, const char *fmt, ...)
@@ -659,6 +676,9 @@ struct ptlrpc_request_buffer_desc {
 
 typedef int (*svc_handler_t)(struct ptlrpc_request *req);
 typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *);
+typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+
+#define PTLRPC_SVC_HP_RATIO 10
 
 struct ptlrpc_service {
         struct list_head srv_list;              /* chain thru all services */
@@ -673,6 +693,7 @@ struct ptlrpc_service {
         int              srv_threads_running;   /* # running threads */
         int              srv_n_difficult_replies; /* # 'difficult' replies */
         int              srv_n_active_reqs;     /* # reqs being served */
+        int              srv_n_hpreq;           /* # HPreqs being served */
         cfs_duration_t   srv_rqbd_timeout;      /* timeout before re-posting reqs, in tick */
         int              srv_watchdog_factor;   /* soft watchdog timeout multiplier */
         unsigned         srv_cpu_affinity:1;    /* bind threads to CPUs */
@@ -690,8 +711,11 @@ struct ptlrpc_service {
         cfs_timer_t       srv_at_timer;         /* early reply timer */
 
         int               srv_n_queued_reqs;    /* # reqs in either of the queues below */
+        int               srv_hpreq_count;      /* # hp requests handled */
+        int               srv_hpreq_ratio;      /* # hp per lp reqs to handle */
         struct list_head  srv_req_in_queue;     /* incoming reqs */
         struct list_head  srv_request_queue;    /* reqs waiting for service */
+        struct list_head  srv_request_hpq;      /* high priority queue */
 
         struct list_head  srv_request_history;  /* request history */
         __u64             srv_request_seq;      /* next request sequence # */
@@ -716,6 +740,7 @@ struct ptlrpc_service {
 
         struct list_head   srv_threads;         /* service thread list */
         svc_handler_t      srv_handler;
+        svc_hpreq_handler_t srv_hpreq_handler;  /* hp request handler */
 
         char *srv_name; /* only statically allocated strings here; we don't clean them */
         char *srv_thread_name; /* only statically allocated strings here; we don't clean them */
@@ -973,7 +998,8 @@ struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
                                        cfs_proc_dir_entry_t *proc_entry,
                                        svcreq_printfn_t,
                                        int min_threads, int max_threads,
-                                       char *threadname, __u32 ctx_tags);
+                                       char *threadname, __u32 ctx_tags,
+                                       svc_hpreq_handler_t);
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
 
 int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc);
@@ -982,6 +1008,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service);
 int liblustre_check_services (void *arg);
 void ptlrpc_daemonize(char *name);
 int ptlrpc_service_health_check(struct ptlrpc_service *);
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req);
 
 
 struct ptlrpc_svc_data {

lustre/include/obd_support.h
index 50379c5..a02b764 100644 (file)
@@ -292,6 +292,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 
 #define OBD_FAIL_PTLRPC_DUMP_LOG         0x50e
 #define OBD_FAIL_PTLRPC_LONG_UNLINK      0x50f
+#define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x510
+#define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x511
 
 #define OBD_FAIL_OBD_PING_NET            0x600
 #define OBD_FAIL_OBD_LOG_CANCEL_NET      0x601

lustre/ldlm/ldlm_lock.c
index 50d4504..1ec702a 100644 (file)
@@ -369,6 +369,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         CFS_INIT_LIST_HEAD(&lock->l_cache_locks_list);
         lu_ref_init(&lock->l_reference);
         lu_ref_add(&lock->l_reference, "hash", lock);
+        lock->l_callback_timeout = 0;
 
         RETURN(lock);
 }
@@ -1874,7 +1875,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                                    data->msg_fn, data->msg_line, fmt, args,
                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: "
-                       LPX64" expref: %d pid: %u\n", lock,
+                       LPX64" expref: %d pid: %u timeout: %lu\n", lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
                        ldlm_lockname[lock->l_granted_mode],
@@ -1882,7 +1883,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        lock->l_flags, lock->l_remote_handle.cookie,
                        lock->l_export ?
                        atomic_read(&lock->l_export->exp_refcount) : -99,
-                       lock->l_pid);
+                       lock->l_pid, lock->l_callback_timeout);
                 va_end(args);
                 return;
         }
@@ -1894,7 +1895,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
                        "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64
-                       " expref: %d pid: %u\n",
+                       " expref: %d pid: %u timeout: %lu\n",
                        lock->l_resource->lr_namespace->ns_name, lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
@@ -1910,7 +1911,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        lock->l_flags, lock->l_remote_handle.cookie,
                        lock->l_export ?
                        atomic_read(&lock->l_export->exp_refcount) : -99,
-                       lock->l_pid);
+                       lock->l_pid, lock->l_callback_timeout);
                 break;
 
         case LDLM_FLOCK:
@@ -1919,7 +1920,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
                        "["LPU64"->"LPU64"] flags: %x remote: "LPX64
-                       " expref: %d pid: %u\n",
+                       " expref: %d pid: %u timeout: %lu\n",
                        lock->l_resource->lr_namespace->ns_name, lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
@@ -1935,7 +1936,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        lock->l_flags, lock->l_remote_handle.cookie,
                        lock->l_export ?
                        atomic_read(&lock->l_export->exp_refcount) : -99,
-                       lock->l_pid);
+                       lock->l_pid, lock->l_callback_timeout);
                 break;
 
         case LDLM_IBITS:
@@ -1944,7 +1945,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
                        "flags: %x remote: "LPX64" expref: %d "
-                       "pid %u\n",
+                       "pid: %u timeout: %lu\n",
                        lock->l_resource->lr_namespace->ns_name,
                        lock, lock->l_handle.h_cookie,
                        atomic_read (&lock->l_refc),
@@ -1959,7 +1960,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        lock->l_flags, lock->l_remote_handle.cookie,
                        lock->l_export ?
                        atomic_read(&lock->l_export->exp_refcount) : -99,
-                       lock->l_pid);
+                       lock->l_pid, lock->l_callback_timeout);
                 break;
 
         default:
@@ -1967,7 +1968,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                                    data->msg_fn, data->msg_line, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x "
-                       "remote: "LPX64" expref: %d pid: %u\n",
+                       "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
                        lock->l_resource->lr_namespace->ns_name,
                        lock, lock->l_handle.h_cookie,
                        atomic_read (&lock->l_refc),
@@ -1981,7 +1982,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                        lock->l_flags, lock->l_remote_handle.cookie,
                        lock->l_export ?
                        atomic_read(&lock->l_export->exp_refcount) : -99,
-                       lock->l_pid);
+                       lock->l_pid, lock->l_callback_timeout);
                 break;
         }
         va_end(args);

lustre/ldlm/ldlm_lockd.c
index b809cba..9d6bb38 100644 (file)
@@ -238,6 +238,31 @@ static int expired_lock_main(void *arg)
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
 
+/**
+ * Check if there is a request in the export request list
+ * which prevents the lock from being canceled.
+ */
+static int ldlm_lock_busy(struct ldlm_lock *lock)
+{
+        struct ptlrpc_request *req;
+        int match = 0;
+        ENTRY;
+
+        if (lock->l_export == NULL)
+                return 0;
+
+        spin_lock(&lock->l_export->exp_lock);
+        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+                if (req->rq_ops->hpreq_lock_match) {
+                        match = req->rq_ops->hpreq_lock_match(req, lock);
+                        if (match)
+                                break;
+                }
+        }
+        spin_unlock(&lock->l_export->exp_lock);
+        RETURN(match);
+}
+
 /* This is called from within a timer interrupt and cannot schedule */
 static void waiting_locks_callback(unsigned long unused)
 {
@@ -248,7 +273,6 @@ repeat:
         while (!list_empty(&waiting_locks_list)) {
                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                   l_pending_chain);
-
                 if (cfs_time_after(lock->l_callback_timeout, cfs_time_current()) ||
                     (lock->l_req_mode == LCK_GROUP))
                         break;
@@ -286,6 +310,29 @@ repeat:
                         goto repeat;
                 }
 
+                /* Check if we need to prolong timeout */
+                if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+                    ldlm_lock_busy(lock)) {
+                        int cont = 1;
+
+                        if (lock->l_pending_chain.next == &waiting_locks_list)
+                                cont = 0;
+
+                        LDLM_LOCK_GET(lock);
+                        spin_unlock_bh(&waiting_locks_spinlock);
+                        LDLM_DEBUG(lock, "prolong the busy lock");
+                        ldlm_refresh_waiting_lock(lock);
+                        spin_lock_bh(&waiting_locks_spinlock);
+
+                        if (!cont) {
+                                LDLM_LOCK_PUT(lock);
+                                break;
+                        }
+
+                        LDLM_LOCK_PUT(lock);
+                        continue;
+                }
+                lock->l_resource->lr_namespace->ns_timeouts++;
                 LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                            "evicting client at %s ",
                            cfs_time_current_sec()- lock->l_enqueued_time.tv_sec,
@@ -335,15 +382,21 @@ repeat:
  */
 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
-        int timeout;
+        cfs_time_t timeout;
         cfs_time_t timeout_rounded;
 
         if (!list_empty(&lock->l_pending_chain))
                 return 0;
 
-        timeout = ldlm_get_enq_timeout(lock);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
+            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+                timeout = 2;
+        else
+                timeout = ldlm_get_enq_timeout(lock);
 
-        lock->l_callback_timeout = cfs_time_shift(timeout);
+        timeout = cfs_time_shift(timeout);
+        if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
+                lock->l_callback_timeout = timeout;
 
         timeout_rounded = round_timeout(lock->l_callback_timeout);
 
@@ -475,7 +528,6 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
         LDLM_DEBUG(lock, "refreshed");
         return 1;
 }
-
 #else /* !__KERNEL__ */
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
@@ -621,6 +673,30 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
         RETURN(rc);
 }
 
+/**
+ * Check if there are requests in the export request list that prevent
+ * the lock from being canceled, and make those requests high priority.
+ */
+static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
+{
+        struct ptlrpc_request *req;
+        ENTRY;
+
+        if (lock->l_export == NULL) {
+                LDLM_DEBUG(lock, "client lock: no-op");
+                RETURN_EXIT;
+        }
+
+        spin_lock(&lock->l_export->exp_lock);
+        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
+                if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
+                    req->rq_ops->hpreq_lock_match(req, lock))
+                        ptlrpc_hpreq_reorder(req);
+        }
+        spin_unlock(&lock->l_export->exp_lock);
+        EXIT;
+}
+
 /*
  * ->l_blocking_ast() method for server-side locks. This is invoked when newly
  * enqueued server lock conflicts with given one.
@@ -650,6 +726,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 ldlm_lock_dump(D_ERROR, lock, 0);
         }
 
+        ldlm_lock_reorder_req(lock);
+
         req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
                                         &RQF_LDLM_BL_CALLBACK,
                                         LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
@@ -2193,7 +2271,7 @@ static int ldlm_setup(void)
                                 ldlm_svc_proc_dir, NULL,
                                 ldlm_min_threads, ldlm_max_threads,
                                 "ldlm_cb",
-                                LCT_MD_THREAD|LCT_DT_THREAD);
+                                LCT_MD_THREAD|LCT_DT_THREAD, NULL);
 
         if (!ldlm_state->ldlm_cb_service) {
                 CERROR("failed to start service\n");
@@ -2208,7 +2286,8 @@ static int ldlm_setup(void)
                                 ldlm_svc_proc_dir, NULL,
                                 ldlm_min_threads, ldlm_max_threads,
                                 "ldlm_cn",
-                                LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD);
+                                LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
+                                NULL);
 
         if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");

lustre/ldlm/ldlm_request.c
index 92068f6..1317844 100644 (file)
@@ -120,6 +120,7 @@ int ldlm_get_enq_timeout(struct ldlm_lock *lock)
         timeout = timeout + (timeout >> 1); /* 150% */
         return max(timeout, ldlm_enqueue_min);
 }
+EXPORT_SYMBOL(ldlm_get_enq_timeout);
 
 static int is_granted_or_cancelled(struct ldlm_lock *lock)
 {

lustre/ldlm/ldlm_resource.c
index 1b4e46e..320a870 100644 (file)
@@ -287,6 +287,12 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns)
                 lock_vars[0].write_fptr = lprocfs_wr_uint;
                 lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
 
+                snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_timeouts",
+                         ns->ns_name);
+                lock_vars[0].data = &ns->ns_timeouts;
+                lock_vars[0].read_fptr = lprocfs_rd_uint;
+                lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
                 snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
                          ns->ns_name);
                 lock_vars[0].data = &ns->ns_max_nolock_size;
@@ -370,6 +376,7 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name,
         ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
         ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;
         ns->ns_ctime_age_limit = LDLM_CTIME_AGE_LIMIT;
+        ns->ns_timeouts = 0;
         spin_lock_init(&ns->ns_unused_lock);
         ns->ns_orig_connect_flags = 0;
         ns->ns_connect_flags = 0;

lustre/mgs/mgs_handler.c
index e724962..9fbe6c2 100644 (file)
@@ -248,7 +248,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                 mgs_handle, LUSTRE_MGS_NAME,
                                 obd->obd_proc_entry, target_print_req,
                                 MGS_THREADS_AUTO_MIN, MGS_THREADS_AUTO_MAX,
-                                "ll_mgs", LCT_MD_THREAD);
+                                "ll_mgs", LCT_MD_THREAD, NULL);
 
         if (!mgs->mgs_service) {
                 CERROR("failed to start service\n");

lustre/obdclass/genops.c
index edd6dfb..5adafa3 100644 (file)
@@ -717,6 +717,7 @@ static void class_export_destroy(struct obd_export *exp)
 
         LASSERT(list_empty(&exp->exp_outstanding_replies));
         LASSERT(list_empty(&exp->exp_req_replay_queue));
+        LASSERT(list_empty(&exp->exp_queued_rpc));
         obd_destroy_export(exp);
         class_decref(obd, "export", exp);
 
@@ -745,6 +746,7 @@ struct obd_export *class_new_export(struct obd_device *obd,
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
+        CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
         class_handle_hash(&export->exp_handle, export_handle_addref);
         export->exp_last_request_time = cfs_time_current_sec();
         spin_lock_init(&export->exp_lock);

lustre/ost/ost_handler.c
index 8b2b7fb..bdea74e 100644 (file)
@@ -300,10 +300,9 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
-        if (body == NULL)
-                RETURN(-EFAULT);
+        /* ost_body is verified and swabbed in ost_hpreq_handler() */
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        LASSERT(body != NULL);
 
         oinfo.oi_oa = &body->oa;
         oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
@@ -493,7 +492,9 @@ static void ost_brw_lock_put(int mode,
 struct ost_prolong_data {
         struct obd_export *opd_exp;
         ldlm_policy_data_t opd_policy;
+        struct obdo *opd_oa;
         ldlm_mode_t opd_mode;
+        int opd_lock_match;
 };
 
 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
@@ -523,6 +524,14 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
                 return LDLM_ITER_CONTINUE;
         }
 
+        /* Fill the obdo with the matched lock handle.
+         * XXX: it is possible in some cases the IO RPC is covered by several
+         * locks, even for the write case, so it may need to be a lock list. */
+        if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
+                opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
+                opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
+        }
+
         if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
                 /* ignore locks not being cancelled */
                 return LDLM_ITER_CONTINUE;
@@ -531,17 +540,18 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
         /* OK. this is a possible lock the user holds doing I/O
          * let's refresh eviction timer for it */
         ldlm_refresh_waiting_lock(lock);
+        opd->opd_lock_match = 1;
 
         return LDLM_ITER_CONTINUE;
 }
 
-static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
-                              struct niobuf_remote *nb, struct obdo *oa,
-                              ldlm_mode_t mode)
+static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
+                                struct niobuf_remote *nb, struct obdo *oa,
+                                ldlm_mode_t mode)
 {
         struct ldlm_res_id res_id;
         int nrbufs = obj->ioo_bufcnt;
-        struct ost_prolong_data opd;
+        struct ost_prolong_data opd = { 0 };
         ENTRY;
 
         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
@@ -562,16 +572,28 @@ static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
                 lock = ldlm_handle2lock(&oa->o_handle);
                 if (lock != NULL) {
                         ost_prolong_locks_iter(lock, &opd);
+                        if (opd.opd_lock_match) {
+                                LDLM_LOCK_PUT(lock);
+                                RETURN(1);
+                        }
+
+                        /* Check if the lock covers the whole IO region,
+                         * otherwise iterate through the resource. */
+                        if (lock->l_policy_data.l_extent.end >=
+                            opd.opd_policy.l_extent.end &&
+                            lock->l_policy_data.l_extent.start <=
+                            opd.opd_policy.l_extent.start) {
+                                LDLM_LOCK_PUT(lock);
+                                RETURN(0);
+                        }
                         LDLM_LOCK_PUT(lock);
-                        EXIT;
-                        return;
                 }
         }
 
+        opd.opd_oa = oa;
         ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
                               ost_prolong_locks_iter, &opd);
-
-        EXIT;
+        RETURN(opd.opd_lock_match);
 }
 
 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
@@ -586,7 +608,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         struct l_wait_info lwi;
         struct lustre_handle lockh = { 0 };
         __u32  size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
-        int objcount, niocount, npages, nob = 0, rc, i;
+        int niocount, npages, nob = 0, rc, i;
         int no_reply = 0;
         ENTRY;
 
@@ -608,49 +630,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (exp->exp_failed)
                 GOTO(out, rc = -ENOTCONN);
 
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
-        if (body == NULL) {
-                CERROR("Missing/short ost_body\n");
-                GOTO(out, rc = -EFAULT);
-        }
-
-        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
-                   sizeof(*ioo);
-        if (objcount == 0) {
-                CERROR("Missing/short ioobj\n");
-                GOTO(out, rc = -EFAULT);
-        }
-        if (objcount > 1) {
-                CERROR("too many ioobjs (%d)\n", objcount);
-                GOTO(out, rc = -EFAULT);
-        }
+        /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+         * ost_rw_hpreq_check(). */
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        LASSERT(body != NULL);
 
-        ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
-                                 lustre_swab_obd_ioobj);
-        if (ioo == NULL) {
-                CERROR("Missing/short ioobj\n");
-                GOTO(out, rc = -EFAULT);
-        }
+        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
+        LASSERT(ioo != NULL);
 
         niocount = ioo->ioo_bufcnt;
-        if (niocount > PTLRPC_MAX_BRW_PAGES) {
-                DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
-                          niocount);
-                GOTO(out, rc = -EFAULT);
-        }
-
-        remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
-                                       niocount * sizeof(*remote_nb),
-                                       lustre_swab_niobuf_remote);
-        if (remote_nb == NULL) {
-                CERROR("Missing/short niobuf\n");
-                GOTO(out, rc = -EFAULT);
-        }
-        if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
-                for (i = 1; i < niocount; i++)
-                        lustre_swab_niobuf_remote (&remote_nb[i]);
-        }
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+                                   niocount * sizeof(*remote_nb));
+        LASSERT(remote_nb != NULL);
 
         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
@@ -695,7 +686,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc == NULL) /* XXX: check all cleanup stuff */
                 GOTO(out, rc = -ENOMEM);
 
-        ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
+        ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
 
         nob = 0;
         for (i = 0; i < npages; i++) {
@@ -860,7 +851,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         __u32                   *rcs;
         __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         int objcount, niocount, npages;
-        int rc, swab, i, j;
+        int rc, i, j;
         obd_count                client_cksum = 0, server_cksum = 0;
         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
         int                      no_reply = 0;
@@ -888,56 +879,22 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (exp->exp_failed)
                 GOTO(out, rc = -ENOTCONN);
 
-        swab = lustre_msg_swabbed(req->rq_reqmsg);
-        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
-                                  lustre_swab_ost_body);
-        if (body == NULL) {
-                CERROR("Missing/short ost_body\n");
-                GOTO(out, rc = -EFAULT);
-        }
+        /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+         * ost_rw_hpreq_check(). */
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        LASSERT(body != NULL);
 
-        lustre_set_req_swabbed(req, REQ_REC_OFF + 1);
         objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
-                   sizeof(*ioo);
-        if (objcount == 0) {
-                CERROR("Missing/short ioobj\n");
-                GOTO(out, rc = -EFAULT);
-        }
-        if (objcount > 1) {
-                CERROR("too many ioobjs (%d)\n", objcount);
-                GOTO(out, rc = -EFAULT);
-        }
-
+                sizeof(*ioo);
         ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
                              objcount * sizeof(*ioo));
-        LASSERT (ioo != NULL);
-        for (niocount = i = 0; i < objcount; i++) {
-                if (swab)
-                        lustre_swab_obd_ioobj(&ioo[i]);
-                if (ioo[i].ioo_bufcnt == 0) {
-                        CERROR("ioo[%d] has zero bufcnt\n", i);
-                        GOTO(out, rc = -EFAULT);
-                }
+        LASSERT(ioo != NULL);
+        for (niocount = i = 0; i < objcount; i++)
                 niocount += ioo[i].ioo_bufcnt;
-        }
 
-        if (niocount > PTLRPC_MAX_BRW_PAGES) {
-                DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
-                          niocount);
-                GOTO(out, rc = -EFAULT);
-        }
-
-        remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
-                                       niocount * sizeof(*remote_nb),
-                                       lustre_swab_niobuf_remote);
-        if (remote_nb == NULL) {
-                CERROR("Missing/short niobuf\n");
-                GOTO(out, rc = -EFAULT);
-        }
-        if (swab) {                             /* swab the remaining niobufs */
-                for (i = 1; i < niocount; i++)
-                        lustre_swab_niobuf_remote (&remote_nb[i]);
-        }
+        remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+                                   niocount * sizeof(*remote_nb));
+        LASSERT(remote_nb != NULL);
 
         if (body->oa.o_valid & OBD_MD_FLOSSCAPA)
                 capa = lustre_unpack_capa(req->rq_reqmsg, REQ_REC_OFF + 3);
@@ -975,7 +932,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        ost_prolong_locks(exp, ioo, remote_nb,&body->oa,  LCK_PW);
+        ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW);
 
         /* obd_preprw clobbers oa->valid, so save what we need */
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
@@ -1590,6 +1547,250 @@ int ost_msg_check_version(struct lustre_msg *msg)
         return rc;
 }
 
+static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
+                                   struct ldlm_lock *lock)
+{
+        struct niobuf_remote *nb;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        int objcount, niocount;
+        int mode, opc, i;
+        __u64 start, end;
+        ENTRY;
+
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+        /* As the request may be covered by several locks, do not look at
+         * o_handle, look at the RPC IO region. */
+        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+                                  lustre_swab_obdo);
+        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+                sizeof(*ioo);
+        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+                             objcount * sizeof(*ioo));
+        LASSERT(ioo != NULL);
+        for (niocount = i = 0; i < objcount; i++)
+                niocount += ioo[i].ioo_bufcnt;
+
+        nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+                            niocount * sizeof(*nb));
+        LASSERT(nb != NULL);
+
+        mode = LCK_PW;
+        if (opc == OST_READ)
+                mode |= LCK_PR;
+
+        start = nb[0].offset & CFS_PAGE_MASK;
+        end = (nb[ioo->ioo_bufcnt - 1].offset +
+               nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
+
+        if (!(lock->l_granted_mode & mode))
+                RETURN(0);
+
+        if (lock->l_policy_data.l_extent.end < start ||
+            lock->l_policy_data.l_extent.start > end)
+                RETURN(0);
+
+        RETURN(1);
+}
+
+/**
+ * Swab buffers needed to call ost_rw_prolong_locks() and call it.
+ * Return the value from ost_rw_prolong_locks() which is non-zero if
+ * there is a cancelled lock which is waiting for this IO request.
+ */
+static int ost_rw_hpreq_check(struct ptlrpc_request *req)
+{
+        struct niobuf_remote *nb;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        int objcount, niocount;
+        int mode, opc, i;
+        ENTRY;
+
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        LASSERT(body != NULL);
+
+        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+                sizeof(*ioo);
+        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+                             objcount * sizeof(*ioo));
+        LASSERT(ioo != NULL);
+
+        for (niocount = i = 0; i < objcount; i++)
+                niocount += ioo[i].ioo_bufcnt;
+        nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+                            niocount * sizeof(*nb));
+        LASSERT(nb != NULL);
+        LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK));
+
+        mode = LCK_PW;
+        if (opc == OST_READ)
+                mode |= LCK_PR;
+        RETURN(ost_rw_prolong_locks(req->rq_export, ioo, nb, &body->oa, mode));
+}
+
+static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa)
+{
+        struct ldlm_res_id res_id = { .name = { oa->o_id } };
+        struct ost_prolong_data opd = { 0 };
+        __u64 start, end;
+        ENTRY;
+
+        start = oa->o_size;
+        end = start + oa->o_blocks;
+
+        opd.opd_mode = LCK_PW;
+        opd.opd_exp = exp;
+        opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
+        if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
+                opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
+        else
+                opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
+
+        CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+               res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
+               opd.opd_policy.l_extent.end);
+
+        opd.opd_oa = oa;
+        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+                              ost_prolong_locks_iter, &opd);
+        RETURN(opd.opd_lock_match);
+}
+
+static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
+                                      struct ldlm_lock *lock)
+{
+        struct ost_body *body;
+        ENTRY;
+
+        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+                                  lustre_swab_obdo);
+        LASSERT(body != NULL);
+
+        if (body->oa.o_valid & OBD_MD_FLHANDLE &&
+            body->oa.o_handle.cookie == lock->l_handle.h_cookie)
+                RETURN(1);
+        RETURN(0);
+}
+
+static int ost_punch_hpreq_check(struct ptlrpc_request *req)
+{
+        struct ost_body *body = lustre_msg_buf(req->rq_reqmsg,
+                                               REQ_REC_OFF, sizeof(*body));
+        LASSERT(body != NULL);
+        LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+                !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
+
+        RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa));
+}
+
+struct ptlrpc_hpreq_ops ost_hpreq_rw = {
+        .hpreq_lock_match  = ost_rw_hpreq_lock_match,
+        .hpreq_check       = ost_rw_hpreq_check,
+};
+
+struct ptlrpc_hpreq_ops ost_hpreq_punch = {
+        .hpreq_lock_match  = ost_punch_hpreq_lock_match,
+        .hpreq_check       = ost_punch_hpreq_check,
+};
+
+/** Assign high priority operations to the request if needed. */
+static int ost_hpreq_handler(struct ptlrpc_request *req)
+{
+        ENTRY;
+        if (req->rq_export) {
+                int opc = lustre_msg_get_opc(req->rq_reqmsg);
+                struct ost_body *body;
+
+                if (opc == OST_READ || opc == OST_WRITE) {
+                        struct niobuf_remote *nb;
+                        struct obd_ioobj *ioo;
+                        int objcount, niocount;
+                        int swab, i;
+
+                        body = lustre_swab_reqbuf(req, REQ_REC_OFF,
+                                                  sizeof(*body),
+                                                  lustre_swab_obdo);
+                        if (!body) {
+                                CERROR("Missing/short ost_body\n");
+                                RETURN(-EFAULT);
+                        }
+                        objcount = lustre_msg_buflen(req->rq_reqmsg,
+                                                     REQ_REC_OFF + 1) /
+                                sizeof(*ioo);
+                        if (objcount == 0) {
+                                CERROR("Missing/short ioobj\n");
+                                RETURN(-EFAULT);
+                        }
+                        if (objcount > 1) {
+                                CERROR("too many ioobjs (%d)\n", objcount);
+                                RETURN(-EFAULT);
+                        }
+
+                        swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) &&
+                                lustre_msg_swabbed(req->rq_reqmsg);
+                        ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
+                                                 objcount * sizeof(*ioo),
+                                                 lustre_swab_obd_ioobj);
+                        if (!ioo) {
+                                CERROR("Missing/short ioobj\n");
+                                RETURN(-EFAULT);
+                        }
+                        for (niocount = i = 0; i < objcount; i++) {
+                                if (i > 0 && swab)
+                                        lustre_swab_obd_ioobj(&ioo[i]);
+                                if (ioo[i].ioo_bufcnt == 0) {
+                                        CERROR("ioo[%d] has zero bufcnt\n", i);
+                                        RETURN(-EFAULT);
+                                }
+                                niocount += ioo[i].ioo_bufcnt;
+                        }
+                        if (niocount > PTLRPC_MAX_BRW_PAGES) {
+                                DEBUG_REQ(D_ERROR, req, "bulk has too many "
+                                          "pages (%d)", niocount);
+                                RETURN(-EFAULT);
+                        }
+
+                        swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) &&
+                                lustre_msg_swabbed(req->rq_reqmsg);
+                        nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
+                                                niocount * sizeof(*nb),
+                                                lustre_swab_niobuf_remote);
+                        if (!nb) {
+                                CERROR("Missing/short niobuf\n");
+                                RETURN(-EFAULT);
+                        }
+
+                        if (swab) {
+                                /* swab remaining niobufs */
+                                for (i = 1; i < niocount; i++)
+                                        lustre_swab_niobuf_remote(&nb[i]);
+                        }
+
+                        if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
+                                req->rq_ops = &ost_hpreq_rw;
+                } else if (opc == OST_PUNCH) {
+                        body = lustre_swab_reqbuf(req, REQ_REC_OFF,
+                                                  sizeof(*body),
+                                                  lustre_swab_obdo);
+                        if (!body) {
+                                CERROR("Missing/short ost_body\n");
+                                RETURN(-EFAULT);
+                        }
+
+                        if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+                            !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
+                                req->rq_ops = &ost_hpreq_punch;
+                }
+        }
+        RETURN(0);
+}
+
 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
 int ost_handle(struct ptlrpc_request *req)
 {
@@ -1955,7 +2156,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                 /* Ensure a 4x range for dynamic threads */
                 if (oss_min_threads > OSS_THREADS_MAX / 4)
                         oss_min_threads = OSS_THREADS_MAX / 4;
-                oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4);
+                oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
         }
 
         ost->ost_service =
@@ -1965,7 +2166,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                                 ost_handle, LUSTRE_OSS_NAME,
                                 obd->obd_proc_entry, target_print_req,
                                 oss_min_threads, oss_max_threads,
-                                "ll_ost", LCT_DT_THREAD);
+                                "ll_ost", LCT_DT_THREAD, NULL);
         if (ost->ost_service == NULL) {
                 CERROR("failed to start service\n");
                 GOTO(out_lprocfs, rc = -ENOMEM);
@@ -1994,7 +2195,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                                 ost_handle, "ost_create",
                                 obd->obd_proc_entry, target_print_req,
                                 oss_min_create_threads, oss_max_create_threads,
-                                "ll_ost_creat", LCT_DT_THREAD);
+                                "ll_ost_creat", LCT_DT_THREAD, NULL);
         if (ost->ost_create_service == NULL) {
                 CERROR("failed to start OST create service\n");
                 GOTO(out_service, rc = -ENOMEM);
@@ -2011,7 +2212,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                                 ost_handle, "ost_io",
                                 obd->obd_proc_entry, target_print_req,
                                 oss_min_threads, oss_max_threads,
-                                "ll_ost_io", LCT_DT_THREAD);
+                                "ll_ost_io", LCT_DT_THREAD, ost_hpreq_handler);
         if (ost->ost_io_service == NULL) {
                 CERROR("failed to start OST I/O service\n");
                 GOTO(out_create, rc = -ENOMEM);

lustre/ptlrpc/client.c
index 000904c..045365c 100644 (file)
@@ -538,6 +538,7 @@ static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
         CFS_INIT_LIST_HEAD(&request->rq_ctx_chain);
         CFS_INIT_LIST_HEAD(&request->rq_set_chain);
         CFS_INIT_LIST_HEAD(&request->rq_history_list);
+        CFS_INIT_LIST_HEAD(&request->rq_exp_list);
         cfs_waitq_init(&request->rq_reply_waitq);
         request->rq_xid = ptlrpc_next_xid();
         atomic_set(&request->rq_refcount, 1);
@@ -1705,6 +1706,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
+        LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request);
         LASSERTF(!request->rq_replay, "req %p\n", request);
         LASSERT(request->rq_cli_ctx);
 

lustre/ptlrpc/lproc_ptlrpc.c
index 29b805c..d5af015 100644 (file)
@@ -514,6 +514,32 @@ static int ptlrpc_lprocfs_rd_timeouts(char *page, char **start, off_t off,
         return rc;
 }
 
+static int ptlrpc_lprocfs_rd_hp_ratio(char *page, char **start, off_t off,
+                                      int count, int *eof, void *data)
+{
+        struct ptlrpc_service *svc = data;
+        int rc = snprintf(page, count, "%d", svc->srv_hpreq_ratio);
+        return rc;
+}
+
+static int ptlrpc_lprocfs_wr_hp_ratio(struct file *file, const char *buffer,
+                                      unsigned long count, void *data)
+{
+        struct ptlrpc_service *svc = data;
+        int rc, val;
+        
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc < 0)
+                return rc;
+        if (val < 0)
+                return -ERANGE;
+
+        spin_lock(&svc->srv_lock);
+        svc->srv_hpreq_ratio = val;
+        spin_unlock(&svc->srv_lock);
+        return count;
+}
+
 void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry,
                                      struct ptlrpc_service *svc)
 {
@@ -529,6 +555,10 @@ void ptlrpc_lprocfs_register_service(struct proc_dir_entry *entry,
                 {.name       = "timeouts",
                  .read_fptr  = ptlrpc_lprocfs_rd_timeouts,
                  .data       = svc},
+                {.name       = "high_priority_ratio",
+                 .read_fptr  = ptlrpc_lprocfs_rd_hp_ratio,
+                 .write_fptr = ptlrpc_lprocfs_wr_hp_ratio,
+                 .data       = svc},
                 {NULL}
         };
         static struct file_operations req_history_fops = {

lustre/ptlrpc/pack_generic.c
index 2b9466a..8739098 100644 (file)
@@ -754,6 +754,9 @@ void *lustre_swab_buf(struct lustre_msg *msg, int index, int min_size,
 void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size,
                          void *swabber)
 {
+        if (lustre_req_swabbed(req, index))
+                return lustre_msg_buf(req->rq_reqmsg, index, min_size);
+
         lustre_set_req_swabbed(req, index);
         return lustre_swab_buf(req->rq_reqmsg, index, min_size, swabber);
 }
@@ -761,6 +764,9 @@ void *lustre_swab_reqbuf(struct ptlrpc_request *req, int index, int min_size,
 void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size,
                          void *swabber)
 {
+        if (lustre_rep_swabbed(req, index))
+                return lustre_msg_buf(req->rq_repmsg, index, min_size);
+
         lustre_set_rep_swabbed(req, index);
         return lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
 }

lustre/ptlrpc/ptlrpc_module.c
index d8e7c86..03adc07 100644 (file)
@@ -211,6 +211,7 @@ EXPORT_SYMBOL(ptlrpc_start_thread);
 EXPORT_SYMBOL(ptlrpc_unregister_service);
 EXPORT_SYMBOL(ptlrpc_daemonize);
 EXPORT_SYMBOL(ptlrpc_service_health_check);
+EXPORT_SYMBOL(ptlrpc_hpreq_reorder);
 
 /* pack_generic.c */
 EXPORT_SYMBOL(lustre_msg_swabbed);

lustre/ptlrpc/service.c
index d8d737d..a0bf907 100644 (file)
@@ -300,7 +300,7 @@ struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                c->psc_watchdog_factor,
                                h, name, proc_entry,
                                prntfn, c->psc_min_threads, c->psc_max_threads,
-                               threadname, c->psc_ctx_tags);
+                               threadname, c->psc_ctx_tags, NULL);
 }
 EXPORT_SYMBOL(ptlrpc_init_svc_conf);
 
@@ -320,7 +320,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 cfs_proc_dir_entry_t *proc_entry,
                 svcreq_printfn_t svcreq_printfn,
                 int min_threads, int max_threads,
-                char *threadname, __u32 ctx_tags)
+                char *threadname, __u32 ctx_tags,
+                svc_hpreq_handler_t hp_handler)
 {
         int                    rc;
         struct ptlrpc_service *service;
@@ -355,11 +356,16 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_threads_max = max_threads;
         service->srv_thread_name = threadname;
         service->srv_ctx_tags = ctx_tags;
+        service->srv_hpreq_handler = hp_handler;
+        service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
+        service->srv_hpreq_count = 0;
+        service->srv_n_hpreq = 0;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
 
         CFS_INIT_LIST_HEAD(&service->srv_request_queue);
+        CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
         CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
         CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
         CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
@@ -520,6 +526,11 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_service  *svc = req->rq_rqbd->rqbd_service;
 
+        if (req->rq_export) {
+                class_export_put(req->rq_export);
+                req->rq_export = NULL;
+        }
+
         if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
                 DEBUG_REQ(D_INFO, req, "free req");
 
@@ -537,7 +548,7 @@ static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
 static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
 {
         struct obd_export *oldest_exp;
-        time_t oldest_time;
+        time_t oldest_time, new_time;
 
         ENTRY;
 
@@ -548,9 +559,13 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
            of the list, we can be really lazy here - we don't have to evict
            at the exact right moment.  Eventually, all silent exports
            will make it to the top of the list. */
-        exp->exp_last_request_time = max(exp->exp_last_request_time,
-                                         cfs_time_current_sec() + extra_delay);
 
+        /* Do not bother with renewals of one second or less. */
+        new_time = cfs_time_current_sec() + extra_delay;
+        if (exp->exp_last_request_time + 1 /*second */ >= new_time)
+                RETURN_EXIT;
+
+        exp->exp_last_request_time = new_time;
         CDEBUG(D_HA, "updating export %s at "CFS_TIME_T" exp %p\n",
                exp->exp_client_uuid.uuid,
                exp->exp_last_request_time, exp);
@@ -563,8 +578,7 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         if (list_empty(&exp->exp_obd_chain_timed)) {
                 /* this one is not timed */
                 spin_unlock(&exp->exp_obd->obd_dev_lock);
-                EXIT;
-                return;
+                RETURN_EXIT;
         }
 
         list_move_tail(&exp->exp_obd_chain_timed,
@@ -924,6 +938,167 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
         RETURN(0);
 }
 
+/**
+ * Put the request on the export list if it may become
+ * a high priority one.
+ */
+static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
+                             struct ptlrpc_request *req)
+{
+        int rc;
+        ENTRY;
+
+        if (svc->srv_hpreq_handler) {
+                rc = svc->srv_hpreq_handler(req);
+                if (rc)
+                        RETURN(rc);
+        }
+        if (req->rq_export && req->rq_ops) {
+                spin_lock(&req->rq_export->exp_lock);
+                list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc);
+                spin_unlock(&req->rq_export->exp_lock);
+        }
+
+        RETURN(0);
+}
+
+/** Remove the request from the export list. */
+static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
+{
+        ENTRY;
+        if (req->rq_export && req->rq_ops) {
+                spin_lock(&req->rq_export->exp_lock);
+                list_del_init(&req->rq_exp_list);
+                spin_unlock(&req->rq_export->exp_lock);
+        }
+        EXIT;
+}
+
+/**
+ * Make the request a high priority one.
+ *
+ * All the high priority requests are queued in a separate FIFO
+ * ptlrpc_service::srv_request_hpq list which is parallel to
+ * ptlrpc_service::srv_request_queue list but has a higher priority
+ * for handling.
+ *
+ * \see ptlrpc_server_handle_request().
+ */
+static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
+                                        struct ptlrpc_request *req)
+{
+        ENTRY;
+        LASSERT(svc != NULL);
+        spin_lock(&req->rq_lock);
+        if (req->rq_hp == 0) {
+                int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+                /* Add to the high priority queue. */
+                list_move_tail(&req->rq_list, &svc->srv_request_hpq);
+                req->rq_hp = 1;
+                if (opc != OBD_PING)
+                        DEBUG_REQ(D_NET, req, "high priority req");
+        }
+        spin_unlock(&req->rq_lock);
+        EXIT;
+}
+
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
+{
+        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+        ENTRY;
+
+        spin_lock(&svc->srv_lock);
+        /* It may happen that the request is already taken for processing
+         * but still in the export list, do not re-add it into the HP list. */
+        if (req->rq_phase == RQ_PHASE_NEW)
+                ptlrpc_hpreq_reorder_nolock(svc, req);
+        spin_unlock(&svc->srv_lock);
+        EXIT;
+}
+
+/** Check if the request is a high priority one. */
+static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
+{
+        int opc, rc = 0;
+        ENTRY;
+
+        /* Check by request opc. */
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        if (opc == OBD_PING)
+                RETURN(1);
+
+        /* Perform the request-specific check. */
+        if (req->rq_ops && req->rq_ops->hpreq_check)
+                rc = req->rq_ops->hpreq_check(req);
+        RETURN(rc);
+}
+
+/** Check if the request is high priority and add it to the proper queue. */
+static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
+                                     struct ptlrpc_request *req)
+{
+        int rc;
+        ENTRY;
+
+        rc = ptlrpc_server_hpreq_check(req);
+        if (rc < 0)
+                RETURN(rc);
+
+        spin_lock(&svc->srv_lock);
+        /* Before inserting the request into the queue, check that it
+         * has not been queued yet, or even already handled -- this may
+         * happen due to a racing ldlm_server_blocking_ast(). */
+        if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) {
+                if (rc)
+                        ptlrpc_hpreq_reorder_nolock(svc, req);
+                else
+                        list_add_tail(&req->rq_list, &svc->srv_request_queue);
+        }
+        spin_unlock(&svc->srv_lock);
+
+        RETURN(0);
+}
+
+/* Only allow normal priority requests on a service that has a high-priority
+ * queue if forced (i.e. cleanup), if there are other high priority requests
+ * already being processed (i.e. those threads can service more high-priority
+ * requests), or if there are enough idle threads that a later thread can do
+ * a high priority request. */
+static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
+{
+        return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
+               svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+}
+
+static struct ptlrpc_request *
+ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+{
+        struct ptlrpc_request *req = NULL;
+        ENTRY;
+
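+        /* Prefer the HP queue, but after srv_hpreq_ratio consecutive HP
+         * requests serve a normal one so that queue is not starved. */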
+        if (ptlrpc_server_allow_normal(svc, force) &&
+            !list_empty(&svc->srv_request_queue) &&
+            (list_empty(&svc->srv_request_hpq) ||
+             svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
+                req = list_entry(svc->srv_request_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count = 0;
+        } else if (!list_empty(&svc->srv_request_hpq)) {
+                req = list_entry(svc->srv_request_hpq.next,
+                                 struct ptlrpc_request, rq_list);
+                svc->srv_hpreq_count++;
+        }
+        RETURN(req);
+}
+
+static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+        return ((ptlrpc_server_allow_normal(svc, force) &&
+                 !list_empty(&svc->srv_request_queue)) ||
+                !list_empty(&svc->srv_request_hpq));
+}
+
 /* Handle freshly incoming reqs, add to timed early reply list,
    pass on to regular request queue */
 static int
@@ -1003,10 +1178,9 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                                           "illegal security flavor,");
                 }
 
-                class_export_put(req->rq_export);
-                req->rq_export = NULL;
                 if (rc)
                         goto err_req;
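+                /* rq_export is no longer dropped here: it is needed to
+                 * update the export timer and, later, to link the request
+                 * onto the export's queued-RPC list in ptlrpc_hpreq_init(). */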
+                ptlrpc_update_export_timer(req->rq_export, 0);
         }
 
         /* req_in handling should/must be fast */
@@ -1027,12 +1201,15 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
         }
 
         ptlrpc_at_add_timed(req);
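+        /* Register with the export's queued-RPC list before the request
+         * becomes visible on the service queues. */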
+        rc = ptlrpc_hpreq_init(svc, req);
+        if (rc)
+                GOTO(err_req, rc);
 
         /* Move it over to the request processing queue */
-        spin_lock(&svc->srv_lock);
-        list_add_tail(&req->rq_list, &svc->srv_request_queue);
+        rc = ptlrpc_server_request_add(svc, req);
+        if (rc)
+                GOTO(err_req, rc);
         cfs_waitq_signal(&svc->srv_waitq);
-        spin_unlock(&svc->srv_lock);
         RETURN(1);
 
 err_req:
@@ -1054,13 +1231,14 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         struct timeval         work_start;
         struct timeval         work_end;
         long                   timediff;
-        int                    rc;
+        int                    opc, rc;
+        int                    fail_opc = 0;
         ENTRY;
 
         LASSERT(svc);
 
         spin_lock(&svc->srv_lock);
-        if (unlikely(list_empty (&svc->srv_request_queue) ||
+        if (unlikely(!ptlrpc_server_request_pending(svc, 0) ||
             (
 #ifndef __KERNEL__
              /* !@%$# liblustre only has 1 thread */
@@ -1073,16 +1251,47 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                   * That means we always need at least 2 service threads. */
                 spin_unlock(&svc->srv_lock);
                 RETURN(0);
+        }
+
+        request = ptlrpc_server_request_get(svc, 0);
+        if (request == NULL) {
+                spin_unlock(&svc->srv_lock);
+                RETURN(0);
+        }
+
+        opc = lustre_msg_get_opc(request->rq_reqmsg);
+        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
+                fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
+        else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+                fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
+
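+        /* Fail-loc hooks for sanityN test_33: pause for 4 seconds before
+         * handling an HP-capable request, then re-fetch it since the
+         * queues may have changed while srv_lock was dropped. */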
+        if (unlikely(fail_opc)) {
+                if (request->rq_export && request->rq_ops) {
+                        spin_unlock(&svc->srv_lock);
+                        OBD_FAIL_TIMEOUT(fail_opc, 4);
+                        spin_lock(&svc->srv_lock);
+                        request = ptlrpc_server_request_get(svc, 0);
+                        if (request == NULL) {
+                                spin_unlock(&svc->srv_lock);
+                                RETURN(0);
+                        }
+                        LASSERT(ptlrpc_server_request_pending(svc, 0));
+                }
         }
 
-        request = list_entry (svc->srv_request_queue.next,
-                              struct ptlrpc_request, rq_list);
-        list_del_init (&request->rq_list);
+        list_del_init(&request->rq_list);
         svc->srv_n_queued_reqs--;
         svc->srv_n_active_reqs++;
+        if (request->rq_hp)
+                svc->srv_n_hpreq++;
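+        /* srv_n_hpreq counts HP requests currently being handled; it
+         * feeds ptlrpc_server_allow_normal(). */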
 
+        /* The phase is changed under the lock here so that
+         * ptlrpc_hpreq_reorder() can tell the request is already being
+         * processed. */
+        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
         spin_unlock(&svc->srv_lock);
 
+        ptlrpc_hpreq_fini(request);
+
         if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
                 libcfs_debug_dumplog();
 
@@ -1115,9 +1324,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         if (thread)
                 request->rq_svc_thread->t_env->le_ses = &request->rq_session;
 
-        request->rq_export = class_conn2export(
-                                     lustre_msg_get_handle(request->rq_reqmsg));
-
         if (likely(request->rq_export)) {
                 if (unlikely(ptlrpc_check_req(request)))
                         goto put_conn;
@@ -1138,8 +1344,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 goto put_rpc_export;
         }
 
-        ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
-
         CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
                "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
                (request->rq_export ?
@@ -1170,9 +1374,6 @@ put_rpc_export:
         if (export != NULL)
                 class_export_rpc_put(export);
 put_conn:
-        if (likely(request->rq_export != NULL))
-                class_export_put(request->rq_export);
-
         lu_context_exit(&request->rq_session);
         lu_context_fini(&request->rq_session);
 
@@ -1217,6 +1418,10 @@ put_conn:
         }
 
 out_req:
+        spin_lock(&svc->srv_lock);
+        if (request->rq_hp)
+                svc->srv_n_hpreq--;
+        spin_unlock(&svc->srv_lock);
         ptlrpc_server_finish_request(request);
 
         RETURN(1);
@@ -1515,7 +1720,7 @@ static int ptlrpc_main(void *arg)
                                svc->srv_rqbd_timeout == 0) ||
                               !list_empty(&svc->srv_req_in_queue) ||
                               !list_empty(&svc->srv_reply_queue) ||
-                              (!list_empty(&svc->srv_request_queue) &&
+                              (ptlrpc_server_request_pending(svc, 0) &&
                                (svc->srv_n_active_reqs <
                                 (svc->srv_threads_running - 1))) ||
                               svc->srv_at_check,
@@ -1550,7 +1755,7 @@ static int ptlrpc_main(void *arg)
                         ptlrpc_at_check_timed(svc);
 
                 /* don't handle requests in the last thread */
-                if (!list_empty (&svc->srv_request_queue) &&
+                if (ptlrpc_server_request_pending(svc, 0) &&
                     (svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
                         lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
@@ -1806,16 +2011,14 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 service->srv_n_active_reqs++;
                 ptlrpc_server_finish_request(req);
         }
-        while (!list_empty(&service->srv_request_queue)) {
-                struct ptlrpc_request *req =
-                        list_entry(service->srv_request_queue.next,
-                                   struct ptlrpc_request,
-                                   rq_list);
+        while (ptlrpc_server_request_pending(service, 1)) {
+                struct ptlrpc_request *req;
 
+                req = ptlrpc_server_request_get(service, 1);
                 list_del(&req->rq_list);
                 service->srv_n_queued_reqs--;
                 service->srv_n_active_reqs++;
-
+                ptlrpc_hpreq_fini(req);
                 ptlrpc_server_finish_request(req);
         }
         LASSERT(service->srv_n_queued_reqs == 0);
@@ -1879,14 +2082,18 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
         do_gettimeofday(&right_now);
 
         spin_lock(&svc->srv_lock);
-        if (list_empty(&svc->srv_request_queue)) {
+        if (!ptlrpc_server_request_pending(svc, 1)) {
                 spin_unlock(&svc->srv_lock);
                 return 0;
         }
 
         /* How long has the next entry been waiting? */
-        request = list_entry(svc->srv_request_queue.next,
-                             struct ptlrpc_request, rq_list);
+        if (list_empty(&svc->srv_request_queue))
+                request = list_entry(svc->srv_request_hpq.next,
+                                     struct ptlrpc_request, rq_list);
+        else
+                request = list_entry(svc->srv_request_queue.next,
+                                     struct ptlrpc_request, rq_list);
         timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
         spin_unlock(&svc->srv_lock);
 
index 589573c..08b4899 100644 (file)
@@ -779,6 +779,56 @@ run_test 33a "commit on sharing, cross crete/delete, 2 clients, benchmark"
 
 # End commit on sharing tests
 
+test_33() { #16129
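+        # Exercise lock cancel timeouts while IO is queued on the OSTs:
+        # with fail_loc 0x510 the lock cancel should time out, with
+        # 0x511 it should not.  The summed ldlm lock_timeouts counters
+        # before/after tell which happened.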
+        for OPER in notimeout timeout ; do
+                rm $DIR1/$tfile 2>/dev/null
+                lock_in=0
+                for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do
+                        lock_in=$(($lock_in + $f))
+                done
+                if [ $OPER == "timeout" ] ; then
+                        for j in `seq $OSTCOUNT`; do
+                                #define OBD_FAIL_PTLRPC_HPREQ_TIMEOUT    0x510
+                                do_facet ost$j lctl set_param fail_loc=0x510
+                        done
+                        echo lock should expire
+                else
+                        for j in `seq $OSTCOUNT`; do
+                                #define OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT  0x511
+                                do_facet ost$j lctl set_param fail_loc=0x511
+                        done
+                        echo lock should not expire
+                fi
+                echo writing on client1
+                dd if=/dev/zero of=$DIR1/$tfile count=100 conv=notrunc > /dev/null 2>&1
+                sync &
+                # wait for the flush
+                sleep 1
+                echo reading on client2
+                dd of=/dev/null if=$DIR2/$tfile > /dev/null 2>&1
+                # wait for a lock timeout
+                sleep 4
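+                # matches the 4s OBD_FAIL_TIMEOUT delay on the OST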
+                lock_out=0
+                for f in `lctl get_param -n ldlm/namespaces/*/lock_timeouts`; do
+                        lock_out=$(($lock_out + $f))
+                done
+                if [ $OPER == "timeout" ] ; then 
+                        if [ $lock_in == $lock_out ]; then
+                                error "no lock timeout happened"
+                        else
+                                echo "success"
+                        fi
+                else
+                        if [ $lock_in != $lock_out ]; then
+                                error "lock timeout happened"
+                        else
+                                echo "success"
+                        fi
+                fi
+        done
+}
+run_test 33 "no lock timeout under IO"
+
 log "cleanup: ======================================================"
 
 check_and_cleanup_lustre