Whamcloud - gitweb
- ELC fixes and optimizations
authortappro <tappro>
Thu, 13 Sep 2007 21:04:13 +0000 (21:04 +0000)
committertappro <tappro>
Thu, 13 Sep 2007 21:04:13 +0000 (21:04 +0000)
  b=13060
  i=vitaly,huanghua

34 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/include/lustre_mds.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/include/obd_ost.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/liblustre/dir.c
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_request.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mds/mds_lib.c
lustre/mds/mds_reint.c
lustre/mdt/mdt_reint.c
lustre/mgc/mgc_request.c
lustre/obdecho/echo_client.c
lustre/obdfilter/filter.c
lustre/osc/osc_request.c
lustre/ptlrpc/wiretest.c
lustre/tests/recovery-small.sh
lustre/tests/sanity.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 9cd7877..3eadc05 100644 (file)
@@ -520,9 +520,10 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define OBD_CONNECT_QUOTA64    0x00080000ULL /* 64bit qunit_data.qd_count b=10707*/
 #define OBD_CONNECT_MDS_CAPA   0x00100000ULL /* MDS capability */
 #define OBD_CONNECT_OSS_CAPA   0x00200000ULL /* OSS capability */
-#define OBD_CONNECT_MDS_MDS    0x00400000ULL /* MDS-MDS connection*/
+#define OBD_CONNECT_CANCELSET  0x00400000ULL /* Early batched cancels. */
 #define OBD_CONNECT_SOM        0x00800000ULL /* SOM feature */
-#define OBD_CONNECT_CANCELSET  0x01000000ULL /* Early batched cancels. */
+#define OBD_CONNECT_AT         0x01000000ULL /* client uses adaptive timeouts */
+#define OBD_CONNECT_MDS_MDS    0x02000000ULL /* MDS-MDS connection*/
 #define OBD_CONNECT_REAL       0x00000200ULL /* real connection */
 /* also update obd_connect_names[] for lprocfs_rd_connect_flags()
  * and lustre/utils/wirecheck.c */
index cbf2b59..8c7e472 100644 (file)
@@ -128,8 +128,21 @@ typedef enum {
  * w/o involving separate thread. in order to decrease cs rate */
 #define LDLM_FL_ATOMIC_CB      0x4000000
 
+/* It may happen that a client initiate 2 operations, e.g. unlink and mkdir,
+ * such that server send blocking ast for conflict locks to this client for
+ * the 1st operation, whereas the 2nd operation has canceled this lock and
+ * is waiting for rpc_lock which is taken by the 1st operation.
+ * LDLM_FL_BL_AST is to be set by ldlm_callback_handler() to the lock not allow
+ * ELC code to cancel it. 
+ * LDLM_FL_BL_DONE is to be set by ldlm_cancel_callback() when lock cache is
+ * droped to let ldlm_callback_handler() return EINVAL to the server. It is
+ * used when ELC rpc is already prepared and is waiting for rpc_lock, too late
+ * to send a separate CANCEL rpc. */
+#define LDLM_FL_BL_AST          0x10000000
+#define LDLM_FL_BL_DONE         0x20000000
+
 /* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */
-#define LDLM_FL_ASYNC           0x20000000
+#define LDLM_FL_ASYNC           0x40000000
 
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
@@ -364,6 +377,16 @@ struct ldlm_ast_work {
         int w_datalen;
 };
 
+/* ldlm_enqueue parameters common */
+struct ldlm_enqueue_info {
+        __u32 ei_type;   /* Type of the lock being enqueued. */
+        __u32 ei_mode;   /* Mode of the lock being enqueued. */
+        void *ei_cb_bl;  /* Different callbacks for lock handling (blocking, */
+        void *ei_cb_cp;  /* completion, glimpse) */
+        void *ei_cb_gl;
+        void *ei_cbdata; /* Data to be passed into callbacks. */
+};
+
 extern struct obd_ops ldlm_obd_ops;
 
 extern char *ldlm_lockname[];
@@ -434,7 +457,6 @@ int ldlm_replay_locks(struct obd_import *imp);
 void ldlm_resource_iterate(struct ldlm_namespace *, const struct ldlm_res_id *,
                            ldlm_iterator_t iter, void *data);
 
-
 /* ldlm_flock.c */
 int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data);
 
@@ -476,6 +498,18 @@ static inline struct ldlm_lock *ldlm_handle2lock(const struct lustre_handle *h)
         return __ldlm_handle2lock(h, 0);
 }
 
+static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
+                                       struct lustre_msg *m, int buf_idx,
+                                       int increase)
+{
+        if (res->lr_namespace->ns_lvbo &&
+            res->lr_namespace->ns_lvbo->lvbo_update) {
+                return res->lr_namespace->ns_lvbo->lvbo_update(res, m, buf_idx,
+                                                               increase);
+        }
+        return 0;
+}
+
 #define LDLM_LOCK_PUT(lock)                     \
 do {                                            \
         /*LDLM_DEBUG((lock), "put");*/          \
@@ -564,13 +598,10 @@ int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
 int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp);
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data);
 int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
+                     struct ldlm_enqueue_info *einfo,
                      const struct ldlm_res_id *res_id,
-                     ldlm_type_t type, ldlm_policy_data_t *policy,
-                     ldlm_mode_t mode, int *flags,
-                     ldlm_blocking_callback blocking,
-                     ldlm_completion_callback completion,
-                     ldlm_glimpse_callback glimpse,
-                     void *data, void *lvb, __u32 lvb_len, void *lvb_swabber,
+                     ldlm_policy_data_t *policy, int *flags,
+                     void *lvb, __u32 lvb_len, void *lvb_swabber,
                      struct lustre_handle *lockh, int async);
 struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                                              int bufcount, int *size,
@@ -603,7 +634,7 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *, const struct ldlm_res_id *,
 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                     const struct ldlm_res_id *res_id,
                                     ldlm_policy_data_t *policy,
-                                    int mode, int flags, void *opaque);
+                                    ldlm_mode_t mode, int flags, void *opaque);
 int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *head,
                         int count, int flags);
 int ldlm_cli_join_lru(struct ldlm_namespace *,
index c36024d..b051786 100644 (file)
@@ -56,6 +56,7 @@ struct mds_update_record {
         __u32 ur_mode;
         __u32 ur_flags;
         struct lvfs_grp_hash_entry *ur_grp_entry;
+        struct ldlm_request *ur_dlm;
 };
 
 /* file data for open files on MDS */
index 9446c16..d16b2ea 100644 (file)
@@ -132,31 +132,15 @@ struct obd_info;
 
 typedef int (*obd_enqueue_update_f)(struct obd_info *oinfo, int rc);
 
-/* obd_enqueue parameters common for all levels (lov, osc). */
-struct obd_enqueue_info {
-        /* Flags used while lock handling. */
-        int   ei_flags;
-        /* Type of the lock being enqueued. */
-        __u32 ei_type;
-        /* Mode of the lock being enqueued. */
-        __u32 ei_mode;
-        /* Different callbacks for lock handling (blocking, completion,
-           glimpse */
-        void *ei_cb_bl;
-        void *ei_cb_cp;
-        void *ei_cb_gl;
-        /* Data to be passed into callbacks. */
-        void *ei_cbdata;
-        /* Request set for OSC async requests. */
-        struct ptlrpc_request_set *ei_rqset;
-};
-
 /* obd info for a particular level (lov, osc). */
 struct obd_info {
         /* Lock policy. It keeps an extent which is specific for a particular
          * OSC. (e.g. lov_prep_enqueue_set initialises extent of the policy,
          * and osc_enqueue passes it into ldlm_lock_match & ldlm_cli_enqueue. */
         ldlm_policy_data_t      oi_policy;
+        /* Flags used while lock handling. The flags obtained on the enqueue
+         * request are set here, therefore they are request specific. */
+        int                     oi_flags;
         /* Lock handle specific for every OSC lock. */
         struct lustre_handle   *oi_lockh;
         /* lsm data specific for every OSC. */
@@ -1144,7 +1128,8 @@ struct obd_ops {
                           int niocount, struct niobuf_local *local,
                           struct obd_trans_info *oti, int rc);
         int (*o_enqueue)(struct obd_export *, struct obd_info *oinfo,
-                         struct obd_enqueue_info *einfo);
+                         struct ldlm_enqueue_info *einfo,
+                         struct ptlrpc_request_set *rqset);
         int (*o_match)(struct obd_export *, struct lov_stripe_md *, __u32 type,
                        ldlm_policy_data_t *, __u32 mode, int *flags, void *data,
                        struct lustre_handle *lockh);
@@ -1241,10 +1226,9 @@ struct md_ops {
                         __u64, struct ptlrpc_request **);
         int (*m_done_writing)(struct obd_export *, struct md_op_data  *,
                               struct obd_client_handle *);
-        int (*m_enqueue)(struct obd_export *, int, struct lookup_intent *,
-                         int, struct md_op_data *, struct lustre_handle *,
-                         void *, int, ldlm_completion_callback,
-                         ldlm_blocking_callback, void *, int);
+        int (*m_enqueue)(struct obd_export *, struct ldlm_enqueue_info *,
+                         struct lookup_intent *, struct md_op_data *,
+                         struct lustre_handle *, void *, int, int);
         int (*m_getattr)(struct obd_export *, const struct lu_fid *,
                          struct obd_capa *, obd_valid, int,
                          struct ptlrpc_request **);
index 90662fe..385df8f 100644 (file)
@@ -1357,30 +1357,30 @@ static inline int obd_iocontrol(unsigned int cmd, struct obd_export *exp,
 
 static inline int obd_enqueue_rqset(struct obd_export *exp,
                                     struct obd_info *oinfo,
-                                    struct obd_enqueue_info *einfo)
+                                    struct ldlm_enqueue_info *einfo)
 {
+        struct ptlrpc_request_set *set = NULL;
         int rc;
         ENTRY;
 
         EXP_CHECK_DT_OP(exp, enqueue);
         EXP_COUNTER_INCREMENT(exp, enqueue);
 
-        einfo->ei_rqset =  ptlrpc_prep_set();
-        if (einfo->ei_rqset == NULL)
+        set =  ptlrpc_prep_set();
+        if (set == NULL)
                 RETURN(-ENOMEM);
 
-        rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo);
+        rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set);
         if (rc == 0)
