#define OBD_CONNECT_QUOTA64 0x00080000ULL /* 64bit qunit_data.qd_count b=10707*/
#define OBD_CONNECT_MDS_CAPA 0x00100000ULL /* MDS capability */
#define OBD_CONNECT_OSS_CAPA 0x00200000ULL /* OSS capability */
-#define OBD_CONNECT_MDS_MDS 0x00400000ULL /* MDS-MDS connection*/
+#define OBD_CONNECT_CANCELSET 0x00400000ULL /* Early batched cancels. */
#define OBD_CONNECT_SOM 0x00800000ULL /* SOM feature */
-#define OBD_CONNECT_CANCELSET 0x01000000ULL /* Early batched cancels. */
+#define OBD_CONNECT_AT 0x01000000ULL /* client uses adaptive timeouts */
+#define OBD_CONNECT_MDS_MDS 0x02000000ULL /* MDS-MDS connection*/
#define OBD_CONNECT_REAL 0x00000200ULL /* real connection */
/* also update obd_connect_names[] for lprocfs_rd_connect_flags()
* and lustre/utils/wirecheck.c */
* w/o involving separate thread. in order to decrease cs rate */
#define LDLM_FL_ATOMIC_CB 0x4000000
+/* It may happen that a client initiates 2 operations, e.g. unlink and mkdir,
+ * such that the server sends a blocking ast for conflicting locks to this
+ * client for the 1st operation, whereas the 2nd operation has canceled this
+ * lock and is waiting for rpc_lock, which is taken by the 1st operation.
+ * LDLM_FL_BL_AST is set by ldlm_callback_handler() on the lock to disallow
+ * the ELC code from canceling it.
+ * LDLM_FL_BL_DONE is set by ldlm_cancel_callback() when the lock cache is
+ * dropped, to let ldlm_callback_handler() return EINVAL to the server. It is
+ * used when the ELC rpc is already prepared and is waiting for rpc_lock, too
+ * late to send a separate CANCEL rpc. */
+#define LDLM_FL_BL_AST 0x10000000
+#define LDLM_FL_BL_DONE 0x20000000
+
/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */
-#define LDLM_FL_ASYNC 0x20000000
+#define LDLM_FL_ASYNC 0x40000000
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
int w_datalen;
};
+/* Common parameters for an LDLM enqueue. Bundles the lock type, mode and
+ * callbacks that were previously passed to ldlm_cli_enqueue()/md_enqueue()
+ * as individual arguments. */
+struct ldlm_enqueue_info {
+ __u32 ei_type; /* Type of the lock being enqueued. */
+ __u32 ei_mode; /* Mode of the lock being enqueued. */
+ void *ei_cb_bl; /* Blocking callback. */
+ void *ei_cb_cp; /* Completion callback. */
+ void *ei_cb_gl; /* Glimpse callback. */
+ void *ei_cbdata; /* Data to be passed into the callbacks. */
+};
+
extern struct obd_ops ldlm_obd_ops;
extern char *ldlm_lockname[];
void ldlm_resource_iterate(struct ldlm_namespace *, const struct ldlm_res_id *,
ldlm_iterator_t iter, void *data);
-
/* ldlm_flock.c */
int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data);
return __ldlm_handle2lock(h, 0);
}
+/* Invoke the namespace's lvbo_update method on @res, if the namespace has
+ * LVB operations registered; returns 0 (success, nothing to do) otherwise.
+ * @m/@buf_idx locate the reply buffer holding the new LVB (may be NULL/0),
+ * @increase is forwarded to the lvbo_update implementation. */
+static inline int ldlm_res_lvbo_update(struct ldlm_resource *res,
+ struct lustre_msg *m, int buf_idx,
+ int increase)
+{
+ if (res->lr_namespace->ns_lvbo &&
+ res->lr_namespace->ns_lvbo->lvbo_update) {
+ return res->lr_namespace->ns_lvbo->lvbo_update(res, m, buf_idx,
+ increase);
+ }
+ return 0;
+}
+
#define LDLM_LOCK_PUT(lock) \
do { \
/*LDLM_DEBUG((lock), "put");*/ \
int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp);
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data);
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
+ struct ldlm_enqueue_info *einfo,
const struct ldlm_res_id *res_id,
- ldlm_type_t type, ldlm_policy_data_t *policy,
- ldlm_mode_t mode, int *flags,
- ldlm_blocking_callback blocking,
- ldlm_completion_callback completion,
- ldlm_glimpse_callback glimpse,
- void *data, void *lvb, __u32 lvb_len, void *lvb_swabber,
+ ldlm_policy_data_t *policy, int *flags,
+ void *lvb, __u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh, int async);
struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
int bufcount, int *size,
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
ldlm_policy_data_t *policy,
- int mode, int flags, void *opaque);
+ ldlm_mode_t mode, int flags, void *opaque);
int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *head,
int count, int flags);
int ldlm_cli_join_lru(struct ldlm_namespace *,
__u32 ur_mode;
__u32 ur_flags;
struct lvfs_grp_hash_entry *ur_grp_entry;
+ struct ldlm_request *ur_dlm;
};
/* file data for open files on MDS */
typedef int (*obd_enqueue_update_f)(struct obd_info *oinfo, int rc);
-/* obd_enqueue parameters common for all levels (lov, osc). */
-struct obd_enqueue_info {
- /* Flags used while lock handling. */
- int ei_flags;
- /* Type of the lock being enqueued. */
- __u32 ei_type;
- /* Mode of the lock being enqueued. */
- __u32 ei_mode;
- /* Different callbacks for lock handling (blocking, completion,
- glimpse */
- void *ei_cb_bl;
- void *ei_cb_cp;
- void *ei_cb_gl;
- /* Data to be passed into callbacks. */
- void *ei_cbdata;
- /* Request set for OSC async requests. */
- struct ptlrpc_request_set *ei_rqset;
-};
-
/* obd info for a particular level (lov, osc). */
struct obd_info {
/* Lock policy. It keeps an extent which is specific for a particular
* OSC. (e.g. lov_prep_enqueue_set initialises extent of the policy,
* and osc_enqueue passes it into ldlm_lock_match & ldlm_cli_enqueue. */
ldlm_policy_data_t oi_policy;
+ /* Flags used while lock handling. The flags obtained on the enqueue
+ * request are set here, therefore they are request specific. */
+ int oi_flags;
/* Lock handle specific for every OSC lock. */
struct lustre_handle *oi_lockh;
/* lsm data specific for every OSC. */
int niocount, struct niobuf_local *local,
struct obd_trans_info *oti, int rc);
int (*o_enqueue)(struct obd_export *, struct obd_info *oinfo,
- struct obd_enqueue_info *einfo);
+ struct ldlm_enqueue_info *einfo,
+ struct ptlrpc_request_set *rqset);
int (*o_match)(struct obd_export *, struct lov_stripe_md *, __u32 type,
ldlm_policy_data_t *, __u32 mode, int *flags, void *data,
struct lustre_handle *lockh);
__u64, struct ptlrpc_request **);
int (*m_done_writing)(struct obd_export *, struct md_op_data *,
struct obd_client_handle *);
- int (*m_enqueue)(struct obd_export *, int, struct lookup_intent *,
- int, struct md_op_data *, struct lustre_handle *,
- void *, int, ldlm_completion_callback,
- ldlm_blocking_callback, void *, int);
+ int (*m_enqueue)(struct obd_export *, struct ldlm_enqueue_info *,
+ struct lookup_intent *, struct md_op_data *,
+ struct lustre_handle *, void *, int, int);
int (*m_getattr)(struct obd_export *, const struct lu_fid *,
struct obd_capa *, obd_valid, int,
struct ptlrpc_request **);
+/* Synchronous enqueue wrapper: allocates a local ptlrpc request set, issues
+ * the enqueue through the obd_ops method and waits for the set to complete
+ * before destroying it. The request set no longer lives in the enqueue info;
+ * it is purely local to this call. */
static inline int obd_enqueue_rqset(struct obd_export *exp,
struct obd_info *oinfo,
- struct obd_enqueue_info *einfo)
+ struct ldlm_enqueue_info *einfo)
{
+ struct ptlrpc_request_set *set = NULL;
int rc;
ENTRY;
EXP_CHECK_DT_OP(exp, enqueue);
EXP_COUNTER_INCREMENT(exp, enqueue);
- einfo->ei_rqset = ptlrpc_prep_set();
- if (einfo->ei_rqset == NULL)
+ set = ptlrpc_prep_set();
+ if (set == NULL)
RETURN(-ENOMEM);
- rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo);
+ rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set);
if (rc == 0)
- rc = ptlrpc_set_wait(einfo->ei_rqset);
- ptlrpc_set_destroy(einfo->ei_rqset);
- einfo->ei_rqset = NULL;
-
+ rc = ptlrpc_set_wait(set);
+ ptlrpc_set_destroy(set);
RETURN(rc);
}
+/* Asynchronous enqueue wrapper: the caller supplies the request set @set
+ * (may be NULL for a synchronous enqueue) and is responsible for waiting on
+ * and destroying it. */
static inline int obd_enqueue(struct obd_export *exp,
struct obd_info *oinfo,
- struct obd_enqueue_info *einfo)
+ struct ldlm_enqueue_info *einfo,
+ struct ptlrpc_request_set *set)
{
int rc;
ENTRY;
EXP_CHECK_DT_OP(exp, enqueue);
EXP_COUNTER_INCREMENT(exp, enqueue);
- rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo);
+ rc = OBP(exp->exp_obd, enqueue)(exp, oinfo, einfo, set);
RETURN(rc);
}
RETURN(rc);
}
+/* Metadata lock enqueue wrapper: the lock type, mode and callbacks now
+ * travel in @einfo instead of as separate arguments, shortening the
+ * md_ops enqueue signature. */
-static inline int md_enqueue(struct obd_export *exp, int lock_type,
- struct lookup_intent *it, int lock_mode,
+static inline int md_enqueue(struct obd_export *exp,
+ struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it,
struct md_op_data *op_data,
struct lustre_handle *lockh,
void *lmm, int lmmsize,
- ldlm_completion_callback cb_completion,
- ldlm_blocking_callback cb_blocking,
- void *cb_data, int extra_lock_flags)
+ int extra_lock_flags)
{
int rc;
ENTRY;
EXP_CHECK_MD_OP(exp, enqueue);
EXP_MD_COUNTER_INCREMENT(exp, enqueue);
- rc = MDP(exp->exp_obd, enqueue)(exp, lock_type, it, lock_mode,
- op_data, lockh, lmm, lmmsize,
- cb_completion, cb_blocking,
- cb_data, extra_lock_flags);
+ rc = MDP(exp->exp_obd, enqueue)(exp, einfo, it, op_data, lockh,
+ lmm, lmmsize, extra_lock_flags);
RETURN(rc);
}
struct osc_enqueue_args {
struct obd_export *oa_exp;
struct obd_info *oa_oi;
- struct obd_enqueue_info *oa_ei;
+ struct ldlm_enqueue_info*oa_ei;
};
#endif
#define OBD_FAIL_LDLM_RECOV_CLIENTS 0x30d
#define OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT 0x30e
#define OBD_FAIL_LDLM_GLIMPSE 0x30f
+#define OBD_FAIL_LDLM_CANCEL_RACE 0x310
+#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE 0x311
#define OBD_FAIL_OSC 0x400
#define OBD_FAIL_OSC_BRW_READ_BULK 0x401
ldlm_resource_unlink_lock(lock);
- ldlm_extent_policy(res, lock, flags);
+ if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_EVICT_RACE))
+ ldlm_extent_policy(res, lock, flags);
ldlm_grant_lock(lock, work_list);
RETURN(LDLM_ITER_CONTINUE);
}
LDLM_DEBUG(lock, "no blocking ast");
}
}
+ lock->l_flags |= LDLM_FL_BL_DONE;
}
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
spin_unlock(&exp->exp_ldlm_data.led_lock);
LDLM_DEBUG(lock, "export %p", exp);
+ ldlm_res_lvbo_update(res, NULL, 0, 1);
+
ldlm_lock_cancel(lock);
ldlm_reprocess_all(res);
rc = ptlrpc_queue_wait(req);
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
}
-
- if (rc != 0)
+ if (rc != 0) {
+ /* If the client canceled the lock but the cancel has not been
+ * received yet, we need to update the lvbo to have the proper
+ * attributes cached. */
+ if (rc == -EINVAL)
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 0, 1);
rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+ }
ptlrpc_req_finished(req);
else if (rc != 0)
rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
else
- rc = res->lr_namespace->ns_lvbo->lvbo_update
- (res, req->rq_repmsg, REPLY_REC_OFF, 1);
+ rc = ldlm_res_lvbo_update(res, req->rq_repmsg,
+ REPLY_REC_OFF, 1);
ptlrpc_req_finished(req);
RETURN(rc);
}
LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
"(err=%d, rc=%d)", err, rc);
+ lock_res_and_lock(lock);
if (rc == 0) {
- lock_res_and_lock(lock);
size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
if (size[DLM_REPLY_REC_OFF] > 0) {
void *lvb = lustre_msg_buf(req->rq_repmsg,
memcpy(lvb, lock->l_resource->lr_lvb_data,
size[DLM_REPLY_REC_OFF]);
}
- unlock_res_and_lock(lock);
} else {
- lock_res_and_lock(lock);
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy_nolock(lock);
- unlock_res_and_lock(lock);
}
+ unlock_res_and_lock(lock);
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
int i, count, done = 0;
ENTRY;
+ LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, "
+ "starting at %d", dlm_req->lock_count, first);
count = dlm_req->lock_count ? dlm_req->lock_count : 1;
if (first >= count)
RETURN(0);
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
RETURN(0);
- LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks",
- count - first);
for (i = first; i < count; i++) {
lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
if (!lock) {
res = lock->l_resource;
done++;
- ldlm_lock_cancel(lock);
- if (ldlm_del_waiting_lock(lock))
- CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
if (res != pres) {
if (pres != NULL) {
- if (pres->lr_namespace->ns_lvbo &&
- pres->lr_namespace->ns_lvbo->lvbo_update) {
- (void)pres->lr_namespace->ns_lvbo->
- lvbo_update(pres, NULL, 0, 1);
- }
ldlm_reprocess_all(pres);
ldlm_resource_putref(pres);
}
- if (res != NULL)
+ if (res != NULL) {
ldlm_resource_getref(res);
+ ldlm_res_lvbo_update(res, NULL, 0, 1);
+ }
pres = res;
}
+ ldlm_lock_cancel(lock);
LDLM_LOCK_PUT(lock);
}
if (pres != NULL) {
- if (pres->lr_namespace->ns_lvbo &&
- pres->lr_namespace->ns_lvbo->lvbo_update) {
- (void)pres->lr_namespace->ns_lvbo->
- lvbo_update(pres, NULL, 0, 1);
- }
ldlm_reprocess_all(pres);
ldlm_resource_putref(pres);
}
lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
if (!lock) {
- CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
- dlm_req->lock_handle[0].cookie);
+ CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
+ "disappeared\n", dlm_req->lock_handle[0].cookie);
ldlm_callback_reply(req, -EINVAL);
RETURN(0);
}
/* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
lock_res_and_lock(lock);
lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
+ if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
+ /* If somebody cancels locks and the cache is already dropped,
+ * we can tell the server we have no lock. Otherwise, we
+ * should send the cancel after dropping the cache. */
+ if ((lock->l_flags & LDLM_FL_CANCELING) &&
+ (lock->l_flags & LDLM_FL_BL_DONE)) {
+ LDLM_DEBUG(lock, "callback on lock "
+ LPX64" - lock disappeared\n",
+ dlm_req->lock_handle[0].cookie);
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_PUT(lock);
+ ldlm_callback_reply(req, -EINVAL);
+ RETURN(0);
+ }
+ lock->l_flags |= LDLM_FL_BL_AST;
+ }
unlock_res_and_lock(lock);
/* We want the ost thread to get this reply so that it can respond
})
/* Cancel lru locks and pack them into the enqueue request. Pack there the given
- * @count locks in @cancel. */
+ * @count locks in @cancels. */
struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
int bufcount, int *size,
struct list_head *cancels,
* request was created in ldlm_cli_enqueue and it is the async request,
* pass it to the caller in @reqp. */
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
+ struct ldlm_enqueue_info *einfo,
const struct ldlm_res_id *res_id,
- ldlm_type_t type, ldlm_policy_data_t *policy,
- ldlm_mode_t mode, int *flags,
- ldlm_blocking_callback blocking,
- ldlm_completion_callback completion,
- ldlm_glimpse_callback glimpse,
- void *data, void *lvb, __u32 lvb_len, void *lvb_swabber,
+ ldlm_policy_data_t *policy, int *flags,
+ void *lvb, __u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh, int async)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
LDLM_DEBUG(lock, "client-side enqueue START");
LASSERT(exp == lock->l_conn_export);
} else {
- lock = ldlm_lock_create(ns, res_id, type, mode, blocking,
- completion, glimpse, data, lvb_len);
+ lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
+ einfo->ei_mode, einfo->ei_cb_bl,
+ einfo->ei_cb_cp, einfo->ei_cb_gl,
+ einfo->ei_cbdata, lvb_len);
if (lock == NULL)
RETURN(-ENOMEM);
/* for the local lock, add the reference */
- ldlm_lock_addref_internal(lock, mode);
+ ldlm_lock_addref_internal(lock, einfo->ei_mode);
ldlm_lock2handle(lock, lockh);
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL) {
* descriptor (ldlm_lock2desc() below) but use an
* inodebits lock internally with both bits set.
*/
- if (type == LDLM_IBITS && !(exp->exp_connect_flags &
- OBD_CONNECT_IBITS))
+ if (einfo->ei_type == LDLM_IBITS &&
+ !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
lock->l_policy_data.l_inodebits.bits =
MDS_INODELOCK_LOOKUP |
MDS_INODELOCK_UPDATE;
lock->l_policy_data = *policy;
}
- if (type == LDLM_EXTENT)
+ if (einfo->ei_type == LDLM_EXTENT)
lock->l_req_extent = policy->l_extent;
LDLM_DEBUG(lock, "client-side enqueue START");
}
if (reqp == NULL || *reqp == NULL) {
req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
if (req == NULL) {
- failed_lock_cleanup(ns, lock, lockh, mode);
+ failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
LDLM_LOCK_PUT(lock);
RETURN(-ENOMEM);
}
lock->l_conn_export = exp;
lock->l_export = NULL;
- lock->l_blocking_ast = blocking;
+ lock->l_blocking_ast = einfo->ei_cb_bl;
/* Dump lock data into the request buffer */
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
* where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
* [i_size, OBD_OBJECT_EOF] lock is taken.
*/
- LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT ||
+ LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
policy->l_extent.end == OBD_OBJECT_EOF));
if (async) {
LDLM_DEBUG(lock, "sending request");
rc = ptlrpc_queue_wait(req);
- err = ldlm_cli_enqueue_fini(exp, req, type, policy ? 1 : 0,
- mode, flags, lvb, lvb_len, lvb_swabber,
- lockh, rc);
+ err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
+ einfo->ei_mode, flags, lvb, lvb_len,
+ lvb_swabber, lockh, rc);
/* If ldlm_cli_enqueue_fini did not find the lock, we need to free
* one reference that we took */
}
/* Cancel locks locally.
- * Returns: 1 if there is a need to send a cancel RPC to server. 0 otherwise. */
+ * Returns:
+ * LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL rpc to the server;
+ * LDLM_FL_CANCELING otherwise;
+ * LDLM_FL_BL_AST if a separate CANCEL rpc is needed. */
static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
{
- int rc = 0;
+ int rc = LDLM_FL_LOCAL_ONLY;
ENTRY;
if (lock->l_conn_export) {
local_only = (lock->l_flags &
(LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
ldlm_cancel_callback(lock);
+ rc = (lock->l_flags & LDLM_FL_BL_AST) ?
+ LDLM_FL_BL_AST : LDLM_FL_CANCELING;
unlock_res_and_lock(lock);
- if (local_only)
- CDEBUG(D_INFO, "not sending request (at caller's "
+ if (local_only) {
+ CDEBUG(D_DLMTRACE, "not sending request (at caller's "
"instruction)\n");
- else
- rc = 1;
-
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
ldlm_lock_cancel(lock);
} else {
if (lock->l_resource->lr_namespace->ns_client) {
{
struct ldlm_request *dlm;
struct ldlm_lock *lock;
- int max;
+ int max, packed = 0;
ENTRY;
dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
list_for_each_entry(lock, head, l_bl_ast) {
if (!count--)
break;
- /* Pack the lock handle to the given request buffer. */
LASSERT(lock->l_conn_export);
- /* Cannot be set on a lock in a resource granted list.*/
- LASSERT(!(lock->l_flags &
- (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)));
- /* If @lock is marked CANCEL_ON_BLOCK, cancel
- * will not be sent in ldlm_cli_cancel(). It
- * is used for liblustre clients, no cancel on
- * block requests. However, even for liblustre
- * clients, when the flag is set, batched cancel
- * should be sent (what if no block rpc has
- * come). To not send another separated rpc in
- * this case, the caller pass CANCEL_ON_BLOCK
- * flag to ldlm_cli_cancel_unused_resource(). */
+ /* Pack the lock handle to the given request buffer. */
+ LDLM_DEBUG(lock, "packing");
dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
+ packed++;
}
+ CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
EXIT;
}
LASSERT(exp != NULL);
LASSERT(count > 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
+ RETURN(count);
+
free = ldlm_req_handles_avail(exp, size, 2, 0);
if (count > free)
count = free;
/* concurrent cancels on the same handle can happen */
lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
- if (lock == NULL)
+ if (lock == NULL) {
+ LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
RETURN(0);
-
+ }
+
rc = ldlm_cli_cancel_local(lock);
- if (rc <= 0)
+ if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
GOTO(out, rc);
list_add(&lock->l_bl_ast, &head);
spin_lock(&ns->ns_unused_lock);
count += ns->ns_nr_unused - ns->ns_max_unused;
while (!list_empty(&ns->ns_unused_list)) {
- struct list_head *tmp = ns->ns_unused_list.next;
- lock = list_entry(tmp, struct ldlm_lock, l_lru);
-
if (max && added >= max)
break;
+ list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
+ /* somebody is already doing CANCEL, or there is a
+ * blocking request which will send the cancel. */
+ if (!(lock->l_flags & LDLM_FL_CANCELING) &&
+ !(lock->l_flags & LDLM_FL_BL_AST))
+ break;
+ }
+ if (&lock->l_lru == &ns->ns_unused_list)
+ break;
+
if ((added >= count) &&
(!(flags & LDLM_CANCEL_AGED) ||
- cfs_time_before_64(cur, ns->ns_max_age +
+ cfs_time_before_64(cur, (__u64)ns->ns_max_age +
lock->l_last_used)))
break;
spin_unlock(&ns->ns_unused_lock);
lock_res_and_lock(lock);
- if ((ldlm_lock_remove_from_lru(lock) == 0) ||
- (lock->l_flags & LDLM_FL_CANCELING)) {
+ /* Check flags again under the lock. */
+ if ((lock->l_flags & LDLM_FL_CANCELING) ||
+ (lock->l_flags & LDLM_FL_BL_AST) ||
+ (ldlm_lock_remove_from_lru(lock) == 0)) {
/* other thread is removing lock from lru or
- * somebody is already doing CANCEL. */
+ * somebody is already doing CANCEL or
+ * there is a blocking request which will send
+ * cancel by itself. */
unlock_res_and_lock(lock);
LDLM_LOCK_PUT(lock);
spin_lock(&ns->ns_unused_lock);
list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
if (left-- == 0)
break;
+
rc = ldlm_cli_cancel_local(lock);
- if (rc == 0) {
+ if (rc == LDLM_FL_BL_AST) {
+ CFS_LIST_HEAD(head);
+
+ LDLM_DEBUG(lock, "Cancel lock separately");
+ list_del_init(&lock->l_bl_ast);
+ list_add(&lock->l_bl_ast, &head);
+ ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
+ if (rc == LDLM_FL_LOCAL_ONLY) {
/* CANCEL RPC should not be sent to server. */
list_del_init(&lock->l_bl_ast);
LDLM_LOCK_PUT(lock);
added--;
}
+
}
RETURN(added);
}
continue;
}
+ /* If somebody is already doing CANCEL, or blocking ast came,
+ * skip this lock. */
+ if (lock->l_flags & LDLM_FL_BL_AST ||
+ lock->l_flags & LDLM_FL_CANCELING)
+ continue;
+
if (lockmode_compat(lock->l_granted_mode, mode))
continue;
policy->l_inodebits.bits))
continue;
- /* If somebody is already doing CANCEL, skip it. */
- if (lock->l_flags & LDLM_FL_CANCELING)
- continue;
-
/* See CBPENDING comment in ldlm_cancel_lru */
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
lock_flags;
/* Handle only @count inserted locks. */
left = count;
list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
- int rc = 0;
+ int rc = LDLM_FL_LOCAL_ONLY;
if (left-- == 0)
break;
else
rc = ldlm_cli_cancel_local(lock);
- if (rc == 0) {
+ if (rc == LDLM_FL_BL_AST) {
+ CFS_LIST_HEAD(head);
+
+ LDLM_DEBUG(lock, "Cancel lock separately");
+ list_del_init(&lock->l_bl_ast);
+ list_add(&lock->l_bl_ast, &head);
+ ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
+ rc = LDLM_FL_LOCAL_ONLY;
+ }
+ if (rc == LDLM_FL_LOCAL_ONLY) {
/* CANCEL RPC should not be sent to server. */
list_del_init(&lock->l_bl_ast);
LDLM_LOCK_PUT(lock);
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
ldlm_policy_data_t *policy,
- int mode, int flags, void *opaque)
+ ldlm_mode_t mode, int flags, void *opaque)
{
struct ldlm_resource *res;
CFS_LIST_HEAD(cancels);
rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
&res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
if (!rc) {
+ struct ldlm_enqueue_info einfo = {LDLM_IBITS, LCK_CR,
+ llu_md_blocking_ast, ldlm_completion_ast, NULL, inode};
+
llu_prep_md_op_data(&op_data, inode, NULL, NULL, 0, 0,
LUSTRE_OPC_ANY);
- rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &it, LCK_CR,
+ rc = md_enqueue(sbi->ll_md_exp, &einfo, &it,
&op_data, &lockh, NULL, 0,
- ldlm_completion_ast, llu_md_blocking_ast,
- inode, LDLM_FL_CANCEL_ON_BLOCK);
+ LDLM_FL_CANCEL_ON_BLOCK);
request = (struct ptlrpc_request *)it.d.lustre.it_data;
if (request)
ptlrpc_req_finished(request);
struct intnl_stat *st = llu_i2stat(inode);
struct llu_sb_info *sbi = llu_i2sbi(inode);
struct lustre_handle lockh = { 0 };
- struct obd_enqueue_info einfo = { 0 };
+ struct ldlm_enqueue_info einfo = { 0 };
struct obd_info oinfo = { { { 0 } } };
int rc;
ENTRY;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_flags = LDLM_FL_HAS_INTENT;
einfo.ei_cb_bl = llu_extent_lock_callback;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = llu_glimpse_callback;
oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
oinfo.oi_lockh = &lockh;
oinfo.oi_md = lli->lli_smd;
+ oinfo.oi_flags = LDLM_FL_HAS_INTENT;
rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
if (rc) {
{
struct llu_sb_info *sbi = llu_i2sbi(inode);
struct intnl_stat *st = llu_i2stat(inode);
- struct obd_enqueue_info einfo = { 0 };
+ struct ldlm_enqueue_info einfo = { 0 };
struct obd_info oinfo = { { { 0 } } };
struct ost_lvb lvb;
int rc;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = mode;
- einfo.ei_flags = ast_flags;
einfo.ei_cb_bl = llu_extent_lock_callback;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = llu_glimpse_callback;
oinfo.oi_policy = *policy;
oinfo.oi_lockh = lockh;
oinfo.oi_md = lsm;
+ oinfo.oi_flags = ast_flags;
- rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
+ rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
*policy = oinfo.oi_policy;
if (rc > 0)
rc = -EIO;
fid_oid(&lli->lli_fid),
fid_ver(&lli->lli_fid),
LDLM_FLOCK} };
+ struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
+ ldlm_flock_completion_ast, NULL, file_lock };
+
struct lustre_handle lockh = {0};
ldlm_policy_data_t flock;
- ldlm_mode_t mode = 0;
int flags = 0;
int rc;
switch (file_lock->fl_type) {
case F_RDLCK:
- mode = LCK_PR;
+ einfo.ei_mode = LCK_PR;
break;
case F_UNLCK:
- mode = LCK_NL;
+ einfo.ei_mode = LCK_NL;
break;
case F_WRLCK:
- mode = LCK_PW;
+ einfo.ei_mode = LCK_PW;
break;
default:
CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
#endif
#endif
flags = LDLM_FL_TEST_LOCK;
- file_lock->fl_type = mode;
+ file_lock->fl_type = einfo.ei_mode;
break;
default:
CERROR("unknown fcntl cmd: %d\n", cmd);
CDEBUG(D_DLMTRACE, "inode=%llu, pid=%u, flags=%#x, mode=%u, "
"start="LPU64", end="LPU64"\n", (unsigned long long)st->st_ino,
- flock.l_flock.pid, flags, mode, flock.l_flock.start,
+ flock.l_flock.pid, flags, einfo.ei_mode, flock.l_flock.start,
flock.l_flock.end);
- rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &res_id,
- LDLM_FLOCK, &flock, mode, &flags, NULL,
- ldlm_flock_completion_ast, NULL,
- file_lock, NULL, 0, NULL, &lockh, 0);
+ rc = ldlm_cli_enqueue(llu_i2mdcexp(ino), NULL, &einfo, &res_id,
+ &flock, &flags, NULL, 0, NULL, &lockh, 0);
RETURN(rc);
}
struct llu_inode_info *lli2 = NULL;
struct lov_stripe_md *lsm;
struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
+ struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CR,
+ llu_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
+
struct ptlrpc_request *req = NULL;
struct lustre_md md;
struct md_op_data data;
llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR,
LUSTRE_OPC_ANY);
- rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &oit, LCK_CR, &data,
- &lockh, lum, lum_size, ldlm_completion_ast,
- llu_md_blocking_ast, NULL, LDLM_FL_INTENT_ONLY);
+ rc = md_enqueue(sbi->ll_md_exp, &einfo, &oit, &data,
+ &lockh, lum, lum_size, LDLM_FL_INTENT_ONLY);
if (rc)
GOTO(out, rc);
rc = md_lock_match(ll_i2sbi(dir)->ll_md_exp, LDLM_FL_BLOCK_GRANTED,
ll_inode2fid(dir), LDLM_IBITS, &policy, mode, &lockh);
if (!rc) {
+ struct ldlm_enqueue_info einfo = { LDLM_IBITS, mode,
+ ll_md_blocking_ast, ldlm_completion_ast, NULL, dir };
struct lookup_intent it = { .it_op = IT_READDIR };
struct ptlrpc_request *request;
struct md_op_data *op_data;
if (IS_ERR(op_data))
return (void *)op_data;
- rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, LDLM_IBITS, &it,
- mode, op_data, &lockh, NULL, 0,
- ldlm_completion_ast, ll_md_blocking_ast, dir,
- 0);
+ rc = md_enqueue(ll_i2sbi(dir)->ll_md_exp, &einfo, &it,
+ op_data, &lockh, NULL, 0, 0);
ll_finish_md_op_data(op_data);
lstat_t *st)
{
struct lustre_handle lockh = { 0 };
- struct obd_enqueue_info einfo = { 0 };
+ struct ldlm_enqueue_info einfo = { 0 };
struct obd_info oinfo = { { { 0 } } };
struct ost_lvb lvb;
int rc;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_flags = LDLM_FL_HAS_INTENT;
einfo.ei_cb_bl = ll_extent_lock_callback;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
oinfo.oi_lockh = &lockh;
oinfo.oi_md = lsm;
+ oinfo.oi_flags = LDLM_FL_HAS_INTENT;
rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
if (rc == -ENOENT)
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct lustre_handle lockh = { 0 };
- struct obd_enqueue_info einfo = { 0 };
+ struct ldlm_enqueue_info einfo = { 0 };
struct obd_info oinfo = { { { 0 } } };
int rc;
ENTRY;
* acquired only if there were no conflicting locks. */
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = LCK_PR;
- einfo.ei_flags = ast_flags | LDLM_FL_HAS_INTENT;
einfo.ei_cb_bl = ll_extent_lock_callback;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
oinfo.oi_lockh = &lockh;
oinfo.oi_md = lli->lli_smd;
+ oinfo.oi_flags = ast_flags | LDLM_FL_HAS_INTENT;
rc = obd_enqueue_rqset(sbi->ll_dt_exp, &oinfo, &einfo);
if (rc == -ENOENT)
{
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ost_lvb lvb;
- struct obd_enqueue_info einfo = { 0 };
+ struct ldlm_enqueue_info einfo = { 0 };
struct obd_info oinfo = { { { 0 } } };
int rc;
ENTRY;
einfo.ei_type = LDLM_EXTENT;
einfo.ei_mode = mode;
- einfo.ei_flags = ast_flags;
einfo.ei_cb_bl = ll_extent_lock_callback;
einfo.ei_cb_cp = ldlm_completion_ast;
einfo.ei_cb_gl = ll_glimpse_callback;
oinfo.oi_policy = *policy;
oinfo.oi_lockh = lockh;
oinfo.oi_md = lsm;
+ oinfo.oi_flags = ast_flags;
- rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo);
+ rc = obd_enqueue(sbi->ll_dt_exp, &oinfo, &einfo, NULL);
*policy = oinfo.oi_policy;
if (rc > 0)
rc = -EIO;
struct dentry *tail_dentry = tail_filp->f_dentry;
struct lookup_intent oit = {.it_op = IT_OPEN,
.it_flags = head_filp->f_flags|O_JOIN_FILE};
+ struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_CW,
+ ll_md_blocking_ast, ldlm_completion_ast, NULL, NULL };
+
struct lustre_handle lockh;
struct md_op_data *op_data;
int rc;
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
- rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
- op_data, &lockh, NULL, 0, ldlm_completion_ast,
- ll_md_blocking_ast, NULL, 0);
+ rc = md_enqueue(ll_i2mdexp(head_inode), &einfo, &oit,
+ op_data, &lockh, NULL, 0, 0);
ll_finish_md_op_data(op_data);
if (rc < 0)
fid_oid(ll_inode2fid(inode)),
fid_ver(ll_inode2fid(inode)),
LDLM_FLOCK} };
+ struct ldlm_enqueue_info einfo = { LDLM_FLOCK, 0, NULL,
+ ldlm_flock_completion_ast, NULL, file_lock };
struct lustre_handle lockh = {0};
ldlm_policy_data_t flock;
- ldlm_mode_t mode = 0;
int flags = 0;
int rc;
ENTRY;
switch (file_lock->fl_type) {
case F_RDLCK:
- mode = LCK_PR;
+ einfo.ei_mode = LCK_PR;
break;
case F_UNLCK:
/* An unlock request may or may not have any relation to
* information that is given with a normal read or write record
* lock request. To avoid creating another ldlm unlock (cancel)
* message we'll treat a LCK_NL flock request as an unlock. */
- mode = LCK_NL;
+ einfo.ei_mode = LCK_NL;
break;
case F_WRLCK:
- mode = LCK_PW;
+ einfo.ei_mode = LCK_PW;
break;
default:
CERROR("unknown fcntl lock type: %d\n", file_lock->fl_type);
flags = LDLM_FL_TEST_LOCK;
/* Save the old mode so that if the mode in the lock changes we
* can decrement the appropriate reader or writer refcount. */
- file_lock->fl_type = mode;
+ file_lock->fl_type = einfo.ei_mode;
break;
default:
CERROR("unknown fcntl lock command: %d\n", cmd);
CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
"start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
- flags, mode, flock.l_flock.start, flock.l_flock.end);
+ flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
- rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &res_id,
- LDLM_FLOCK, &flock, mode, &flags, NULL,
- ldlm_flock_completion_ast, NULL, file_lock,
- NULL, 0, NULL, &lockh, 0);
+ rc = ldlm_cli_enqueue(sbi->ll_md_exp, NULL, &einfo, &res_id,
+ &flock, &flags, NULL, 0, NULL, &lockh, 0);
if ((file_lock->fl_flags & FL_FLOCK) && (rc == 0))
ll_flock_lock_file_wait(file, file_lock, (cmd == F_SETLKW));
#ifdef HAVE_F_OP_FLOCK
}
static int
-lmv_enqueue_slaves(struct obd_export *exp, int locktype,
- struct lookup_intent *it, int lockmode,
- struct md_op_data *op_data, struct lustre_handle *lockh,
- void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
- ldlm_blocking_callback cb_blocking, void *cb_data)
+lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct md_op_data *op_data,
+ struct lustre_handle *lockh, void *lmm, int lmmsize)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
if (tgt_exp == NULL)
continue;
- rc = md_enqueue(tgt_exp, locktype, it, lockmode, op_data2,
- lockh + i, lmm, lmmsize, cb_compl, cb_blocking,
- cb_data, 0);
+ rc = md_enqueue(tgt_exp, einfo, it, op_data2,
+ lockh + i, lmm, lmmsize, 0);
CDEBUG(D_OTHER, "take lock on slave "DFID" -> %d/%d\n",
PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
/* drop all taken locks */
while (--i >= 0) {
if (lockh[i].cookie)
- ldlm_lock_decref(lockh + i, lockmode);
+ ldlm_lock_decref(lockh + i, einfo->ei_mode);
lockh[i].cookie = 0;
}
}
}
static int
-lmv_enqueue_remote(struct obd_export *exp, int lock_type,
- struct lookup_intent *it, int lock_mode,
- struct md_op_data *op_data, struct lustre_handle *lockh,
- void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
- ldlm_blocking_callback cb_blocking, void *cb_data,
+lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct md_op_data *op_data,
+ struct lustre_handle *lockh, void *lmm, int lmmsize,
int extra_lock_flags)
{
struct ptlrpc_request *req = it->d.lustre.it_data;
rdata->op_fid1 = fid_copy;
rdata->op_bias = MDS_CROSS_REF;
- rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
- lockh, lmm, lmmsize, cb_compl, cb_blocking,
- cb_data, extra_lock_flags);
+ rc = md_enqueue(tgt_exp, einfo, it, rdata, lockh,
+ lmm, lmmsize, extra_lock_flags);
OBD_FREE_PTR(rdata);
EXIT;
out:
}
static int
-lmv_enqueue(struct obd_export *exp, int lock_type,
- struct lookup_intent *it, int lock_mode,
- struct md_op_data *op_data, struct lustre_handle *lockh,
- void *lmm, int lmmsize, ldlm_completion_callback cb_compl,
- ldlm_blocking_callback cb_blocking, void *cb_data,
+lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct md_op_data *op_data,
+ struct lustre_handle *lockh, void *lmm, int lmmsize,
int extra_lock_flags)
{
struct obd_device *obd = exp->exp_obd;
RETURN(rc);
if (op_data->op_mea1 && it->it_op == IT_UNLINK) {
- rc = lmv_enqueue_slaves(exp, lock_type, it, lock_mode,
- op_data, lockh, lmm, lmmsize,
- cb_compl, cb_blocking, cb_data);
+ rc = lmv_enqueue_slaves(exp, einfo, it, op_data,
+ lockh, lmm, lmmsize);
RETURN(rc);
}
CDEBUG(D_OTHER, "ENQUEUE '%s' on "DFID"\n", LL_IT2STR(it),
PFID(&op_data->op_fid1));
- rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, op_data, lockh,
- lmm, lmmsize, cb_compl, cb_blocking, cb_data,
- extra_lock_flags);
+ rc = md_enqueue(tgt_exp, einfo, it, op_data, lockh,
+ lmm, lmmsize, extra_lock_flags);
if (rc == 0 && it->it_op == IT_OPEN)
- rc = lmv_enqueue_remote(exp, lock_type, it, lock_mode,
- op_data, lockh, lmm, lmmsize,
- cb_compl, cb_blocking, cb_data,
- extra_lock_flags);
+ rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
+ lmm, lmmsize, extra_lock_flags);
RETURN(rc);
}
st_fid = &obj->lo_inodes[i].li_fid;
if (tgt_exp != st_exp) {
rc = md_cancel_unused(st_exp, st_fid, &policy,
- mode, 0, NULL);
+ mode, LDLM_FL_ASYNC,
+ NULL);
if (rc)
break;
} else {
};
struct lov_request_set {
- struct obd_enqueue_info *set_ei;
+ struct ldlm_enqueue_info*set_ei;
struct obd_info *set_oi;
atomic_t set_refcount;
struct obd_export *set_exp;
obd_off end, struct lov_request_set **reqset);
int lov_fini_sync_set(struct lov_request_set *set);
int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
- struct obd_enqueue_info *einfo,
+ struct ldlm_enqueue_info *einfo,
struct lov_request_set **reqset);
-int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc);
+int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
+ struct ptlrpc_request_set *rqset);
int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
struct lov_stripe_md *lsm,
ldlm_policy_data_t *policy, __u32 mode,
{
struct lov_request_set *lovset = (struct lov_request_set *)data;
ENTRY;
- rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc);
+ rc = lov_fini_enqueue_set(lovset, lovset->set_ei->ei_mode, rc, rqset);
RETURN(rc);
}
static int lov_enqueue(struct obd_export *exp, struct obd_info *oinfo,
- struct obd_enqueue_info *einfo)
+ struct ldlm_enqueue_info *einfo,
+ struct ptlrpc_request_set *rqset)
{
struct lov_request_set *set;
struct lov_request *req;
ASSERT_LSM_MAGIC(oinfo->oi_md);
/* we should never be asked to replay a lock this way. */
- LASSERT((einfo->ei_flags & LDLM_FL_REPLAY) == 0);
+ LASSERT((oinfo->oi_flags & LDLM_FL_REPLAY) == 0);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
req = list_entry(pos, struct lov_request, rq_link);
rc = obd_enqueue(lov->lov_tgts[req->rq_idx]->ltd_exp,
- &req->rq_oi, einfo);
+ &req->rq_oi, einfo, rqset);
if (rc != ELDLM_OK)
GOTO(out, rc);
}
- if (einfo->ei_rqset && !list_empty(&einfo->ei_rqset->set_requests)) {
+ if (rqset && !list_empty(&rqset->set_requests)) {
LASSERT(rc == 0);
- LASSERT(einfo->ei_rqset->set_interpret == NULL);
- einfo->ei_rqset->set_interpret = lov_enqueue_interpret;
- einfo->ei_rqset->set_arg = (void *)set;
+ LASSERT(rqset->set_interpret == NULL);
+ rqset->set_interpret = lov_enqueue_interpret;
+ rqset->set_arg = (void *)set;
RETURN(rc);
}
out:
- rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc);
+ rc = lov_fini_enqueue_set(set, einfo->ei_mode, rc, rqset);
RETURN(rc);
}
ldlm_lock_allow_match(lock);
LDLM_LOCK_PUT(lock);
} else if ((rc == ELDLM_LOCK_ABORTED) &&
- (set->set_ei->ei_flags & LDLM_FL_HAS_INTENT)) {
+ (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
memset(lov_lockhp, 0, sizeof(*lov_lockhp));
lov_stripe_lock(set->set_oi->oi_md);
loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
/* The callback for osc_enqueue that updates lov info for every OSC request. */
static int cb_update_enqueue(struct obd_info *oinfo, int rc)
{
- struct obd_enqueue_info *einfo;
+ struct ldlm_enqueue_info *einfo;
struct lov_request *lovreq;
lovreq = container_of(oinfo, struct lov_request, rq_oi);
RETURN(rc);
}
-int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc)
+int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
+ struct ptlrpc_request_set *rqset)
{
int ret = 0;
ENTRY;
LASSERT(set->set_exp);
/* Do enqueue_done only for sync requests and if any request
* succeeded. */
- if (!set->set_ei->ei_rqset) {
+ if (!rqset) {
if (rc)
set->set_completes = 0;
ret = enqueue_done(set, mode);
}
int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
- struct obd_enqueue_info *einfo,
+ struct ldlm_enqueue_info *einfo,
struct lov_request_set **reqset)
{
struct lov_obd *lov = &exp->exp_obd->u.lov;
/* Set lov request specific parameters. */
req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
req->rq_oi.oi_cb_up = cb_update_enqueue;
+ req->rq_oi.oi_flags = oinfo->oi_flags;
LASSERT(req->rq_oi.oi_lockh);
*reqset = set;
RETURN(0);
out_set:
- lov_fini_enqueue_set(set, einfo->ei_mode, rc);
+ lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
RETURN(rc);
}
struct lookup_intent *, int,
struct ptlrpc_request **reqp,
ldlm_blocking_callback cb_blocking, int extra_lock_flags);
-int mdc_enqueue(struct obd_export *exp,
- int lock_type,
- struct lookup_intent *it,
- int lock_mode,
- struct md_op_data *op_data,
- struct lustre_handle *lockh,
- void *lmm,
- int lmmlen,
- ldlm_completion_callback cb_completion,
- ldlm_blocking_callback cb_blocking,
- void *cb_data, int extra_lock_flags);
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct md_op_data *op_data,
+ struct lustre_handle *lockh, void *lmm, int lmmlen,
+ int extra_lock_flags);
+
int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
struct list_head *cancels, ldlm_mode_t mode,
__u64 bits);
/* We always reserve enough space in the reply packet for a stripe MD, because
* we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp,
- int lock_type,
- struct lookup_intent *it,
- int lock_mode,
- struct md_op_data *op_data,
- struct lustre_handle *lockh,
- void *lmm,
- int lmmsize,
- ldlm_completion_callback cb_completion,
- ldlm_blocking_callback cb_blocking,
- void *cb_data, int extra_lock_flags)
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+ struct lookup_intent *it, struct md_op_data *op_data,
+ struct lustre_handle *lockh, void *lmm, int lmmsize,
+ int extra_lock_flags)
{
struct ptlrpc_request *req;
struct obd_device *obddev = class_exp2obd(exp);
[DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
[DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
[DLM_REPLY_REC_OFF+1] = obddev->u.cli.
- cl_max_mds_easize };
+ cl_max_mds_easize,
+ 0, 0, 0 };
int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
int repbufcnt = 4, rc;
ENTRY;
- LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
-// LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
-// ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
+ LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
if (it->it_op & IT_OPEN) {
int do_join = !!(it->it_flags & O_JOIN_FILE);
* rpcs in flight counter */
mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
mdc_enter_request(&obddev->u.cli);
- rc = ldlm_cli_enqueue(exp, &req, &res_id, lock_type, &policy,
- lock_mode, &flags, cb_blocking, cb_completion,
- NULL, cb_data, NULL, 0, NULL, lockh, 0);
+ rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
+ 0, NULL, lockh, 0);
mdc_exit_request(&obddev->u.cli);
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
/* This can go when we're sure that this can never happen */
LASSERT(rc != -ENOENT);
if (rc == ELDLM_LOCK_ABORTED) {
- lock_mode = 0;
+ einfo->ei_mode = 0;
memset(lockh, 0, sizeof(*lockh));
rc = 0;
} else if (rc != 0) {
/* If the server gave us back a different lock mode, we should
* fix up our variables. */
- if (lock->l_req_mode != lock_mode) {
+ if (lock->l_req_mode != einfo->ei_mode) {
ldlm_lock_addref(lockh, lock->l_req_mode);
- ldlm_lock_decref(lockh, lock_mode);
- lock_mode = lock->l_req_mode;
+ ldlm_lock_decref(lockh, einfo->ei_mode);
+ einfo->ei_mode = lock->l_req_mode;
}
LDLM_LOCK_PUT(lock);
}
it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
- it->d.lustre.it_lock_mode = lock_mode;
+ it->d.lustre.it_lock_mode = einfo->ei_mode;
it->d.lustre.it_data = req;
if (it->d.lustre.it_status < 0 && req->rq_replay)
* this and use the request from revalidate. In this case, revalidate
* never dropped its reference, so the refcounts are all OK */
if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+ struct ldlm_enqueue_info einfo =
+ { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
+ ldlm_completion_ast, NULL, NULL };
+
/* For case if upper layer did not alloc fid, do it now. */
if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
RETURN(rc);
}
}
-
- rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
- op_data, &lockh, lmm, lmmsize,
- ldlm_completion_ast, cb_blocking, NULL,
- extra_lock_flags);
+ rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
+ lmm, lmmsize, extra_lock_flags);
if (rc < 0)
RETURN(rc);
memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
r->ur_flags = rec->sa_attr_flags;
LASSERT_REQSWAB (req, offset + 1);
- if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 1) {
+ r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
+ if (r->ur_eadatalen) {
r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 1, 0);
if (r->ur_eadata == NULL)
RETURN(-EFAULT);
- r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
}
-
- if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) {
+ r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+ if (r->ur_cookielen) {
r->ur_logcookies = lustre_msg_buf(req->rq_reqmsg, offset + 2,0);
if (r->ur_eadata == NULL)
RETURN (-EFAULT);
-
- r->ur_cookielen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
}
-
+ if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) {
+ r->ur_dlm = lustre_swab_reqbuf(req, offset + 3,
+ sizeof(*r->ur_dlm),
+ lustre_swab_ldlm_request);
+ if (r->ur_dlm == NULL)
+ RETURN (-EFAULT);
+ }
RETURN(0);
}
r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
LASSERT_REQSWAB(req, offset + 2);
- if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) {
+ r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+ if (r->ur_tgtlen) {
/* NB for now, we only seem to pass NULL terminated symlink
* target strings here. If this ever changes, we'll have
* to stop checking for a buffer filled completely with a
r->ur_tgt = lustre_msg_string(req->rq_reqmsg, offset + 2, 0);
if (r->ur_tgt == NULL)
RETURN (-EFAULT);
- r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+ }
+ if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) {
+ r->ur_dlm = lustre_swab_reqbuf(req, offset + 3,
+ sizeof(*r->ur_dlm),
+ lustre_swab_ldlm_request);
+ if (r->ur_dlm == NULL)
+ RETURN (-EFAULT);
}
RETURN(0);
}
if (r->ur_name == NULL)
RETURN (-EFAULT);
r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
+ if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) {
+ r->ur_dlm = lustre_swab_reqbuf(req, offset + 2,
+ sizeof(*r->ur_dlm),
+ lustre_swab_ldlm_request);
+ if (r->ur_dlm == NULL)
+ RETURN (-EFAULT);
+ }
RETURN(0);
}
if (r->ur_name == NULL)
RETURN(-EFAULT);
r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
+
+ if (lustre_msg_buflen(req->rq_reqmsg, offset + 2)) {
+ r->ur_dlm = lustre_swab_reqbuf(req, offset + 2,
+ sizeof(*r->ur_dlm),
+ lustre_swab_ldlm_request);
+ if (r->ur_dlm == NULL)
+ RETURN (-EFAULT);
+ }
RETURN(0);
}
if (r->ur_tgt == NULL)
RETURN(-EFAULT);
r->ur_tgtlen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+ if (lustre_msg_buflen(req->rq_reqmsg, offset + 3)) {
+ r->ur_dlm = lustre_swab_reqbuf(req, offset + 3,
+ sizeof(*r->ur_dlm),
+ lustre_swab_ldlm_request);
+ if (r->ur_dlm == NULL)
+ RETURN (-EFAULT);
+ }
RETURN(0);
}
r->ur_namelen = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
LASSERT_REQSWAB(req, offset + 2);
- if (lustre_msg_bufcount(req->rq_reqmsg) > offset + 2) {
+ r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
+ if (r->ur_eadatalen) {
r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 2, 0);
if (r->ur_eadata == NULL)
RETURN (-EFAULT);
- r->ur_eadatalen = lustre_msg_buflen(req->rq_reqmsg, offset + 2);
}
RETURN(0);
}
MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
+ if (rec->ur_dlm)
+ ldlm_request_cancel(req, rec->ur_dlm, 0);
+
if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN ||
(req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)) {
de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
GOTO(cleanup, rc = -ESTALE);
+ if (rec->ur_dlm)
+ ldlm_request_cancel(req, rec->ur_dlm, 0);
+
dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_EX, &lockh,
MDS_INODELOCK_UPDATE);
if (IS_ERR(dparent)) {
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
GOTO(cleanup, rc = -ENOENT);
+ if (rec->ur_dlm)
+ ldlm_request_cancel(req, rec->ur_dlm, 0);
+
rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
&parent_lockh, &dparent, LCK_EX,
MDS_INODELOCK_UPDATE,
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
GOTO(cleanup, rc = -ENOENT);
+ if (rec->ur_dlm)
+ ldlm_request_cancel(req, rec->ur_dlm, 0);
+
/* Step 1: Lookup the source inode and target directory by FID */
de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
if (IS_ERR(de_src))
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
+ if (rec->ur_dlm)
+ ldlm_request_cancel(req, rec->ur_dlm, 0);
+
rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
rec->ur_fid2, &de_tgtdir, LCK_EX,
rec->ur_name, rec->ur_namelen,
ldlm_completion_ast, NULL, NULL, 0,
NULL, lh);
} else {
+ struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
+ ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL };
int flags = 0;
/*
* This is the case mdt0 is remote node, issue DLM lock like
* other clients.
*/
- rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
- LDLM_IBITS, policy, LCK_EX, &flags,
- ldlm_blocking_ast, ldlm_completion_ast,
- NULL, NULL, NULL, 0, NULL, lh, 0);
+ rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, &einfo, res_id,
+ policy, &flags, NULL, 0, NULL, lh, 0);
}
RETURN(rc);
struct lustre_handle *lockh)
{
struct config_llog_data *cld = (struct config_llog_data *)data;
+ struct ldlm_enqueue_info einfo = { type, mode, mgc_blocking_ast,
+ ldlm_completion_ast, NULL, data};
+
int rc;
ENTRY;
/* We need a callback for every lockholder, so don't try to
ldlm_lock_match (see rev 1.1.2.11.2.47) */
- rc = ldlm_cli_enqueue(exp, NULL, &cld->cld_resid,
- type, NULL, mode, flags,
- mgc_blocking_ast, ldlm_completion_ast, NULL,
- data, NULL, 0, NULL, lockh, 0);
+ rc = ldlm_cli_enqueue(exp, NULL, &einfo, &cld->cld_resid,
+ NULL, flags, NULL, 0, NULL, lockh, 0);
/* A failed enqueue should still call the mgc_blocking_ast,
where it will be requeued if needed ("grant failed"). */
struct obd_device *obd = exp->exp_obd;
struct echo_client_obd *ec = &obd->u.echo_client;
struct lustre_handle *ulh = obdo_handle (oa);
- struct obd_enqueue_info einfo = { 0 };
+ struct ldlm_enqueue_info einfo = { 0 };
struct obd_info oinfo = { { { 0 } } };
struct ec_object *eco;
struct ec_lock *ecl;
oinfo.oi_policy = ecl->ecl_policy;
oinfo.oi_lockh = &ecl->ecl_lock_handle;
oinfo.oi_md = eco->eco_lsm;
- rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo);
+ rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo, NULL);
if (rc != 0)
goto failed_1;
*
* Of course, this will all disappear when we switch to
* taking liblustre locks on the OST. */
- if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
- ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 0, 1);
}
RETURN(ELDLM_LOCK_ABORTED);
}
* XXX nikita: situation when ldlm_server_glimpse_ast() failed before
* sending ast is not handled. This can result in lost client writes.
*/
- if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
- ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+ if (rc != 0)
+ ldlm_res_lvbo_update(res, NULL, 0, 1);
lock_res(res);
*reply_lvb = *res_lvb;
{
struct ldlm_res_id res_id = { .name = { oinfo->oi_oa->o_id, 0,
oinfo->oi_oa->o_gr, 0 } };
- struct ldlm_valblock_ops *ns_lvbo;
struct filter_mod_data *fmd;
struct lvfs_run_ctxt saved;
struct filter_obd *filter;
&res_id, LDLM_EXTENT, 0);
if (res != NULL) {
- ns_lvbo = res->lr_namespace->ns_lvbo;
- if (ns_lvbo && ns_lvbo->lvbo_update)
- rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
+ rc = ldlm_res_lvbo_update(res, NULL, 0, 0);
ldlm_resource_putref(res);
}
static int osc_enqueue_interpret(struct ptlrpc_request *req,
struct osc_enqueue_args *aa, int rc)
{
- int intent = aa->oa_ei->ei_flags & LDLM_FL_HAS_INTENT;
+ int intent = aa->oa_oi->oi_flags & LDLM_FL_HAS_INTENT;
struct lov_stripe_md *lsm = aa->oa_oi->oi_md;
struct ldlm_lock *lock;
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
aa->oa_ei->ei_mode,
- &aa->oa_ei->ei_flags,
+ &aa->oa_oi->oi_flags,
&lsm->lsm_oinfo[0]->loi_lvb,
sizeof(lsm->lsm_oinfo[0]->loi_lvb),
lustre_swab_ost_lvb,
* is excluded from the cluster -- such scenarious make the life difficult, so
* release locks just after they are obtained. */
static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
- struct obd_enqueue_info *einfo)
+ struct ldlm_enqueue_info *einfo,
+ struct ptlrpc_request_set *rqset)
{
struct ldlm_res_id res_id = { .name = {0} };
struct obd_device *obd = exp->exp_obd;
struct ldlm_reply *rep;
struct ptlrpc_request *req = NULL;
- int intent = einfo->ei_flags & LDLM_FL_HAS_INTENT;
+ int intent = oinfo->oi_flags & LDLM_FL_HAS_INTENT;
int rc;
ENTRY;
/* Next, search for already existing extent locks that will cover us */
rc = ldlm_lock_match(obd->obd_namespace,
- einfo->ei_flags | LDLM_FL_LVB_READY, &res_id,
+ oinfo->oi_flags | LDLM_FL_LVB_READY, &res_id,
einfo->ei_type, &oinfo->oi_policy, einfo->ei_mode,
oinfo->oi_lockh);
if (rc == 1) {
osc_set_data_with_check(oinfo->oi_lockh, einfo->ei_cbdata,
- einfo->ei_flags);
+ oinfo->oi_flags);
if (intent) {
/* I would like to be able to ASSERT here that rss <=
* kms, but I can't, for reasons which are explained in
oinfo->oi_cb_up(oinfo, ELDLM_OK);
/* For async requests, decref the lock. */
- if (einfo->ei_rqset)
+ if (rqset)
ldlm_lock_decref(oinfo->oi_lockh, einfo->ei_mode);
RETURN(ELDLM_OK);
if (einfo->ei_mode == LCK_PR) {
rc = ldlm_lock_match(obd->obd_namespace,
- einfo->ei_flags | LDLM_FL_LVB_READY,
+ oinfo->oi_flags | LDLM_FL_LVB_READY,
&res_id, einfo->ei_type, &oinfo->oi_policy,
LCK_PW, oinfo->oi_lockh);
if (rc == 1) {
* be more elegant than adding another parameter to
* lock_match. I want a second opinion. */
/* addref the lock only if not async requests. */
- if (!einfo->ei_rqset)
+ if (!rqset)
ldlm_lock_addref(oinfo->oi_lockh, LCK_PR);
osc_set_data_with_check(oinfo->oi_lockh,
einfo->ei_cbdata,
- einfo->ei_flags);
+ oinfo->oi_flags);
oinfo->oi_cb_up(oinfo, ELDLM_OK);
ldlm_lock_decref(oinfo->oi_lockh, LCK_PW);
RETURN(ELDLM_OK);
}
/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
- einfo->ei_flags &= ~LDLM_FL_BLOCK_GRANTED;
+ oinfo->oi_flags &= ~LDLM_FL_BLOCK_GRANTED;
- rc = ldlm_cli_enqueue(exp, &req, &res_id, einfo->ei_type,
- &oinfo->oi_policy, einfo->ei_mode,
- &einfo->ei_flags, einfo->ei_cb_bl,
- einfo->ei_cb_cp, einfo->ei_cb_gl,
- einfo->ei_cbdata,
+ rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id,
+ &oinfo->oi_policy, &oinfo->oi_flags,
&oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
sizeof(oinfo->oi_md->lsm_oinfo[0]->loi_lvb),
lustre_swab_ost_lvb, oinfo->oi_lockh,
- einfo->ei_rqset ? 1 : 0);
- if (einfo->ei_rqset) {
+ rqset ? 1 : 0);
+ if (rqset) {
if (!rc) {
struct osc_enqueue_args *aa;
CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
aa->oa_exp = exp;
req->rq_interpret_reply = osc_enqueue_interpret;
- ptlrpc_set_add_req(einfo->ei_rqset, req);
+ ptlrpc_set_add_req(rqset, req);
} else if (intent) {
ptlrpc_req_finished(req);
}
CLASSERT(OBD_CONNECT_QUOTA64 == 0x00080000ULL);
CLASSERT(OBD_CONNECT_MDS_CAPA == 0x00100000ULL);
CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL);
- CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL);
+ CLASSERT(OBD_CONNECT_CANCELSET == 0x00400000ULL);
CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
- CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL);
+ CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
+ CLASSERT(OBD_CONNECT_MDS_MDS == 0x02000000ULL);
/* Checks for struct obdo */
LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
}
run_test 58 "Eviction in the middle of open RPC reply processing"
+test_59() { # bug 10589 - read cancel race on client eviction
+ zconf_mount `hostname` $MOUNT2 || error "Failed to mount $MOUNT2"
+#define OBD_FAIL_LDLM_CANCEL_EVICT_RACE 0x311
+ sysctl -w lustre.fail_loc=0x311 # open the cancel/evict race window
+ writes=`dd if=/dev/zero of=$DIR2/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'`
+ sysctl -w lustre.fail_loc=0
+ sync
+ zconf_umount `hostname` $MOUNT2 -f # force-umount the 2nd mount point (triggers eviction)
+ reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'`
+ [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes
+}
+run_test 59 "Read cancel race on client eviction"
+
equals_msg `basename $0`: test complete, cleaning up
check_and_cleanup_lustre
[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG || true
}
run_test 119b "Sparse directIO read must return actual read amount"
-test_119a() {
+test_120a() {
mkdir $DIR/$tdir
cancel_lru_locks mdc
stat $DIR/$tdir > /dev/null
[ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
}
-run_test 119a "Early Lock Cancel: mkdir test"
+run_test 120a "Early Lock Cancel: mkdir test"
-test_119b() {
+test_120b() {
mkdir $DIR/$tdir
cancel_lru_locks mdc
stat $DIR/$tdir > /dev/null
[ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
}
-run_test 119b "Early Lock Cancel: create test"
+run_test 120b "Early Lock Cancel: create test"
-test_119c() {
+test_120c() {
mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
touch $DIR/$tdir/d1/f1
cancel_lru_locks mdc
[ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
}
-run_test 119c "Early Lock Cancel: link test"
+run_test 120c "Early Lock Cancel: link test"
-test_119d() {
+test_120d() {
touch $DIR/$tdir
cancel_lru_locks mdc
stat $DIR/$tdir > /dev/null
[ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
}
-run_test 119d "Early Lock Cancel: setattr test"
+run_test 120d "Early Lock Cancel: setattr test"
-test_119e() {
+test_120e() {
mkdir $DIR/$tdir
dd if=/dev/zero of=$DIR/$tdir/f1 count=1
cancel_lru_locks mdc
[ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
}
-run_test 119e "Early Lock Cancel: unlink test"
+run_test 120e "Early Lock Cancel: unlink test"
-test_119f() {
+test_120f() {
mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1
dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1
[ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPC occured."
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
}
-run_test 119f "Early Lock Cancel: rename test"
+run_test 120f "Early Lock Cancel: rename test"
-test_119g() {
+test_120g() {
count=10000
echo create $count files
mkdir $DIR/$tdir
sleep 2
# wait for commitment of removal
}
-run_test 119g "Early Lock Cancel: performance test"
+run_test 120g "Early Lock Cancel: performance test"
+
+test_121() { #bug #10589 - lock cancel racing against a read RPC
+ rm -rf $DIR/$tfile
+ writes=`dd if=/dev/zero of=$DIR/$tfile count=1 2>&1 | awk 'BEGIN { FS="+" } /out/ {print $1}'` # blocks written
+#define OBD_FAIL_LDLM_CANCEL_RACE 0x310
+ sysctl -w lustre.fail_loc=0x310 # arm OBD_FAIL_LDLM_CANCEL_RACE for the read below
+ cancel_lru_locks osc > /dev/null # drop cached OSC locks so the read races the cancel
+ reads=`dd if=$DIR/$tfile of=/dev/null 2>&1 | awk 'BEGIN { FS="+" } /in/ {print $1}'` # blocks read back
+ sysctl -w lustre.fail_loc=0
+ [ $reads -eq $writes ] || error "read" $reads "blocks, must be" $writes # data must survive the race
+}
+run_test 121 "read cancel race ========="
+
TMPDIR=$OLDTMPDIR
TMP=$OLDTMP
CHECK_CDEFINE(OBD_CONNECT_OSS_CAPA);
CHECK_CDEFINE(OBD_CONNECT_MDS_MDS);
CHECK_CDEFINE(OBD_CONNECT_SOM);
+ CHECK_CDEFINE(OBD_CONNECT_AT);
CHECK_CDEFINE(OBD_CONNECT_CANCELSET);
}
CLASSERT(OBD_CONNECT_QUOTA64 == 0x00080000ULL);
CLASSERT(OBD_CONNECT_MDS_CAPA == 0x00100000ULL);
CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL);
- CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL);
+ CLASSERT(OBD_CONNECT_CANCELSET == 0x00400000ULL);
CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
- CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL);
+ CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
+ CLASSERT(OBD_CONNECT_MDS_MDS == 0x02000000ULL);
/* Checks for struct obdo */
LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",