Lustre now respects this flag to disable mode checks when
truncating a file owned by the user
+Severity : minor
+Frequency : liblustre-only, when liblustre client dies unexpectedly or becomes
+ busy
+Bugzilla : 7313
+Description: Revoking locks from clients that went dead or catatonic might take
+ a lot of time.
+Details : New lock flags FL_CANCEL_ON_BLOCK used by liblustre makes
+ cancellation of such locks instant on servers without waiting for
+ any reply from clients. Clients drops these locks when cancel
+ notification from server is received without replying.
+
------------------------------------------------------------------------------
08-26-2005 Cluster File Systems, Inc. <info@clusterfs.com>
/* Flags sent in AST lock_flags to be mapped into the receiving lock. */
#define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA)
+/* Immediatelly cancel such locks when they block some other locks. Send
+ cancel notification to original lock holder, but expect no reply. */
+#define LDLM_FL_CANCEL_ON_BLOCK 0x800000
+
+/* Flags flags inherited from parent lock when doing intents. */
+#define LDLM_INHERIT_FLAGS (LDLM_FL_CANCEL_ON_BLOCK)
+
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
#define LDLM_CB_BLOCKING 1
void *lmm, int lmmsize,
struct lookup_intent *, int,
struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking);
+ ldlm_blocking_callback cb_blocking, int extra_lock_flags);
int mdc_enqueue(struct obd_export *exp,
int lock_type,
struct lookup_intent *it,
int lmmlen,
ldlm_completion_callback cb_completion,
ldlm_blocking_callback cb_blocking,
- void *cb_data);
+ void *cb_data, int extra_lock_flags);
/* mdc/mdc_request.c */
int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp);
int ptlrpc_error(struct ptlrpc_request *req);
void ptlrpc_resend_req(struct ptlrpc_request *request);
int ptl_send_rpc(struct ptlrpc_request *request);
+int ptl_send_rpc_nowait(struct ptlrpc_request *request);
int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
/* ptlrpc/client.c */
/* Some flags from the enqueue want to make it into the AST, via the
* lock's l_flags. */
- lock->l_flags |= (*flags & LDLM_AST_DISCARD_DATA);
+ lock->l_flags |= (*flags & (LDLM_AST_DISCARD_DATA|LDLM_INHERIT_FLAGS));
/* This distinction between local lock trees is very important; a client
* namespace only has information about locks taken by that client, and
} else {
rc = 0;
}
+
if (rc == -ERESTART)
retval = rc;
else if (rc)
struct ldlm_request *body;
struct ptlrpc_request *req;
int rc = 0, size = sizeof(*body);
+ int instant_cancel = 0;
ENTRY;
if (flag == LDLM_CB_CANCELING) {
}
#endif
+ if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
+ instant_cancel = 1;
+
req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
LDLM_BL_CALLBACK, 1, &size, NULL);
if (req == NULL) {
LDLM_DEBUG(lock, "server preparing blocking AST");
req->rq_replen = lustre_msg_size(0, NULL);
-
- if (lock->l_granted_mode == lock->l_req_mode)
+ if (instant_cancel) {
+ ldlm_lock_cancel(lock);
+// ldlm_reprocess_all(lock->l_resource);
+ } else if (lock->l_granted_mode == lock->l_req_mode)
ldlm_add_waiting_lock(lock);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
req->rq_send_state = LUSTRE_IMP_FULL;
req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
- rc = ptlrpc_queue_wait(req);
+ if (unlikely(instant_cancel)) {
+ rc = ptl_send_rpc_nowait(req);
+ } else {
+ rc = ptlrpc_queue_wait(req);
+ }
if (rc != 0)
rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
LDLM_DEBUG(lock, "client blocking AST callback handler START");
lock->l_flags |= LDLM_FL_CBPENDING;
+
+ if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
+ lock->l_flags |= LDLM_FL_CANCEL;
+
do_ast = (!lock->l_readers && !lock->l_writers);
if (do_ast) {
switch (req->rq_reqmsg->opc) {
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
- ldlm_callback_reply(req, 0);
+ if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
+ ldlm_callback_reply(req, 0);
if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "completion ast\n");
- ldlm_callback_reply(req, 0);
+ if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
+ ldlm_callback_reply(req, 0);
ldlm_handle_cp_callback(req, ns, dlm_req, lock);
break;
case LDLM_GL_CALLBACK:
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
lock->l_flags |= LDLM_FL_LOCAL;
+ lock->l_flags |= *flags & LDLM_INHERIT_FLAGS;
lock->l_lvb_swabber = lvb_swabber;
if (policy != NULL)
memcpy(&lock->l_policy_data, policy, sizeof(*policy));
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
*flags = reply->lock_flags;
+ lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
lock, reply->lock_handle.cookie, *flags);
/* Set this flag to prevent others from getting new references*/
l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_flags |= LDLM_FL_CBPENDING;
- local_only = (lock->l_flags & LDLM_FL_LOCAL_ONLY);
+ local_only = (lock->l_flags &
+ (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_cancel_callback(lock);
if (rc == ESTALE) {
char str[PTL_NALFMT_SIZE];
CERROR("client/server (nid %s) out of sync"
- " -- not fatal\n",
+ " -- not fatal, flags %d\n",
ptlrpc_peernid2str(&req->rq_import->
- imp_connection->c_peer, str));
+ imp_connection->c_peer, str),
+lock->l_flags);
} else if (rc == -ETIMEDOUT) {
ptlrpc_req_finished(req);
GOTO(restart, rc);
rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR,
&data, &lockh, NULL, 0,
ldlm_completion_ast, llu_mdc_blocking_ast,
- inode);
+ inode, LDLM_FL_CANCEL_ON_BLOCK);
request = (struct ptlrpc_request *)it.d.lustre.it_data;
if (request)
ptlrpc_req_finished(request);
pb->pb_ino, pb->pb_name.name,pb->pb_name.len,0);
rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, flags,
- &req, llu_mdc_blocking_ast);
+ &req, llu_mdc_blocking_ast,
+ LDLM_FL_CANCEL_ON_BLOCK);
/* If req is NULL, then mdc_intent_lock only tried to do a lock match;
* if all was well, it will return 1 if it found locks, 0 otherwise. */
if (req == NULL && rc >= 0)
pnode->p_base->pb_name.len, flags);
rc = mdc_intent_lock(llu_i2mdcexp(parent), &op_data, NULL, 0, it,
- flags, &req, llu_mdc_blocking_ast);
+ flags, &req, llu_mdc_blocking_ast,
+ LDLM_FL_CANCEL_ON_BLOCK);
if (rc < 0)
GOTO(out, rc);
de->d_name.name, de->d_name.len, 0);
rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags,
- &req, ll_mdc_blocking_ast);
+ &req, ll_mdc_blocking_ast, 0);
/* If req is NULL, then mdc_intent_lock only tried to do a lock match;
* if all was well, it will return 1 if it found locks, 0 otherwise. */
if (req == NULL && rc >= 0)
rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it,
LCK_PR, &data, &lockh, NULL, 0,
- ldlm_completion_ast, ll_mdc_blocking_ast, dir);
+ ldlm_completion_ast, ll_mdc_blocking_ast, dir,
+ 0);
request = (struct ptlrpc_request *)it.d.lustre.it_data;
if (request)
rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data,
&lockh, lmm, lmmsize, ldlm_completion_ast,
- ll_mdc_blocking_ast, NULL);
+ ll_mdc_blocking_ast, NULL, 0);
if (rc < 0)
CERROR("lock enqueue: err: %d\n", rc);
RETURN(rc);
it->it_create_mode &= ~current->fs->umask;
rc = mdc_intent_lock(ll_i2mdcexp(parent), &op_data, NULL, 0, it,
- lookup_flags, &req, ll_mdc_blocking_ast);
+ lookup_flags, &req, ll_mdc_blocking_ast, 0);
if (rc < 0)
GOTO(out, retval = ERR_PTR(rc));
int lmmsize,
ldlm_completion_callback cb_completion,
ldlm_blocking_callback cb_blocking,
- void *cb_data)
+ void *cb_data, int extra_lock_flags)
{
struct ptlrpc_request *req;
struct obd_device *obddev = class_exp2obd(exp);
struct ldlm_res_id res_id =
{ .name = {data->fid1.id, data->fid1.generation} };
int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
- int rc, flags = LDLM_FL_HAS_INTENT;
+ int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
int repsize[4] = {sizeof(struct ldlm_reply),
sizeof(struct mds_body),
obddev->u.cli.cl_max_mds_easize,
int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
void *lmm, int lmmsize, struct lookup_intent *it,
int lookup_flags, struct ptlrpc_request **reqp,
- ldlm_blocking_callback cb_blocking)
+ ldlm_blocking_callback cb_blocking, int extra_lock_flags)
{
struct lustre_handle lockh;
struct ptlrpc_request *request;
rc = mdc_enqueue(exp, LDLM_PLAIN, it, it_to_lock_mode(it),
op_data, &lockh, lmm, lmmsize,
- ldlm_completion_ast, cb_blocking, NULL);
+ ldlm_completion_ast, cb_blocking, NULL,
+ extra_lock_flags);
if (rc < 0)
RETURN(rc);
memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
return(rc);
}
-static int mds_getattr_name(int offset, struct ptlrpc_request *req,
+static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags,
struct lustre_handle *child_lockh)
{
struct obd_device *obd = req->rq_export->exp_obd;
rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
&parent_lockh, &dparent,
LCK_PR, name, namesize,
- child_lockh, &dchild, LCK_PR);
+ child_lockh, &dchild, LCK_PR,
+ flags);
if (rc)
GOTO(cleanup, rc);
} else {
* want to cancel.
*/
lockh.cookie = 0;
- rc = mds_getattr_name(0, req, &lockh);
+ rc = mds_getattr_name(0, req, 0, &lockh);
/* this non-intent call (from an ioctl) is special */
req->rq_status = rc;
if (rc == 0 && lockh.cookie)
case IT_LOOKUP:
case IT_READDIR:
fixup_handle_for_resent_req(req, lock, &new_lock, &lockh);
- rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh);
+ rep->lock_policy_res2 = mds_getattr_name(offset, req,
+ flags & LDLM_INHERIT_FLAGS,
+ &lockh);
/* FIXME: LDLM can set req->rq_status. MDS sets
policy_res{1,2} with disposition and status.
- replay: returns 0 & req->status is old status
int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
struct lustre_handle *p1_lockh, int p1_lock_mode,
struct ldlm_res_id *p2_res_id,
- struct lustre_handle *p2_lockh, int p2_lock_mode);
+ struct lustre_handle *p2_lockh, int p2_lock_mode,
+ int p2_lock_flags);
void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error);
int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
struct ptlrpc_request *req, int rc, __u32 op_data);
struct dentry **dparentp, int parent_mode,
char *name, int namelen,
struct lustre_handle *child_lockh,
- struct dentry **dchildp, int child_mode);
+ struct dentry **dchildp, int child_mode,
+ int child_lock_flags);
int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
struct lustre_handle *child_lockh);
int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode,
int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
struct lustre_handle *p1_lockh, int p1_lock_mode,
struct ldlm_res_id *p2_res_id,
- struct lustre_handle *p2_lockh, int p2_lock_mode)
+ struct lustre_handle *p2_lockh, int p2_lock_mode,
+ int p2_lock_flags)
{
struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
- int rc, flags;
+ int flags[2] = { LDLM_FL_LOCAL_ONLY, LDLM_FL_LOCAL_ONLY | p2_lock_flags };
+ int rc;
ENTRY;
LASSERT(p1_res_id != NULL && p2_res_id != NULL);
res_id[0] = p2_res_id;
lock_modes[1] = p1_lock_mode;
lock_modes[0] = p2_lock_mode;
+ flags[1] = LDLM_FL_LOCAL_ONLY;
+ flags[0] = p2_lock_flags | LDLM_FL_LOCAL_ONLY;
}
CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
res_id[0]->name[0], res_id[1]->name[0]);
- flags = LDLM_FL_LOCAL_ONLY;
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
- LDLM_PLAIN, NULL, lock_modes[0], &flags,
+ LDLM_PLAIN, NULL, lock_modes[0], &flags[0],
mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
NULL, 0, NULL, handles[0]);
if (rc != ELDLM_OK)
memcpy(handles[1], handles[0], sizeof(*(handles[1])));
ldlm_lock_addref(handles[1], lock_modes[1]);
} else if (res_id[1]->name[0] != 0) {
- flags = LDLM_FL_LOCAL_ONLY;
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
*res_id[1], LDLM_PLAIN, NULL,
- lock_modes[1], &flags, mds_blocking_ast,
+ lock_modes[1], &flags[1],mds_blocking_ast,
ldlm_completion_ast, NULL, NULL, NULL, 0,
NULL, handles[1]);
if (rc != ELDLM_OK) {
struct dentry **dparentp, int parent_mode,
char *name, int namelen,
struct lustre_handle *child_lockh,
- struct dentry **dchildp, int child_mode)
+ struct dentry **dchildp, int child_mode,
+ int child_lock_flags)
{
struct ldlm_res_id child_res_id = { .name = {0} };
struct ldlm_res_id parent_res_id = { .name = {0} };
/* Step 3: Lock parent and child in resource order. If child doesn't
* exist, we still have to lock the parent and re-lookup. */
rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode,
- &child_res_id, child_lockh, child_mode);
+ &child_res_id, child_lockh, child_mode,
+ child_lock_flags);
if (rc)
GOTO(cleanup, rc);
rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
&parent_lockh, &dparent, LCK_PW,
rec->ur_name, rec->ur_namelen,
- &child_lockh, &dchild, LCK_EX);
+ &child_lockh, &dchild, LCK_EX, 0);
if (rc)
GOTO(cleanup, rc);
tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
- &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
+ &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX, 0);
if (rc)
GOTO(cleanup, rc);
RETURN(rc);
}
+int ptl_send_rpc_nowait(struct ptlrpc_request *request)
+{
+ int rc;
+ struct ptlrpc_connection *connection;
+ unsigned long flags;
+ ENTRY;
+
+ LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
+
+ if (request->rq_import->imp_obd &&
+ request->rq_import->imp_obd->obd_fail) {
+ CDEBUG(D_HA, "muting rpc for failed imp obd %s\n",
+ request->rq_import->imp_obd->obd_name);
+ /* this prevents us from waiting in ptlrpc_queue_wait */
+ request->rq_err = 1;
+ RETURN(-ENODEV);
+ }
+
+ connection = request->rq_import->imp_connection;
+
+ request->rq_reqmsg->handle = request->rq_import->imp_remote_handle;
+ request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST;
+ request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt;
+
+ spin_lock_irqsave (&request->rq_lock, flags);
+ /* If the MD attach succeeds, there _will_ be a reply_in callback */
+ request->rq_receiving_reply = 0;
+ /* Clear any flags that may be present from previous sends. */
+ request->rq_replied = 0;
+ request->rq_err = 0;
+ request->rq_timedout = 0;
+ request->rq_net_err = 0;
+ request->rq_resend = 0;
+ request->rq_restart = 0;
+ spin_unlock_irqrestore (&request->rq_lock, flags);
+
+ ptlrpc_request_addref(request); /* +1 ref for the SENT callback */
+
+ request->rq_sent = CURRENT_SECONDS;
+ ptlrpc_pinger_sending_on_import(request->rq_import);
+ rc = ptl_send_buf(&request->rq_req_md_h,
+ request->rq_reqmsg, request->rq_reqlen,
+ PTL_NOACK_REQ, &request->rq_req_cbid,
+ connection,
+ request->rq_request_portal,
+ request->rq_xid);
+ if (rc == 0) {
+ ptlrpc_lprocfs_rpc_sent(request);
+ } else {
+ ptlrpc_req_finished (request); /* drop callback ref */
+ }
+
+ return rc;
+}
+
+
int ptl_send_rpc(struct ptlrpc_request *request)
{
int rc;
EXPORT_SYMBOL(ptlrpc_error);
EXPORT_SYMBOL(ptlrpc_resend_req);
EXPORT_SYMBOL(ptl_send_rpc);
+EXPORT_SYMBOL(ptl_send_rpc_nowait);
/* client.c */
EXPORT_SYMBOL(ptlrpc_init_client);