-                rc = ptlrpc_set_wait(einfo->ei_rqset);
-        ptlrpc_set_destroy(einfo->ei_rqset);
-        einfo->ei_rqset = NULL;
-
+                rc = ptlrpc_set_wait(set);
+        ptlrpc_set_destroy(set);
         RETURN(rc);
 }
 
 static inline int obd_enqueue(struct obd_export *exp,
                               struct obd_info *oinfo,
-                              struct obd_enqueue_info *einfo)
+                              struct ldlm_enqueue_info *einfo,
+                              struct ptlrpc_request_set *set)
 {
         int rc;
         ENTRY;
@@ -1388,7 +1388,7 @@ static inline int obd_enqueue(struct obd_export *exp,
         EXP_CHECK_DT_OP(exp, enqueue);
         EXP_COUNTER_INCREMENT(exp, enqueue);
 
-        rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo);
+        rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set);
         RETURN(rc);
 }
 
@@ -1714,23 +1714,20 @@ static inline int md_done_writing(struct obd_export *exp,
         RETURN(rc);
 }
 
-static inline int md_enqueue(struct obd_export *exp, int lock_type,
-                             struct lookup_intent *it, int lock_mode,
+static inline int md_enqueue(struct obd_export *exp,
+                             struct ldlm_enqueue_info *einfo,
+                             struct lookup_intent *it,
                              struct md_op_data *op_data,
                              struct lustre_handle *lockh,
                              void *lmm, int lmmsize,
-                             ldlm_completion_callback cb_completion,
-                             ldlm_blocking_callback cb_blocking,
-                             void *cb_data, int extra_lock_flags)
+                             int extra_lock_flags)
 {
         int rc;
         ENTRY;
         EXP_CHECK_MD_OP(exp, enqueue);
         EXP_MD_COUNTER_INCREMENT(exp, enqueue);
-        rc = MDP(exp->exp_obd, enqueue)(exp, lock_type, it, lock_mode,
-                                        op_data, lockh, lmm, lmmsize,
-                                        cb_completion, cb_blocking,
-                                        cb_data, extra_lock_flags);
+        rc = MDP(exp->exp_obd, enqueue)(exp, einfo, it, op_data, lockh,
+                                        lmm, lmmsize, extra_lock_flags);
         RETURN(rc);
 }
 
index cfc9e5d..23bbb04 100644 (file)
@@ -31,7 +31,7 @@ struct osc_async_args {
 struct osc_enqueue_args {
         struct obd_export       *oa_exp;
         struct obd_info         *oa_oi;
-        struct obd_enqueue_info *oa_ei;
+        struct ldlm_enqueue_info*oa_ei;
 };
 
 #endif
index 1bfe62d..f6717a1 100644 (file)
@@ -162,6 +162,8 @@ extern int obd_race_state;
 #define OBD_FAIL_LDLM_RECOV_CLIENTS      0x30d
 #define OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT 0x30e
 #define OBD_FAIL_LDLM_GLIMPSE            0x30f
+#define OBD_FAIL_LDLM_CANCEL_RACE        0x310
+#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE  0x311
 
 #define OBD_FAIL_OSC                     0x400
 #define OBD_FAIL_OSC_BRW_READ_BULK       0x401
index 7d9e1bf..82fbeed 100644 (file)
@@ -382,7 +382,8 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
 
                 ldlm_resource_unlink_lock(lock);
 
-                ldlm_extent_policy(res, lock, flags);
+                if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
+                        ldlm_extent_policy(res, lock, flags);
                 ldlm_grant_lock(lock, work_list);
                 RETURN(LDLM_ITER_CONTINUE);
         }
index 62b2f03..766aeba 100644 (file)
@@ -1389,6 +1389,7 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
                         LDLM_DEBUG(lock, "no blocking ast");
                 }
         }
+        lock->l_flags |= LDLM_FL_BL_DONE;
 }
 
 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
@@ -1504,6 +1505,8 @@ void ldlm_cancel_locks_for_export(struct obd_export *exp)
                 spin_unlock(&exp->exp_ldlm_data.led_lock);
 
                 LDLM_DEBUG(lock, "export %p", exp);
+                ldlm_res_lvbo_update(res, NULL, 0, 1);
+
                 ldlm_lock_cancel(lock);
                 ldlm_reprocess_all(res);
 
index d5385ed..5b22f25 100644 (file)
@@ -582,9 +582,14 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 rc = ptlrpc_queue_wait(req);
                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
         }
-
-        if (rc != 0)
+        if (rc != 0) {
+                /* If client canceled the lock but the cancel has not been
+                 * recieved yet, we need to update lvbo to have the proper
+                 * attributes cached. */
+                if (rc == -EINVAL)
+                        ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
                 rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+        }
 
         ptlrpc_req_finished(req);
 
@@ -734,8 +739,8 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         else if (rc != 0)
                 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
         else
-                rc = res->lr_namespace->ns_lvbo->lvbo_update
-                        (res, req->rq_repmsg, REPLY_REC_OFF, 1);
+                rc = ldlm_res_lvbo_update(res, req->rq_repmsg,
+                                          REPLY_REC_OFF, 1);
         ptlrpc_req_finished(req);
         RETURN(rc);
 }
@@ -996,8 +1001,8 @@ existing_lock:
                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                            "(err=%d, rc=%d)", err, rc);
 
+                lock_res_and_lock(lock);
                 if (rc == 0) {
-                        lock_res_and_lock(lock);
                         size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                         if (size[DLM_REPLY_REC_OFF] > 0) {
                                 void *lvb = lustre_msg_buf(req->rq_repmsg,
@@ -1009,13 +1014,11 @@ existing_lock:
                                 memcpy(lvb, lock->l_resource->lr_lvb_data,
                                        size[DLM_REPLY_REC_OFF]);
                         }
-                        unlock_res_and_lock(lock);
                 } else {
-                        lock_res_and_lock(lock);
                         ldlm_resource_unlink_lock(lock);
                         ldlm_lock_destroy_nolock(lock);
-                        unlock_res_and_lock(lock);
                 }
+                unlock_res_and_lock(lock);
 
                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                         ldlm_reprocess_all(lock->l_resource);
@@ -1134,6 +1137,8 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
         int i, count, done = 0;
         ENTRY;
 
+        LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, "
+                          "starting at %d", dlm_req->lock_count, first);
         count = dlm_req->lock_count ? dlm_req->lock_count : 1;
         if (first >= count)
                 RETURN(0);
@@ -1143,8 +1148,6 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                 RETURN(0);
 
-        LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks",
-                          count - first);
         for (i = first; i < count; i++) {
                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                 if (!lock) {
@@ -1156,32 +1159,22 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
 
                 res = lock->l_resource;
                 done++;
-                ldlm_lock_cancel(lock);
-                if (ldlm_del_waiting_lock(lock))
-                        CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
 
                 if (res != pres) {
                         if (pres != NULL) {
-                                if (pres->lr_namespace->ns_lvbo &&
-                                    pres->lr_namespace->ns_lvbo->lvbo_update) {
-                                        (void)pres->lr_namespace->ns_lvbo->
-                                                lvbo_update(pres, NULL, 0, 1);
-                                }
                                 ldlm_reprocess_all(pres);
                                 ldlm_resource_putref(pres);
                         }
-                        if (res != NULL)
+                        if (res != NULL) {
                                 ldlm_resource_getref(res);
+                                ldlm_res_lvbo_update(res, NULL, 0, 1);
+                        }
                         pres = res;
                 }
+                ldlm_lock_cancel(lock);
                 LDLM_LOCK_PUT(lock);
         }
         if (pres != NULL) {
-                if (pres->lr_namespace->ns_lvbo &&
-                    pres->lr_namespace->ns_lvbo->lvbo_update) {
-                        (void)pres->lr_namespace->ns_lvbo->
-                                lvbo_update(pres, NULL, 0, 1);
-                }
                 ldlm_reprocess_all(pres);
                 ldlm_resource_putref(pres);
         }
@@ -1500,8 +1493,8 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
         if (!lock) {
-                CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
-                       dlm_req->lock_handle[0].cookie);
+                CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+                       "disappeared\n", dlm_req->lock_handle[0].cookie);
                 ldlm_callback_reply(req, -EINVAL);
                 RETURN(0);
         }
@@ -1509,6 +1502,22 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
         lock_res_and_lock(lock);
         lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
+        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+                /* If somebody cancels locks and cache is already droped,
+                 * we can tell the server we have no lock. Otherwise, we
+                 * should send cancel after dropping the cache. */
+                if ((lock->l_flags & LDLM_FL_CANCELING) &&
+                    (lock->l_flags & LDLM_FL_BL_DONE)) {
+                        LDLM_DEBUG(lock, "callback on lock "
+                                   LPX64" - lock disappeared\n",
+                                   dlm_req->lock_handle[0].cookie);
+                        unlock_res_and_lock(lock);
+                        LDLM_LOCK_PUT(lock);
+                        ldlm_callback_reply(req, -EINVAL);
+                        RETURN(0);
+                }
+                lock->l_flags |= LDLM_FL_BL_AST;
+        }
         unlock_res_and_lock(lock);
 
         /* We want the ost thread to get this reply so that it can respond
index f06737c..25b742b 100644 (file)
@@ -487,7 +487,7 @@ cleanup:
 })
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
- * @count locks in @cancel. */
+ * @count locks in @cancels. */
 struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                                              int bufcount, int *size,
                                              struct list_head *cancels,
@@ -539,13 +539,10 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
  * request was created in ldlm_cli_enqueue and it is the async request,
  * pass it to the caller in @reqp. */
 int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
+                     struct ldlm_enqueue_info *einfo,
                      const struct ldlm_res_id *res_id,
-                     ldlm_type_t type, ldlm_policy_data_t *policy,
-                     ldlm_mode_t mode, int *flags,
-                     ldlm_blocking_callback blocking,
-                     ldlm_completion_callback completion,
-                     ldlm_glimpse_callback glimpse,
-                     void *data, void *lvb, __u32 lvb_len, void *lvb_swabber,
+                     ldlm_policy_data_t *policy, int *flags,
+                     void *lvb, __u32 lvb_len, void *lvb_swabber,
                      struct lustre_handle *lockh, int async)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
@@ -570,12 +567,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                 LDLM_DEBUG(lock, "client-side enqueue START");
                 LASSERT(exp == lock->l_conn_export);
         } else {
-                lock = ldlm_lock_create(ns, res_id, type, mode, blocking,
-                                        completion, glimpse, data, lvb_len);
+                lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
+                                        einfo->ei_mode, einfo->ei_cb_bl,
+                                        einfo->ei_cb_cp, einfo->ei_cb_gl,
+                                        einfo->ei_cbdata, lvb_len);
                 if (lock == NULL)
                         RETURN(-ENOMEM);
                 /* for the local lock, add the reference */
-                ldlm_lock_addref_internal(lock, mode);
+                ldlm_lock_addref_internal(lock, einfo->ei_mode);
                 ldlm_lock2handle(lock, lockh);
                 lock->l_lvb_swabber = lvb_swabber;
                 if (policy != NULL) {
@@ -584,8 +583,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                          * descriptor (ldlm_lock2desc() below) but use an
                          * inodebits lock internally with both bits set.
                          */
-                        if (type == LDLM_IBITS && !(exp->exp_connect_flags &
-                                                    OBD_CONNECT_IBITS))
+                        if (einfo->ei_type == LDLM_IBITS &&
+                            !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
                                 lock->l_policy_data.l_inodebits.bits =
                                         MDS_INODELOCK_LOOKUP |
                                         MDS_INODELOCK_UPDATE;
@@ -593,7 +592,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                                 lock->l_policy_data = *policy;
                 }
 
-                if (type == LDLM_EXTENT)
+                if (einfo->ei_type == LDLM_EXTENT)
                         lock->l_req_extent = policy->l_extent;
                 LDLM_DEBUG(lock, "client-side enqueue START");
         }
@@ -603,7 +602,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
         if (reqp == NULL || *reqp == NULL) {
                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                 if (req == NULL) {
-                        failed_lock_cleanup(ns, lock, lockh, mode);
+                        failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                         LDLM_LOCK_PUT(lock);
                         RETURN(-ENOMEM);
                 }
@@ -621,7 +620,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
         lock->l_conn_export = exp;
         lock->l_export = NULL;
-        lock->l_blocking_ast = blocking;
+        lock->l_blocking_ast = einfo->ei_cb_bl;
 
         /* Dump lock data into the request buffer */
         body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
@@ -640,7 +639,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
          * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
          * [i_size, OBD_OBJECT_EOF] lock is taken.
          */
-        LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT ||
+        LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
                      policy->l_extent.end == OBD_OBJECT_EOF));
 
         if (async) {
@@ -650,9 +649,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
         LDLM_DEBUG(lock, "sending request");
         rc = ptlrpc_queue_wait(req);
-        err = ldlm_cli_enqueue_fini(exp, req, type, policy ? 1 : 0,
-                                    mode, flags, lvb, lvb_len, lvb_swabber,
-                                    lockh, rc);
+        err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
+                                    einfo->ei_mode, flags, lvb, lvb_len,
+                                    lvb_swabber, lockh, rc);
 
         /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
          * one reference that we took */
@@ -772,10 +771,13 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
 }
 
 /* Cancel locks locally.
- * Returns: 1 if there is a need to send a cancel RPC to server. 0 otherwise. */
+ * Returns:
+ * LDLM_FL_LOCAL_ONLY if tere is no need in a CANCEL rpc to the server;
+ * LDLM_FL_CANCELING otherwise;
+ * LDLM_FL_BL_AST if there is a need in a separate CANCEL rpc. */
 static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
 {
-        int rc = 0;
+        int rc = LDLM_FL_LOCAL_ONLY;
         ENTRY;
         
         if (lock->l_conn_export) {
@@ -788,14 +790,15 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
                 local_only = (lock->l_flags &
                               (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
                 ldlm_cancel_callback(lock);
+                rc = (lock->l_flags & LDLM_FL_BL_AST) ?
+                        LDLM_FL_BL_AST : LDLM_FL_CANCELING;
                 unlock_res_and_lock(lock);
 
-                if (local_only)
-                        CDEBUG(D_INFO, "not sending request (at caller's "
+                if (local_only) {
+                        CDEBUG(D_DLMTRACE, "not sending request (at caller's "
                                "instruction)\n");
-                else
-                        rc = 1;
-
+                        rc = LDLM_FL_LOCAL_ONLY;
+                }
                 ldlm_lock_cancel(lock);
         } else {
                 if (lock->l_resource->lr_namespace->ns_client) {
@@ -818,7 +821,7 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
 {
         struct ldlm_request *dlm;
         struct ldlm_lock *lock;
-        int max;
+        int max, packed = 0;
         ENTRY;
 
         dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
@@ -837,22 +840,13 @@ static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
         list_for_each_entry(lock, head, l_bl_ast) {
                 if (!count--)
                         break;
-                /* Pack the lock handle to the given request buffer. */
                 LASSERT(lock->l_conn_export);
-                /* Cannot be set on a lock in a resource granted list.*/
-                LASSERT(!(lock->l_flags &
-                          (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)));
-                /* If @lock is marked CANCEL_ON_BLOCK, cancel
-                 * will not be sent in ldlm_cli_cancel(). It 
-                 * is used for liblustre clients, no cancel on 
-                 * block requests. However, even for liblustre 
-                 * clients, when the flag is set, batched cancel
-                 * should be sent (what if no block rpc has
-                 * come). To not send another separated rpc in
-                 * this case, the caller pass CANCEL_ON_BLOCK
-                 * flag to ldlm_cli_cancel_unused_resource(). */
+                /* Pack the lock handle to the given request buffer. */
+                LDLM_DEBUG(lock, "packing");
                 dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
+                packed++;
         }
+        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
         EXIT;
 }
 
@@ -873,6 +867,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         LASSERT(exp != NULL);
         LASSERT(count > 0);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
+                RETURN(count);
+
         free = ldlm_req_handles_avail(exp, size, 2, 0);
         if (count > free)
                 count = free;
@@ -943,11 +940,13 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 
         /* concurrent cancels on the same handle can happen */
         lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
-        if (lock == NULL)
+        if (lock == NULL) {
+                LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
                 RETURN(0);
-
+        }
+        
         rc = ldlm_cli_cancel_local(lock);
-        if (rc <= 0)
+        if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
                 GOTO(out, rc);
 
         list_add(&lock->l_bl_ast, &head);
@@ -981,15 +980,22 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
         spin_lock(&ns->ns_unused_lock);
         count += ns->ns_nr_unused - ns->ns_max_unused;
         while (!list_empty(&ns->ns_unused_list)) {
-                struct list_head *tmp = ns->ns_unused_list.next;
-                lock = list_entry(tmp, struct ldlm_lock, l_lru);
-
                 if (max && added >= max)
                         break;
 
+                list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
+                        /* somebody is already doing CANCEL or there is a
+                         * blocking request will send cancel. */
+                        if (!(lock->l_flags & LDLM_FL_CANCELING) &&
+                            !(lock->l_flags & LDLM_FL_BL_AST))
+                                break;
+                }
+                if (&lock->l_lru == &ns->ns_unused_list)
+                        break;
+
                 if ((added >= count) && 
                     (!(flags & LDLM_CANCEL_AGED) ||
-                     cfs_time_before_64(cur, ns->ns_max_age +
+                     cfs_time_before_64(cur, (__u64)ns->ns_max_age +
                                         lock->l_last_used)))
                         break;
 
@@ -997,10 +1003,14 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                 spin_unlock(&ns->ns_unused_lock);
 
                 lock_res_and_lock(lock);
-                if ((ldlm_lock_remove_from_lru(lock) == 0) ||
-                    (lock->l_flags & LDLM_FL_CANCELING)) {
+                /* Check flags again under the lock. */
+                if ((lock->l_flags & LDLM_FL_CANCELING) ||
+                    (lock->l_flags & LDLM_FL_BL_AST) ||
+                    (ldlm_lock_remove_from_lru(lock) == 0)) {
                         /* other thread is removing lock from lru or
-                         * somebody is already doing CANCEL. */
+                         * somebody is already doing CANCEL or
+                         * there is a blocking request which will send
+                         * cancel by itseft. */
                         unlock_res_and_lock(lock);
                         LDLM_LOCK_PUT(lock);
                         spin_lock(&ns->ns_unused_lock);
@@ -1040,13 +1050,24 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
         list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
                 if (left-- == 0)
                         break;
+
                 rc = ldlm_cli_cancel_local(lock);
-                if (rc == 0) {
+                if (rc == LDLM_FL_BL_AST) {
+                        CFS_LIST_HEAD(head);
+
+                        LDLM_DEBUG(lock, "Cancel lock separately");
+                        list_del_init(&lock->l_bl_ast);
+                        list_add(&lock->l_bl_ast, &head);
+                        ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
+                        rc = LDLM_FL_LOCAL_ONLY;
+                }
+                if (rc == LDLM_FL_LOCAL_ONLY) {
                         /* CANCEL RPC should not be sent to server. */
                         list_del_init(&lock->l_bl_ast);
                         LDLM_LOCK_PUT(lock);
                         added--;
                 }
+
         } 
         RETURN(added);
 }
@@ -1114,6 +1135,12 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                         continue;
                 }
 
+                /* If somebody is already doing CANCEL, or blocking ast came,
+                 * skip this lock. */
+                if (lock->l_flags & LDLM_FL_BL_AST || 
+                    lock->l_flags & LDLM_FL_CANCELING)
+                        continue;
+
                 if (lockmode_compat(lock->l_granted_mode, mode))
                         continue;
 
@@ -1124,10 +1151,6 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                       policy->l_inodebits.bits))
                         continue;
 
-                /* If somebody is already doing CANCEL, skip it. */
-                if (lock->l_flags & LDLM_FL_CANCELING)
-                        continue;
-                
                 /* See CBPENDING comment in ldlm_cancel_lru */
                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
                         lock_flags;
@@ -1142,7 +1165,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
         /* Handle only @count inserted locks. */
         left = count;
         list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
-                int rc = 0;
+                int rc = LDLM_FL_LOCAL_ONLY;
 
                 if (left-- == 0)
                         break;
@@ -1151,7 +1174,16 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                 else
                         rc = ldlm_cli_cancel_local(lock);
 
-                if (rc == 0) {
+                if (rc == LDLM_FL_BL_AST) {
+                        CFS_LIST_HEAD(head);
+
+                        LDLM_DEBUG(lock, "Cancel lock separately");
+                        list_del_init(&lock->l_bl_ast);
+                        list_add(&lock->l_bl_ast, &head);
+                        ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
+                        rc = LDLM_FL_LOCAL_ONLY;
+                }
+                if (rc == LDLM_FL_LOCAL_ONLY) {
                         /* CANCEL RPC should not be sent to server. */
                         list_del_init(&lock->l_bl_ast);
                         LDLM_LOCK_PUT(lock);
@@ -1215,7 +1247,7 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
                                     const struct ldlm_res_id *res_id,
                                     ldlm_policy_data_t *policy,
-                                    int mode, int flags, void *opaque)
+                                    ldlm_mode_t mode, int flags, void *opaque)
 {
         struct ldlm_resource *res;
         CFS_LIST_HEAD(cancels);
index 8d632ea..db20488 100644 (file)
@@ -85,13 +85,15 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page)
         rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
                              &res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
         if (!rc) {
+                struct ldlm_enqueue_info einfo = {LDLM_IBITS, LCK_CR,
+                        llu_md_blocking_ast, ldlm_completion_ast, NULL, inode};
+
                 llu_prep_md_op_data(&op_data, inode, NULL, NULL, 0, 0,
                                     LUSTRE_OPC_ANY);
 
-                rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &it, LCK_CR,
+                rc = md_enqueue(sbi->ll_md_exp, &einfo, &it,
                                 &op_data, &lockh, NULL, 0,
-                                ldlm_completion_ast, llu_md_blocking_ast,
-                                inode, LDLM_FL_CANCEL_ON_BLOCK);
+                                LDLM_FL_CANCEL_ON_BLOCK);
                 request = (struct ptlrpc_request *)it.d.lustre.it_data;
                 if (request)
                         ptlrpc_req_finished(request);
index b716544..c4946af 100644 (file)
@@ -269,7 +269,7 @@ int llu_glimpse_size(struct inode *inode)
         struct intnl_stat *st = llu_i2stat(inode);
         struct llu_sb_info *sbi = llu_i2sbi(inode);
         struct lustre_handle lockh = { 0 };
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         int rc;
         ENTRY;
@@ -288,7 +288,6 @@ int llu_glimpse_size(struct inode *inode)
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_flags = LDLM_FL_HAS_INTENT;
         einfo.ei_cb_bl = llu_extent_lock_callback;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = llu_glimpse_callback;
@@ -297,6 +296,7 @@ int llu_glimpse_size(struct inode *inode)
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_lockh = &lockh;
         oinfo.oi_md = lli->lli_smd;
+        oinfo.oi_flags = LDLM_FL_HAS_INTENT;
 
         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
         if (rc) {
@@ -318,7 +318,7 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
 {
         struct llu_sb_info *sbi = llu_i2sbi(inode);
         struct intnl_stat *st = llu_i2stat(inode);
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         struct ost_lvb lvb;
         int rc;
@@ -338,7 +338,6 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_flags = ast_flags;
         einfo.ei_cb_bl = llu_extent_lock_callback;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = llu_glimpse_callback;
@@ -347,8 +346,9 @@ int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
         oinfo.oi_policy = *policy;
         oinfo.oi_lockh = lockh;
         oinfo.oi_md = lsm;
+        oinfo.oi_flags = ast_flags;
 
-        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
+        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
         *policy = oinfo.oi_policy;
         if (rc > 0)
                 rc = -EIO;
index 60c109b..6c9dc66 100644 (file)
@@ -1338,9 +1338,11 @@ static int llu_file_flock(struct inode *ino,
                            fid_oid(&lli->lli_fid),
                            fid_ver(&lli->lli_fid),
                            LDLM_FLOCK} };
+        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
+                ldlm_flock_completion_ast, NULL, file_lock };
+
         struct lustre_handle lockh = {0};
         ldlm_policy_data_t flock;
-        ldlm_mode_t mode = 0;
         int flags = 0;
         int rc;
 
@@ -1353,13 +1355,13 @@ static int llu_file_flock(struct inode *ino,
 
         switch (file_lock->fl_type) {
         case F_RDLCK:
-                mode = LCK_PR;
+                einfo.ei_mode = LCK_PR;
                 break;
         case F_UNLCK:
-                mode = LCK_NL;
+                einfo.ei_mode = LCK_NL;
                 break;
         case F_WRLCK:
-                mode = LCK_PW;
+                einfo.ei_mode = LCK_PW;
                 break;
         default:
                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
@@ -1390,7 +1392,7 @@ static int llu_file_flock(struct inode *ino,
 #endif
 #endif
                 flags = LDLM_FL_TEST_LOCK;
-                file_lock->fl_type = mode;
+                file_lock->fl_type = einfo.ei_mode;
                 break;
         default:
                 CERROR("unknown fcntl cmd: %d\n", cmd);
@@ -1399,13 +1401,11 @@ static int llu_file_flock(struct inode *ino,
 
         CDEBUG(D_DLMTRACE, "inode=%llu, pid=%u, flags=%#x, mode=%u, "
                "start="LPU64", end="LPU64"\n", (unsigned long long)st->st_ino,
-               flock.l_flock.pid, flags, mode, flock.l_flock.start,
+               flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start,
                flock.l_flock.end);
 
-        rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &res_id,
-                              LDLM_FLOCK, &flock, mode, &flags, NULL,
-                              ldlm_flock_completion_ast, NULL,
-                              file_lock, NULL, 0, NULL, &lockh, 0);
+        rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &einfo, &res_id, 
+                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
         RETURN(rc);
 }
 
@@ -1691,6 +1691,9 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
         struct llu_inode_info *lli2 = NULL;
         struct lov_stripe_md *lsm;
         struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
+        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR,
+                llu_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
+
         struct ptlrpc_request *req = NULL;
         struct lustre_md md;
         struct md_op_data data;
@@ -1720,9 +1723,8 @@ static int llu_lov_setstripe_ea_info(struct inode *ino, int flags,
         llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR, 
                             LUSTRE_OPC_ANY);
 
-        rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &oit, LCK_CR, &data,
-                        &lockh, lum, lum_size, ldlm_completion_ast,
-                        llu_md_blocking_ast, NULL, LDLM_FL_INTENT_ONLY);
+        rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data,
+                        &lockh, lum, lum_size, LDLM_FL_INTENT_ONLY);
         if (rc)
                 GOTO(out, rc);
 
index bb6cdf4..4c463d1 100644 (file)
@@ -297,6 +297,8 @@ static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact,
         rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED,
                            ll_inode2fid(dir), LDLM_IBITS, &policy, mode, &lockh);
         if (!rc) {
+                struct ldlm_enqueue_info einfo = { LDLM_IBITS, mode,
+                       ll_md_blocking_ast, ldlm_completion_ast, NULL, dir };
                 struct lookup_intent it = { .it_op = IT_READDIR };
                 struct ptlrpc_request *request;
                 struct md_op_data *op_data;
@@ -306,10 +308,8 @@ static struct page *ll_get_dir_page(struct inode *dir, __u32 hash, int exact,
                 if (IS_ERR(op_data))
                         return (void *)op_data;
 
-                rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, LDLM_IBITS, &it,
-                                mode, op_data, &lockh, NULL, 0,
-                                ldlm_completion_ast, ll_md_blocking_ast, dir,
-                                0);
+                rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, &einfo, &it,
+                                op_data, &lockh, NULL, 0, 0);
 
                 ll_finish_md_op_data(op_data);
 
index 82a6108..62b0bd0 100644 (file)
@@ -1101,7 +1101,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
                      lstat_t *st)
 {
         struct lustre_handle lockh = { 0 };
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         struct ost_lvb lvb;
         int rc;
@@ -1110,7 +1110,6 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_flags = LDLM_FL_HAS_INTENT;
         einfo.ei_cb_bl = ll_extent_lock_callback;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
@@ -1119,6 +1118,7 @@ int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_lockh = &lockh;
         oinfo.oi_md = lsm;
+        oinfo.oi_flags = LDLM_FL_HAS_INTENT;
 
         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
         if (rc == -ENOENT)
@@ -1149,7 +1149,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct lustre_handle lockh = { 0 };
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         int rc;
         ENTRY;
@@ -1173,7 +1173,6 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
          *       acquired only if there were no conflicting locks. */
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = LCK_PR;
-        einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
         einfo.ei_cb_bl = ll_extent_lock_callback;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
@@ -1182,6 +1181,7 @@ int ll_glimpse_size(struct inode *inode, int ast_flags)
         oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
         oinfo.oi_lockh = &lockh;
         oinfo.oi_md = lli->lli_smd;
+        oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
 
         rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
         if (rc == -ENOENT)
@@ -1206,7 +1206,7 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ost_lvb lvb;
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         int rc;
         ENTRY;
@@ -1228,7 +1228,6 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
 
         einfo.ei_type = LDLM_EXTENT;
         einfo.ei_mode = mode;
-        einfo.ei_flags = ast_flags;
         einfo.ei_cb_bl = ll_extent_lock_callback;
         einfo.ei_cb_cp = ldlm_completion_ast;
         einfo.ei_cb_gl = ll_glimpse_callback;
@@ -1237,8 +1236,9 @@ int ll_extent_lock(struct ll_file_data *fd, struct inode *inode,
         oinfo.oi_policy = *policy;
         oinfo.oi_lockh = lockh;
         oinfo.oi_md = lsm;
+        oinfo.oi_flags = ast_flags;
 
-        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
+        rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
         *policy = oinfo.oi_policy;
         if (rc > 0)
                 rc = -EIO;
@@ -2029,6 +2029,9 @@ static int join_file(struct inode *head_inode, struct file *head_filp,
         struct dentry *tail_dentry = tail_filp->f_dentry;
         struct lookup_intent oit = {.it_op = IT_OPEN,
                                    .it_flags = head_filp->f_flags|O_JOIN_FILE};
+        struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
+                ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
+
         struct lustre_handle lockh;
         struct md_op_data *op_data;
         int    rc;
@@ -2046,9 +2049,8 @@ static int join_file(struct inode *head_inode, struct file *head_filp,
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
-        rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
-                        op_data, &lockh, NULL, 0, ldlm_completion_ast,
-                        ll_md_blocking_ast, NULL, 0);
+        rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit, 
+                         op_data, &lockh, NULL, 0, 0);
 
         ll_finish_md_op_data(op_data);
         if (rc < 0)
@@ -2426,9 +2428,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                             fid_oid(ll_inode2fid(inode)),
                             fid_ver(ll_inode2fid(inode)),
                             LDLM_FLOCK} };
+        struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
+                ldlm_flock_completion_ast, NULL, file_lock };
         struct lustre_handle lockh = {0};
         ldlm_policy_data_t flock;
-        ldlm_mode_t mode = 0;
         int flags = 0;
         int rc;
         ENTRY;
@@ -2450,7 +2453,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         switch (file_lock->fl_type) {
         case F_RDLCK:
-                mode = LCK_PR;
+                einfo.ei_mode = LCK_PR;
                 break;
         case F_UNLCK:
                 /* An unlock request may or may not have any relation to
@@ -2461,10 +2464,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                  * information that is given with a normal read or write record
                  * lock request. To avoid creating another ldlm unlock (cancel)
                  * message we'll treat a LCK_NL flock request as an unlock. */
-                mode = LCK_NL;
+                einfo.ei_mode = LCK_NL;
                 break;
         case F_WRLCK:
-                mode = LCK_PW;
+                einfo.ei_mode = LCK_PW;
                 break;
         default:
                 CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
@@ -2491,7 +2494,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
                 flags = LDLM_FL_TEST_LOCK;
                 /* Save the old mode so that if the mode in the lock changes we
                  * can decrement the appropriate reader or writer refcount. */
-                file_lock->fl_type = mode;
+                file_lock->fl_type = einfo.ei_mode;
                 break;
         default:
                 CERROR("unknown fcntl lock command: %d\n", cmd);
@@ -2500,12 +2503,10 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
 
         CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
                "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
-               flags, mode, flock.l_flock.start, flock.l_flock.end);
+               flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
 
-        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id,
-                              LDLM_FLOCK, &flock, mode, &flags, NULL,
-                              ldlm_flock_completion_ast, NULL, file_lock,
-                              NULL, 0, NULL, &lockh, 0);
+        rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
+                              &flock, &flags, NULL, 0, NULL, &lockh, 0);
         if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
                 ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
 #ifdef HAVE_F_OP_FLOCK
index b48e161..dbebe38 100644 (file)
@@ -1432,11 +1432,9 @@ static int lmv_done_writing(struct obd_export *exp,
 }
 
 static int
-lmv_enqueue_slaves(struct obd_export *exp, int locktype,
-                   struct lookup_intent *it, int lockmode,
-                   struct md_op_data *op_data, struct lustre_handle *lockh,
-                   void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
-                   ldlm_blocking_callback cb_blocking, void *cb_data)
+lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                   struct lookup_intent *it, struct md_op_data *op_data,
+                   struct lustre_handle *lockh, void *lmm, int lmmsize)
 {
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
@@ -1463,9 +1461,8 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype,
                 if (tgt_exp == NULL)
                         continue;
 
-                rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2,
-                                lockh + i, lmm, lmmsize, cb_compl, cb_blocking,
-                                cb_data, 0);
+                rc = md_enqueue(tgt_exp, einfo, it, op_data2,
+                                lockh + i, lmm, lmmsize, 0);
 
                 CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
                        PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
@@ -1491,7 +1488,7 @@ cleanup:
                 /* drop all taken locks */
                 while (--i >= 0) {
                         if (lockh[i].cookie)
-                                ldlm_lock_decref(lockh + i, lockmode);
+                                ldlm_lock_decref(lockh + i, einfo->ei_mode);
                         lockh[i].cookie = 0;
                 }
         }
@@ -1499,11 +1496,9 @@ cleanup:
 }
 
 static int
-lmv_enqueue_remote(struct obd_export *exp, int lock_type,
-                   struct lookup_intent *it, int lock_mode,
-                   struct md_op_data *op_data, struct lustre_handle *lockh,
-                   void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
-                   ldlm_blocking_callback cb_blocking, void *cb_data,
+lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                   struct lookup_intent *it, struct md_op_data *op_data,
+                   struct lustre_handle *lockh, void *lmm, int lmmsize,
                    int extra_lock_flags)
 {
         struct ptlrpc_request *req = it->d.lustre.it_data;
@@ -1550,9 +1545,8 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type,
         rdata->op_fid1 = fid_copy;
         rdata->op_bias = MDS_CROSS_REF;
 
-        rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
-                        lockh, lmm, lmmsize, cb_compl, cb_blocking,
-                        cb_data, extra_lock_flags);
+        rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh,
+                        lmm, lmmsize, extra_lock_flags);
         OBD_FREE_PTR(rdata);
         EXIT;
 out:
@@ -1561,11 +1555,9 @@ out:
 }
 
 static int
-lmv_enqueue(struct obd_export *exp, int lock_type,
-            struct lookup_intent *it, int lock_mode,
-            struct md_op_data *op_data, struct lustre_handle *lockh,
-            void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
-            ldlm_blocking_callback cb_blocking, void *cb_data,
+lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+            struct lookup_intent *it, struct md_op_data *op_data,
+            struct lustre_handle *lockh, void *lmm, int lmmsize,
             int extra_lock_flags)
 {
         struct obd_device *obd = exp->exp_obd;
@@ -1580,9 +1572,8 @@ lmv_enqueue(struct obd_export *exp, int lock_type,
                 RETURN(rc);
 
         if (op_data->op_mea1 && it->it_op == IT_UNLINK) {
-                rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
-                                        op_data, lockh, lmm, lmmsize,
-                                        cb_compl, cb_blocking, cb_data);
+                rc = lmv_enqueue_slaves(exp, einfo, it, op_data,
+                                        lockh, lmm, lmmsize);
                 RETURN(rc);
         }
 
@@ -1611,15 +1602,12 @@ lmv_enqueue(struct obd_export *exp, int lock_type,
         CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
                PFID(&op_data->op_fid1));
 
-        rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
-                        lmm, lmmsize, cb_compl, cb_blocking, cb_data,
-                        extra_lock_flags);
+        rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh,
+                        lmm, lmmsize, extra_lock_flags);
 
         if (rc == 0 && it->it_op == IT_OPEN)
-                rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
-                                        op_data, lockh, lmm, lmmsize,
-                                        cb_compl, cb_blocking, cb_data,
-                                        extra_lock_flags);
+                rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
+                                        lmm, lmmsize, extra_lock_flags);
         RETURN(rc);
 }
 
@@ -1782,7 +1770,8 @@ static int lmv_early_cancel_stripes(struct obd_export *exp,
                         st_fid = &obj->lo_inodes[i].li_fid;
                         if (tgt_exp != st_exp) {
                                 rc = md_cancel_unused(st_exp, st_fid, &policy,
-                                                      mode, 0, NULL);
+                                                      mode, LDLM_FL_ASYNC,
+                                                      NULL);
                                 if (rc)
                                         break;
                         } else {
index a8711e0..721dfa5 100644 (file)
@@ -36,7 +36,7 @@ struct lov_request {
 };
 
 struct lov_request_set {
-        struct obd_enqueue_info *set_ei;
+        struct ldlm_enqueue_info*set_ei;
         struct obd_info         *set_oi;
         atomic_t                 set_refcount;
         struct obd_export       *set_exp;
@@ -200,9 +200,10 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *obd_info,
                       obd_off end, struct lov_request_set **reqset);
 int lov_fini_sync_set(struct lov_request_set *set);
 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
-                         struct obd_enqueue_info *einfo,
+                         struct ldlm_enqueue_info *einfo,
                          struct lov_request_set **reqset);
-int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc);
+int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
+                         struct ptlrpc_request_set *rqset);
 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
                        struct lov_stripe_md *lsm,
                        ldlm_policy_data_t *policy, __u32 mode,
index 5e6e1bb..c31295e 100644 (file)
@@ -1838,12 +1838,13 @@ static int lov_enqueue_interpret(struct ptlrpc_request_set *rqset,
 {
         struct lov_request_set *lovset = (struct lov_request_set *)data;
         ENTRY;
-        rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc);
+        rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset);
         RETURN(rc);
 }
 
 static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
-                       struct obd_enqueue_info *einfo)
+                       struct ldlm_enqueue_info *einfo,
+                       struct ptlrpc_request_set *rqset)
 {
         struct lov_request_set *set;
         struct lov_request *req;
@@ -1856,7 +1857,7 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         ASSERT_LSM_MAGIC(oinfo->oi_md);
 
         /* we should never be asked to replay a lock this way. */
-        LASSERT((einfo->ei_flags & LDLM_FL_REPLAY) == 0);
+        LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
 
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
@@ -1870,20 +1871,20 @@ static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 req = list_entry(pos, struct lov_request, rq_link);
 
                 rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                                 &req->rq_oi, einfo);
+                                 &req->rq_oi, einfo, rqset);
                 if (rc != ELDLM_OK)
                         GOTO(out, rc);
         }
 
-        if (einfo->ei_rqset && !list_empty(&einfo->ei_rqset->set_requests)) {
+        if (rqset && !list_empty(&rqset->set_requests)) {
                 LASSERT(rc == 0);
-                LASSERT(einfo->ei_rqset->set_interpret == NULL);
-                einfo->ei_rqset->set_interpret = lov_enqueue_interpret;
-                einfo->ei_rqset->set_arg = (void *)set;
+                LASSERT(rqset->set_interpret == NULL);
+                rqset->set_interpret = lov_enqueue_interpret;
+                rqset->set_arg = (void *)set;
                 RETURN(rc);
         }
 out:
-        rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc);
+        rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset);
         RETURN(rc);
 }
 
index 88a8b57..b702f7f 100644 (file)
@@ -161,7 +161,7 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
                 ldlm_lock_allow_match(lock);
                 LDLM_LOCK_PUT(lock);
         } else if ((rc == ELDLM_LOCK_ABORTED) &&
-                   (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
+                   (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
                 lov_stripe_lock(set->set_oi->oi_md);
                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
@@ -192,7 +192,7 @@ int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
 /* The callback for osc_enqueue that updates lov info for every OSC request. */
 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
 {
-        struct obd_enqueue_info *einfo;
+        struct ldlm_enqueue_info *einfo;
         struct lov_request *lovreq;
 
         lovreq = container_of(oinfo, struct lov_request, rq_oi);
@@ -237,7 +237,8 @@ static int enqueue_done(struct lov_request_set *set, __u32 mode)
         RETURN(rc);
 }
 
-int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc)
+int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
+                         struct ptlrpc_request_set *rqset)
 {
         int ret = 0;
         ENTRY;
@@ -247,7 +248,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc)
         LASSERT(set->set_exp);
         /* Do enqueue_done only for sync requests and if any request
          * succeeded. */
-        if (!set->set_ei->ei_rqset) {
+        if (!rqset) {
                 if (rc)
                         set->set_completes = 0;
                 ret = enqueue_done(set, mode);
@@ -261,7 +262,7 @@ int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc)
 }
 
 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
-                         struct obd_enqueue_info *einfo,
+                         struct ldlm_enqueue_info *einfo,
                          struct lov_request_set **reqset)
 {
         struct lov_obd *lov = &exp->exp_obd->u.lov;
@@ -321,6 +322,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                 /* Set lov request specific parameters. */
                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
                 req->rq_oi.oi_cb_up = cb_update_enqueue;
+                req->rq_oi.oi_flags = oinfo->oi_flags;
 
                 LASSERT(req->rq_oi.oi_lockh);
 
@@ -348,7 +350,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
         *reqset = set;
         RETURN(0);
 out_set:
-        lov_fini_enqueue_set(set, einfo->ei_mode, rc);
+        lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
         RETURN(rc);
 }
 
index 2b4122d..a23511d 100644 (file)
@@ -105,17 +105,11 @@ int mdc_intent_lock(struct obd_export *exp,
                     struct lookup_intent *, int,
                     struct ptlrpc_request **reqp,
                     ldlm_blocking_callback cb_blocking, int extra_lock_flags);
-int mdc_enqueue(struct obd_export *exp,
-                int lock_type,
-                struct lookup_intent *it,
-                int lock_mode,
-                struct md_op_data *op_data,
-                struct lustre_handle *lockh,
-                void *lmm,
-                int lmmlen,
-                ldlm_completion_callback cb_completion,
-                ldlm_blocking_callback cb_blocking,
-                void *cb_data, int extra_lock_flags);
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct md_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmlen,
+                int extra_lock_flags);
+
 int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
                             struct list_head *cancels, ldlm_mode_t mode,
                             __u64 bits);
index 8b2961d..6c28587 100644 (file)
@@ -246,17 +246,10 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 
 /* We always reserve enough space in the reply packet for a stripe MD, because
  * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp,
-                int lock_type,
-                struct lookup_intent *it,
-                int lock_mode,
-                struct md_op_data *op_data,
-                struct lustre_handle *lockh,
-                void *lmm,
-                int lmmsize,
-                ldlm_completion_callback cb_completion,
-                ldlm_blocking_callback cb_blocking,
-                void *cb_data, int extra_lock_flags)
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct md_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+                int extra_lock_flags)
 {
         struct ptlrpc_request *req;
         struct obd_device *obddev = class_exp2obd(exp);
@@ -276,14 +269,13 @@ int mdc_enqueue(struct obd_export *exp,
                            [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
-                                                   cl_max_mds_easize };
+                                                   cl_max_mds_easize,
+                           0, 0, 0 };
         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         int repbufcnt = 4, rc;
         ENTRY;
 
-        LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
-//        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
-//                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
+        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
 
         if (it->it_op & IT_OPEN) {
                 int do_join = !!(it->it_flags & O_JOIN_FILE);
@@ -438,9 +430,8 @@ int mdc_enqueue(struct obd_export *exp,
           * rpcs in flight counter */
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         mdc_enter_request(&obddev->u.cli);
-        rc = ldlm_cli_enqueue(exp, &req, &res_id, lock_type, &policy,
-                              lock_mode, &flags, cb_blocking, cb_completion,
-                              NULL, cb_data, NULL, 0, NULL, lockh, 0);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
+                              0, NULL, lockh, 0);
         mdc_exit_request(&obddev->u.cli);
         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
 
@@ -455,7 +446,7 @@ int mdc_enqueue(struct obd_export *exp,
         /* This can go when we're sure that this can never happen */
         LASSERT(rc != -ENOENT);
         if (rc == ELDLM_LOCK_ABORTED) {
-                lock_mode = 0;
+                einfo->ei_mode = 0;
                 memset(lockh, 0, sizeof(*lockh));
                 rc = 0;
         } else if (rc != 0) {
@@ -470,10 +461,10 @@ int mdc_enqueue(struct obd_export *exp,
 
                 /* If the server gave us back a different lock mode, we should
                  * fix up our variables. */
-                if (lock->l_req_mode != lock_mode) {
+                if (lock->l_req_mode != einfo->ei_mode) {
                         ldlm_lock_addref(lockh, lock->l_req_mode);
-                        ldlm_lock_decref(lockh, lock_mode);
-                        lock_mode = lock->l_req_mode;
+                        ldlm_lock_decref(lockh, einfo->ei_mode);
+                        einfo->ei_mode = lock->l_req_mode;
                 }
                 LDLM_LOCK_PUT(lock);
         }
@@ -485,7 +476,7 @@ int mdc_enqueue(struct obd_export *exp,
 
         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
-        it->d.lustre.it_lock_mode = lock_mode;
+        it->d.lustre.it_lock_mode = einfo->ei_mode;
         it->d.lustre.it_data = req;
 
         if (it->d.lustre.it_status < 0 && req->rq_replay)
@@ -740,6 +731,10 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
          * this and use the request from revalidate.  In this case, revalidate
          * never dropped its reference, so the refcounts are all OK */
         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+                struct ldlm_enqueue_info einfo =
+                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
+                          ldlm_completion_ast, NULL, NULL };
+
                 /* For case if upper layer did not alloc fid, do it now. */
                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
@@ -748,11 +743,8 @@ int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
                                 RETURN(rc);
                         }
                 }
-                
-                rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
-                                 op_data, &lockh, lmm, lmmsize,
-                                 ldlm_completion_ast, cb_blocking, NULL,
-                                 extra_lock_flags);
+                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
+                                 lmm, lmmsize, extra_lock_flags);
                 if (rc < 0)
                         RETURN(rc);
                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
index ff2346c..83203de 100644 (file)
@@ -122,21 +122,25 @@ static int mds_setattr_unpack(struct ptlrpc_request *req, int offset,
         r->ur_flags = rec->sa_attr_flags;
 
         LASSERT_REQSWAB (req, offset + 1);
-        if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 1) {
+        r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
+        if (r->ur_eadatalen) {
                 r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 1, 0);
                 if (r->ur_eadata == NULL)
                         RETURN(-EFAULT);
-                r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
         }
-
-        if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) {
+        r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+        if (r->ur_cookielen) {
                 r->ur_logcookies = lustre_msg_buf(req->rq_reqmsg, offset + 2,0);
                 if (r->ur_eadata == NULL)
                         RETURN (-EFAULT);
-
-                r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
         }
-
+        if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) {
+                r->ur_dlm = lustre_swab_reqbuf(req, offset + 3,
+                                               sizeof(*r->ur_dlm),
+                                               lustre_swab_ldlm_request); 
+                if (r->ur_dlm == NULL)
+                        RETURN (-EFAULT);
+        }
         RETURN(0);
 }
 
@@ -172,7 +176,8 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset,
         r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
 
         LASSERT_REQSWAB(req, offset + 2);
-        if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) {
+        r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+        if (r->ur_tgtlen) {
                 /* NB for now, we only seem to pass NULL terminated symlink
                  * target strings here.  If this ever changes, we'll have
                  * to stop checking for a buffer filled completely with a
@@ -183,7 +188,13 @@ static int mds_create_unpack(struct ptlrpc_request *req, int offset,
                 r->ur_tgt = lustre_msg_string(req->rq_reqmsg, offset + 2, 0);
                 if (r->ur_tgt == NULL)
                         RETURN (-EFAULT);
-                r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+        }
+        if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) {
+                r->ur_dlm = lustre_swab_reqbuf(req, offset + 3,
+                                               sizeof(*r->ur_dlm),
+                                               lustre_swab_ldlm_request); 
+                if (r->ur_dlm == NULL)
+                        RETURN (-EFAULT);
         }
         RETURN(0);
 }
@@ -215,6 +226,13 @@ static int mds_link_unpack(struct ptlrpc_request *req, int offset,
         if (r->ur_name == NULL)
                 RETURN (-EFAULT);
         r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
+        if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) {
+                r->ur_dlm = lustre_swab_reqbuf(req, offset + 2,
+                                               sizeof(*r->ur_dlm),
+                                               lustre_swab_ldlm_request); 
+                if (r->ur_dlm == NULL)
+                        RETURN (-EFAULT);
+        }
         RETURN(0);
 }
 
@@ -246,6 +264,14 @@ static int mds_unlink_unpack(struct ptlrpc_request *req, int offset,
         if (r->ur_name == NULL)
                 RETURN(-EFAULT);
         r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
+        
+        if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) {
+                r->ur_dlm = lustre_swab_reqbuf(req, offset + 2,
+                                               sizeof(*r->ur_dlm),
+                                               lustre_swab_ldlm_request); 
+                if (r->ur_dlm == NULL)
+                        RETURN (-EFAULT);
+        }
         RETURN(0);
 }
 
@@ -282,6 +308,13 @@ static int mds_rename_unpack(struct ptlrpc_request *req, int offset,
         if (r->ur_tgt == NULL)
                 RETURN(-EFAULT);
         r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+        if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) {
+                r->ur_dlm = lustre_swab_reqbuf(req, offset + 3,
+                                               sizeof(*r->ur_dlm),
+                                               lustre_swab_ldlm_request); 
+                if (r->ur_dlm == NULL)
+                        RETURN (-EFAULT);
+        }
         RETURN(0);
 }
 
@@ -317,11 +350,11 @@ static int mds_open_unpack(struct ptlrpc_request *req, int offset,
         r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
 
         LASSERT_REQSWAB(req, offset + 2);
-        if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) {
+        r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+        if (r->ur_eadatalen) {
                 r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 2, 0);
                 if (r->ur_eadata == NULL)
                         RETURN (-EFAULT);
-                r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
         }
         RETURN(0);
 }
index d30103c..6e2688b 100644 (file)
@@ -523,6 +523,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
 
         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
 
+        if (rec->ur_dlm)
+                ldlm_request_cancel(req, rec->ur_dlm, 0);
+
         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN ||
             (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)) {
                 de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
@@ -791,6 +794,9 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
                 GOTO(cleanup, rc = -ESTALE);
 
+        if (rec->ur_dlm)
+                ldlm_request_cancel(req, rec->ur_dlm, 0);
+
         dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, &lockh,
                                         MDS_INODELOCK_UPDATE);
         if (IS_ERR(dparent)) {
@@ -1577,6 +1583,9 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                 GOTO(cleanup, rc = -ENOENT);
 
+        if (rec->ur_dlm)
+                ldlm_request_cancel(req, rec->ur_dlm, 0);
+
         rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
                                          &parent_lockh, &dparent, LCK_EX,
                                          MDS_INODELOCK_UPDATE,
@@ -1814,6 +1823,9 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                 GOTO(cleanup, rc = -ENOENT);
 
+        if (rec->ur_dlm)
+                ldlm_request_cancel(req, rec->ur_dlm, 0);
+        
         /* Step 1: Lookup the source inode and target directory by FID */
         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
         if (IS_ERR(de_src))
@@ -2155,6 +2167,9 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
 
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
+        if (rec->ur_dlm)
+                ldlm_request_cancel(req, rec->ur_dlm, 0);
+
         rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
                                              rec->ur_fid2, &de_tgtdir, LCK_EX,
                                              rec->ur_name, rec->ur_namelen,
index 34466b8..676df34 100644 (file)
@@ -741,16 +741,16 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                                             ldlm_completion_ast, NULL, NULL, 0,
                                             NULL, lh);
         } else {
+                struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
+                        ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL };
                 int flags = 0;
 
                 /*
                  * This is the case mdt0 is remote node, issue DLM lock like
                  * other clients.
                  */
-                rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
-                                      LDLM_IBITS, policy, LCK_EX, &flags,
-                                      ldlm_blocking_ast, ldlm_completion_ast,
-                                      NULL, NULL, NULL, 0, NULL, lh, 0);
+                rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, &einfo, res_id,
+                                      policy, &flags, NULL, 0, NULL, lh, 0);
         }
 
         RETURN(rc);
index 5de0f95..e25539a 100644 (file)
@@ -596,6 +596,9 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
                        struct lustre_handle *lockh)
 {
         struct config_llog_data *cld = (struct config_llog_data *)data;
+        struct ldlm_enqueue_info einfo = { type, mode, mgc_blocking_ast,
+                ldlm_completion_ast, NULL, data};
+
         int rc;
         ENTRY;
 
@@ -609,10 +612,8 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
         /* We need a callback for every lockholder, so don't try to
            ldlm_lock_match (see rev 1.1.2.11.2.47) */
 
-        rc = ldlm_cli_enqueue(exp, NULL, &cld->cld_resid,
-                              type, NULL, mode, flags,
-                              mgc_blocking_ast, ldlm_completion_ast, NULL,
-                              data, NULL, 0, NULL, lockh, 0);
+        rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid,
+                              NULL, flags, NULL, 0, NULL, lockh, 0);
         /* A failed enqueue should still call the mgc_blocking_ast, 
            where it will be requeued if needed ("grant failed"). */ 
 
index 1dcb3e0..ef16000 100644 (file)
@@ -1102,7 +1102,7 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
         struct obd_device      *obd = exp->exp_obd;
         struct echo_client_obd *ec = &obd->u.echo_client;
         struct lustre_handle   *ulh = obdo_handle (oa);
-        struct obd_enqueue_info einfo = { 0 };
+        struct ldlm_enqueue_info einfo = { 0 };
         struct obd_info oinfo = { { { 0 } } };
         struct ec_object       *eco;
         struct ec_lock         *ecl;
@@ -1140,7 +1140,7 @@ echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
         oinfo.oi_policy = ecl->ecl_policy;
         oinfo.oi_lockh = &ecl->ecl_lock_handle;
         oinfo.oi_md = eco->eco_lsm;
-        rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo);
+        rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo, NULL);
         if (rc != 0)
                 goto failed_1;
 
index b53ea58..a3aa191 100644 (file)
@@ -1706,8 +1706,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                          *
                          * Of course, this will all disappear when we switch to
                          * taking liblustre locks on the OST. */
-                        if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
-                                ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+                        ldlm_res_lvbo_update(res, NULL, 0, 1);
                 }
                 RETURN(ELDLM_LOCK_ABORTED);
         }
@@ -1733,8 +1732,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
          * XXX nikita: situation when ldlm_server_glimpse_ast() failed before
          * sending ast is not handled. This can result in lost client writes.
          */
-        if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
-                ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+        if (rc != 0)
+                ldlm_res_lvbo_update(res, NULL, 0, 1);
 
         lock_res(res);
         *reply_lvb = *res_lvb;
@@ -3007,7 +3006,6 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
 {
         struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id, 0,
                                                 oinfo->oi_oa->o_gr, 0 } };
-        struct ldlm_valblock_ops *ns_lvbo;
         struct filter_mod_data *fmd;
         struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
@@ -3047,9 +3045,7 @@ int filter_setattr(struct obd_export *exp, struct obd_info *oinfo,
                                 &res_id, LDLM_EXTENT, 0);
 
         if (res != NULL) {
-                ns_lvbo = res->lr_namespace->ns_lvbo;
-                if (ns_lvbo && ns_lvbo->lvbo_update)
-                        rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
+                rc = ldlm_res_lvbo_update(res, NULL, 0, 0);
                 ldlm_resource_putref(res);
         }
 
index 8abd48d..6ca5e6f 100644 (file)
@@ -2779,7 +2779,7 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct obd_info *oinfo,
 static int osc_enqueue_interpret(struct ptlrpc_request *req,
                                  struct osc_enqueue_args *aa, int rc)
 {
-        int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
+        int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
         struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
         struct ldlm_lock *lock;
 
@@ -2790,7 +2790,7 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
         /* Complete obtaining the lock procedure. */
         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
                                    aa->oa_ei->ei_mode,
-                                   &aa->oa_ei->ei_flags,
+                                   &aa->oa_oi->oi_flags,
                                    &lsm->lsm_oinfo[0]->loi_lvb,
                                    sizeof(lsm->lsm_oinfo[0]->loi_lvb),
                                    lustre_swab_ost_lvb,
@@ -2817,13 +2817,14 @@ static int osc_enqueue_interpret(struct ptlrpc_request *req,
  * is excluded from the cluster -- such scenarious make the life difficult, so
  * release locks just after they are obtained. */
 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
-                       struct obd_enqueue_info *einfo)
+                       struct ldlm_enqueue_info *einfo,
+                       struct ptlrpc_request_set *rqset)
 {
         struct ldlm_res_id res_id = { .name = {0} };
         struct obd_device *obd = exp->exp_obd;
         struct ldlm_reply *rep;
         struct ptlrpc_request *req = NULL;
-        int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
+        int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
         int rc;
         ENTRY;
 
@@ -2841,12 +2842,12 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
 
         /* Next, search for already existing extent locks that will cover us */
         rc = ldlm_lock_match(obd->obd_namespace,
-                             einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
+                             oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
                              einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
                              oinfo->oi_lockh);
         if (rc == 1) {
                 osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
-                                        einfo->ei_flags);
+                                        oinfo->oi_flags);
                 if (intent) {
                         /* I would like to be able to ASSERT here that rss <=
                          * kms, but I can't, for reasons which are explained in
@@ -2857,7 +2858,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                 oinfo->oi_cb_up(oinfo, ELDLM_OK);
 
                 /* For async requests, decref the lock. */
-                if (einfo->ei_rqset)
+                if (rqset)
                         ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
 
                 RETURN(ELDLM_OK);
@@ -2877,7 +2878,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
 
         if (einfo->ei_mode == LCK_PR) {
                 rc = ldlm_lock_match(obd->obd_namespace,
-                                     einfo->ei_flags | LDLM_FL_LVB_READY,
+                                     oinfo->oi_flags | LDLM_FL_LVB_READY,
                                      &res_id, einfo->ei_type, &oinfo->oi_policy,
                                      LCK_PW, oinfo->oi_lockh);
                 if (rc == 1) {
@@ -2885,11 +2886,11 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                          * be more elegant than adding another parameter to
                          * lock_match.  I want a second opinion. */
                         /* addref the lock only if not async requests. */
-                        if (!einfo->ei_rqset)
+                        if (!rqset)
                                 ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
                         osc_set_data_with_check(oinfo->oi_lockh,
                                                 einfo->ei_cbdata,
-                                                einfo->ei_flags);
+                                                oinfo->oi_flags);
                         oinfo->oi_cb_up(oinfo, ELDLM_OK);
                         ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
                         RETURN(ELDLM_OK);
@@ -2914,18 +2915,15 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
         }
 
         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
-        einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
+        oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
 
-        rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type,
-                              &oinfo->oi_policy, einfo->ei_mode,
-                              &einfo->ei_flags, einfo->ei_cb_bl,
-                              einfo->ei_cb_cp, einfo->ei_cb_gl,
-                              einfo->ei_cbdata,
+        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
+                              &oinfo->oi_policy, &oinfo->oi_flags,
                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
                               sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
                               lustre_swab_ost_lvb, oinfo->oi_lockh,
-                              einfo->ei_rqset ? 1 : 0);
-        if (einfo->ei_rqset) {
+                              rqset ? 1 : 0);
+        if (rqset) {
                 if (!rc) {
                         struct osc_enqueue_args *aa;
                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
@@ -2935,7 +2933,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
                         aa->oa_exp = exp;
 
                         req->rq_interpret_reply = osc_enqueue_interpret;
-                        ptlrpc_set_add_req(einfo->ei_rqset, req);
+                        ptlrpc_set_add_req(rqset, req);
                 } else if (intent) {
                         ptlrpc_req_finished(req);
                 }
index 3d3e732..5e6a6cc 100644 (file)
@@ -467,9 +467,10 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_QUOTA64 == 0x00080000ULL);
         CLASSERT(OBD_CONNECT_MDS_CAPA == 0x00100000ULL);
         CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL);
-        CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL);
+        CLASSERT(OBD_CONNECT_CANCELSET == 0x00400000ULL);
         CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
-        CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL);
+        CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
+        CLASSERT(OBD_CONNECT_MDS_MDS == 0x02000000ULL);
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
index ee6ebd7..c5a7a55 100755 (executable)
@@ -854,6 +854,19 @@ test_58() { # bug 11546
 }
 run_test 58 "Eviction in the middle of open RPC reply processing"
 
+test_59() { # bug 10589
+       zconf_mount `hostname` $MOUNT2 || error "Failed to mount $MOUNT2"
+#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE  0x311
+       sysctl -w lustre.fail_loc=0x311
+       writes=`dd if=/dev/zero of=$DIR2/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'`
+       sysctl -w lustre.fail_loc=0
+       sync
+       zconf_umount `hostname` $DIR2 -f
+       reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'`
+       [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes
+}
+run_test 59 "Read cancel race on client eviction"
+
 equals_msg `basename $0`: test complete, cleaning up
 check_and_cleanup_lustre
 [ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
index 05bfe23..a1d5103 100644 (file)
@@ -4019,7 +4019,7 @@ test_119b() # bug 11737
 }
 run_test 119b "Sparse directIO read must return actual read amount"
 
-test_119a() {
+test_120a() {
         mkdir $DIR/$tdir
         cancel_lru_locks mdc
         stat $DIR/$tdir > /dev/null
@@ -4031,9 +4031,9 @@ test_119a() {
         [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 }
-run_test 119a "Early Lock Cancel: mkdir test"
+run_test 120a "Early Lock Cancel: mkdir test"
 
-test_119b() {
+test_120b() {
         mkdir $DIR/$tdir
         cancel_lru_locks mdc
         stat $DIR/$tdir > /dev/null
@@ -4045,9 +4045,9 @@ test_119b() {
         [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 }
-run_test 119b "Early Lock Cancel: create test"
+run_test 120b "Early Lock Cancel: create test"
 
-test_119c() {
+test_120c() {
         mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
         touch $DIR/$tdir/d1/f1
         cancel_lru_locks mdc
@@ -4060,9 +4060,9 @@ test_119c() {
         [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 }
-run_test 119c "Early Lock Cancel: link test"
+run_test 120c "Early Lock Cancel: link test"
 
-test_119d() {
+test_120d() {
         touch $DIR/$tdir
         cancel_lru_locks mdc
         stat $DIR/$tdir > /dev/null
@@ -4074,9 +4074,9 @@ test_119d() {
         [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 }
-run_test 119d "Early Lock Cancel: setattr test"
+run_test 120d "Early Lock Cancel: setattr test"
 
-test_119e() {
+test_120e() {
         mkdir $DIR/$tdir
         dd if=/dev/zero of=$DIR/$tdir/f1 count=1
         cancel_lru_locks mdc
@@ -4091,9 +4091,9 @@ test_119e() {
         [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 }
-run_test 119e "Early Lock Cancel: unlink test"
+run_test 120e "Early Lock Cancel: unlink test"
 
-test_119f() {
+test_120f() {
         mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
         dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1
         dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1
@@ -4110,9 +4110,9 @@ test_119f() {
         [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
         [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 }
-run_test 119f "Early Lock Cancel: rename test"
+run_test 120f "Early Lock Cancel: rename test"
 
-test_119g() {
+test_120g() {
         count=10000
         echo create $count files
         mkdir  $DIR/$tdir
@@ -4139,7 +4139,20 @@ test_119g() {
         sleep 2
         # wait for commitment of removal
 }
-run_test 119g "Early Lock Cancel: performance test"
+run_test 120g "Early Lock Cancel: performance test"
+
+test_121() { #bug #10589
+       rm -rf $DIR/$tfile
+       writes=`dd if=/dev/zero of=$DIR/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'`
+#define OBD_FAIL_LDLM_CANCEL_RACE        0x310
+       sysctl -w lustre.fail_loc=0x310
+       cancel_lru_locks osc > /dev/null
+       reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'`
+       sysctl -w lustre.fail_loc=0
+       [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes
+}
+run_test 121 "read cancel race ========="
+
 
 TMPDIR=$OLDTMPDIR
 TMP=$OLDTMP
index 4fd177b..ff3cde1 100644 (file)
@@ -180,6 +180,7 @@ static void check_obd_connect_data(void)
         CHECK_CDEFINE(OBD_CONNECT_OSS_CAPA);
         CHECK_CDEFINE(OBD_CONNECT_MDS_MDS);
         CHECK_CDEFINE(OBD_CONNECT_SOM);
+        CHECK_CDEFINE(OBD_CONNECT_AT);
         CHECK_CDEFINE(OBD_CONNECT_CANCELSET);
 }
 
index 66065c3..d7c2574 100644 (file)
@@ -483,9 +483,10 @@ void lustre_assert_wire_constants(void)
         CLASSERT(OBD_CONNECT_QUOTA64 == 0x00080000ULL);
         CLASSERT(OBD_CONNECT_MDS_CAPA == 0x00100000ULL);
         CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL);
-        CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL);
+        CLASSERT(OBD_CONNECT_CANCELSET == 0x00400000ULL);
         CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
-        CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL);
+        CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
+        CLASSERT(OBD_CONNECT_MDS_MDS == 0x02000000ULL);
 
         /* Checks for struct obdo */
         LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",