From eed3d71c2026f1aeaec7ad46b5d5f3ccd53620a7 Mon Sep 17 00:00:00 2001 From: green Date: Tue, 13 Sep 2005 16:53:20 +0000 Subject: [PATCH] Branch: b1_4 b=7313 r=adilger Allow locks flagged in a certain way (used by liblustre) to be cancelled without waiting for reply from client. This is a prototype version of code. We land it so that it will get into next code drop to Cray. --- lustre/ChangeLog | 11 ++++++++ lustre/include/linux/lustre_dlm.h | 7 +++++ lustre/include/linux/lustre_mds.h | 4 +-- lustre/include/linux/lustre_net.h | 1 + lustre/ldlm/ldlm_lock.c | 3 ++- lustre/ldlm/ldlm_lockd.c | 26 ++++++++++++++---- lustre/ldlm/ldlm_request.c | 10 ++++--- lustre/liblustre/dir.c | 2 +- lustre/liblustre/namei.c | 6 +++-- lustre/llite/dcache.c | 2 +- lustre/llite/dir.c | 3 ++- lustre/llite/file.c | 2 +- lustre/llite/namei.c | 2 +- lustre/mdc/mdc_locks.c | 9 ++++--- lustre/mds/handler.c | 11 +++++--- lustre/mds/mds_internal.h | 6 +++-- lustre/mds/mds_reint.c | 24 ++++++++++------- lustre/ptlrpc/niobuf.c | 56 +++++++++++++++++++++++++++++++++++++++ lustre/ptlrpc/ptlrpc_module.c | 1 + 19 files changed, 148 insertions(+), 38 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 9cc52cb..f1f2dcc 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -76,6 +76,17 @@ Details : the Linux NFS server relies on an MDS_OPEN_OWNEROVERRIDE hack to Lustre now respects this flag to disable mode checks when truncating a file owned by the user +Severity : minor +Frequency : liblustre-only, when liblustre client dies unexpectedly or becomes + busy +Bugzilla : 7313 +Description: Revoking locks from clients that went dead or catatonic might take + a lot of time. +Details : New lock flags FL_CANCEL_ON_BLOCK used by liblustre makes + cancellation of such locks instant on servers without waiting for + any reply from clients. Clients drops these locks when cancel + notification from server is received without replying. + ------------------------------------------------------------------------------ 08-26-2005 Cluster File Systems, Inc. diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 5fb0019..01b72a9 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -101,6 +101,13 @@ typedef enum { /* Flags sent in AST lock_flags to be mapped into the receiving lock. */ #define LDLM_AST_FLAGS (LDLM_FL_DISCARD_DATA) +/* Immediatelly cancel such locks when they block some other locks. Send + cancel notification to original lock holder, but expect no reply. */ +#define LDLM_FL_CANCEL_ON_BLOCK 0x800000 + +/* Flags flags inherited from parent lock when doing intents. */ +#define LDLM_INHERIT_FLAGS (LDLM_FL_CANCEL_ON_BLOCK) + /* The blocking callback is overloaded to perform two functions. These flags * indicate which operation should be performed. */ #define LDLM_CB_BLOCKING 1 diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 66d1b2b..d3aae2c 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -159,7 +159,7 @@ int mdc_intent_lock(struct obd_export *exp, void *lmm, int lmmsize, struct lookup_intent *, int, struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking); + ldlm_blocking_callback cb_blocking, int extra_lock_flags); int mdc_enqueue(struct obd_export *exp, int lock_type, struct lookup_intent *it, @@ -170,7 +170,7 @@ int mdc_enqueue(struct obd_export *exp, int lmmlen, ldlm_completion_callback cb_completion, ldlm_blocking_callback cb_blocking, - void *cb_data); + void *cb_data, int extra_lock_flags); /* mdc/mdc_request.c */ int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp); diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index e0470f4..eaa71ed 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -648,6 +648,7 @@ int ptlrpc_reply(struct ptlrpc_request *req); int ptlrpc_error(struct ptlrpc_request *req); void ptlrpc_resend_req(struct ptlrpc_request *request); int ptl_send_rpc(struct ptlrpc_request *request); +int ptl_send_rpc_nowait(struct ptlrpc_request *request); int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd); /* ptlrpc/client.c */ diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index cda70ee..1975d79 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -835,7 +835,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns, /* Some flags from the enqueue want to make it into the AST, via the * lock's l_flags. */ - lock->l_flags |= (*flags & LDLM_AST_DISCARD_DATA); + lock->l_flags |= (*flags & (LDLM_AST_DISCARD_DATA|LDLM_INHERIT_FLAGS)); /* This distinction between local lock trees is very important; a client * namespace only has information about locks taken by that client, and @@ -941,6 +941,7 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list) } else { rc = 0; } + if (rc == -ERESTART) retval = rc; else if (rc) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 1adbba3..ee26f2e 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -429,6 +429,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, struct ldlm_request *body; struct ptlrpc_request *req; int rc = 0, size = sizeof(*body); + int instant_cancel = 0; ENTRY; if (flag == LDLM_CB_CANCELING) { @@ -461,6 +462,9 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, } #endif + if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) + instant_cancel = 1; + req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse, LDLM_BL_CALLBACK, 1, &size, NULL); if (req == NULL) { @@ -476,14 +480,20 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock, LDLM_DEBUG(lock, "server preparing blocking AST"); req->rq_replen = lustre_msg_size(0, NULL); - - if (lock->l_granted_mode == lock->l_req_mode) + if (instant_cancel) { + ldlm_lock_cancel(lock); +// ldlm_reprocess_all(lock->l_resource); + } else if (lock->l_granted_mode == lock->l_req_mode) ldlm_add_waiting_lock(lock); l_unlock(&lock->l_resource->lr_namespace->ns_lock); req->rq_send_state = LUSTRE_IMP_FULL; req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */ - rc = ptlrpc_queue_wait(req); + if (unlikely(instant_cancel)) { + rc = ptl_send_rpc_nowait(req); + } else { + rc = ptlrpc_queue_wait(req); + } if (rc != 0) rc = ldlm_handle_ast_error(lock, req, rc, "blocking"); @@ -951,6 +961,10 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "client blocking AST callback handler START"); lock->l_flags |= LDLM_FL_CBPENDING; + + if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) + lock->l_flags |= LDLM_FL_CANCEL; + do_ast = (!lock->l_readers && !lock->l_writers); if (do_ast) { @@ -1237,13 +1251,15 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) switch (req->rq_reqmsg->opc) { case LDLM_BL_CALLBACK: CDEBUG(D_INODE, "blocking ast\n"); - ldlm_callback_reply(req, 0); + if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) + ldlm_callback_reply(req, 0); if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock)) ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock); break; case LDLM_CP_CALLBACK: CDEBUG(D_INODE, "completion ast\n"); - ldlm_callback_reply(req, 0); + if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) + ldlm_callback_reply(req, 0); ldlm_handle_cp_callback(req, ns, dlm_req, lock); break; case LDLM_GL_CALLBACK: diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index c43bd4b..8e7cd4e 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -182,6 +182,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); lock->l_flags |= LDLM_FL_LOCAL; + lock->l_flags |= *flags & LDLM_INHERIT_FLAGS; lock->l_lvb_swabber = lvb_swabber; if (policy != NULL) memcpy(&lock->l_policy_data, policy, sizeof(*policy)); @@ -360,6 +361,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, memcpy(&lock->l_remote_handle, &reply->lock_handle, sizeof(lock->l_remote_handle)); *flags = reply->lock_flags; + lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS; CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n", lock, reply->lock_handle.cookie, *flags); @@ -570,7 +572,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) /* Set this flag to prevent others from getting new references*/ l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_flags |= LDLM_FL_CBPENDING; - local_only = (lock->l_flags & LDLM_FL_LOCAL_ONLY); + local_only = (lock->l_flags & + (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); l_unlock(&lock->l_resource->lr_namespace->ns_lock); ldlm_cancel_callback(lock); @@ -608,9 +611,10 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) if (rc == ESTALE) { char str[PTL_NALFMT_SIZE]; CERROR("client/server (nid %s) out of sync" - " -- not fatal\n", + " -- not fatal, flags %d\n", ptlrpc_peernid2str(&req->rq_import-> - imp_connection->c_peer, str)); + imp_connection->c_peer, str), +lock->l_flags); } else if (rc == -ETIMEDOUT) { ptlrpc_req_finished(req); GOTO(restart, rc); diff --git a/lustre/liblustre/dir.c b/lustre/liblustre/dir.c index 4c1917a..358856f 100644 --- a/lustre/liblustre/dir.c +++ b/lustre/liblustre/dir.c @@ -88,7 +88,7 @@ static int llu_dir_do_readpage(struct inode *inode, struct page *page) rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR, &data, &lockh, NULL, 0, ldlm_completion_ast, llu_mdc_blocking_ast, - inode); + inode, LDLM_FL_CANCEL_ON_BLOCK); request = (struct ptlrpc_request *)it.d.lustre.it_data; if (request) ptlrpc_req_finished(request); diff --git a/lustre/liblustre/namei.c b/lustre/liblustre/namei.c index e173996..912851a 100644 --- a/lustre/liblustre/namei.c +++ b/lustre/liblustre/namei.c @@ -265,7 +265,8 @@ int llu_pb_revalidate(struct pnode *pnode, int flags, struct lookup_intent *it) pb->pb_ino, pb->pb_name.name,pb->pb_name.len,0); rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, flags, - &req, llu_mdc_blocking_ast); + &req, llu_mdc_blocking_ast, + LDLM_FL_CANCEL_ON_BLOCK); /* If req is NULL, then mdc_intent_lock only tried to do a lock match; * if all was well, it will return 1 if it found locks, 0 otherwise. */ if (req == NULL && rc >= 0) @@ -431,7 +432,8 @@ static int llu_lookup_it(struct inode *parent, struct pnode *pnode, pnode->p_base->pb_name.len, flags); rc = mdc_intent_lock(llu_i2mdcexp(parent), &op_data, NULL, 0, it, - flags, &req, llu_mdc_blocking_ast); + flags, &req, llu_mdc_blocking_ast, + LDLM_FL_CANCEL_ON_BLOCK); if (rc < 0) GOTO(out, rc); diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 55759ef..7ee9a58 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -303,7 +303,7 @@ int ll_revalidate_it(struct dentry *de, int lookup_flags, de->d_name.name, de->d_name.len, 0); rc = mdc_intent_lock(exp, &op_data, NULL, 0, it, lookup_flags, - &req, ll_mdc_blocking_ast); + &req, ll_mdc_blocking_ast, 0); /* If req is NULL, then mdc_intent_lock only tried to do a lock match; * if all was well, it will return 1 if it found locks, 0 otherwise. */ if (req == NULL && rc >= 0) diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index f9d4924..68196c3 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -224,7 +224,8 @@ static struct page *ll_get_dir_page(struct inode *dir, unsigned long n) rc = mdc_enqueue(ll_i2sbi(dir)->ll_mdc_exp, LDLM_PLAIN, &it, LCK_PR, &data, &lockh, NULL, 0, - ldlm_completion_ast, ll_mdc_blocking_ast, dir); + ldlm_completion_ast, ll_mdc_blocking_ast, dir, + 0); request = (struct ptlrpc_request *)it.d.lustre.it_data; if (request) diff --git a/lustre/llite/file.c b/lustre/llite/file.c index c8b4e7c..10ebaf1 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -154,7 +154,7 @@ static int ll_intent_file_open(struct file *file, void *lmm, rc = mdc_enqueue(sbi->ll_mdc_exp, LDLM_PLAIN, itp, LCK_PW, &data, &lockh, lmm, lmmsize, ldlm_completion_ast, - ll_mdc_blocking_ast, NULL); + ll_mdc_blocking_ast, NULL, 0); if (rc < 0) CERROR("lock enqueue: err: %d\n", rc); RETURN(rc); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 779c1e1..4ec2685 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -408,7 +408,7 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry, it->it_create_mode &= ~current->fs->umask; rc = mdc_intent_lock(ll_i2mdcexp(parent), &op_data, NULL, 0, it, - lookup_flags, &req, ll_mdc_blocking_ast); + lookup_flags, &req, ll_mdc_blocking_ast, 0); if (rc < 0) GOTO(out, retval = ERR_PTR(rc)); diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index ead4b89..202afbf 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -235,14 +235,14 @@ int mdc_enqueue(struct obd_export *exp, int lmmsize, ldlm_completion_callback cb_completion, ldlm_blocking_callback cb_blocking, - void *cb_data) + void *cb_data, int extra_lock_flags) { struct ptlrpc_request *req; struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id = { .name = {data->fid1.id, data->fid1.generation} }; int size[5] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)}; - int rc, flags = LDLM_FL_HAS_INTENT; + int rc, flags = extra_lock_flags | LDLM_FL_HAS_INTENT; int repsize[4] = {sizeof(struct ldlm_reply), sizeof(struct mds_body), obddev->u.cli.cl_max_mds_easize, @@ -472,7 +472,7 @@ EXPORT_SYMBOL(mdc_enqueue); int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, void *lmm, int lmmsize, struct lookup_intent *it, int lookup_flags, struct ptlrpc_request **reqp, - ldlm_blocking_callback cb_blocking) + ldlm_blocking_callback cb_blocking, int extra_lock_flags) { struct lustre_handle lockh; struct ptlrpc_request *request; @@ -526,7 +526,8 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data, rc = mdc_enqueue(exp, LDLM_PLAIN, it, it_to_lock_mode(it), op_data, &lockh, lmm, lmmsize, - ldlm_completion_ast, cb_blocking, NULL); + ldlm_completion_ast, cb_blocking, NULL, + extra_lock_flags); if (rc < 0) RETURN(rc); memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh)); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 0f90dd6..ac61f2f 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -671,7 +671,7 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, return(rc); } -static int mds_getattr_name(int offset, struct ptlrpc_request *req, +static int mds_getattr_name(int offset, struct ptlrpc_request *req, int flags, struct lustre_handle *child_lockh) { struct obd_device *obd = req->rq_export->exp_obd; @@ -753,7 +753,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req, rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, &parent_lockh, &dparent, LCK_PR, name, namesize, - child_lockh, &dchild, LCK_PR); + child_lockh, &dchild, LCK_PR, + flags); if (rc) GOTO(cleanup, rc); } else { @@ -1238,7 +1239,7 @@ int mds_handle(struct ptlrpc_request *req) * want to cancel. */ lockh.cookie = 0; - rc = mds_getattr_name(0, req, &lockh); + rc = mds_getattr_name(0, req, 0, &lockh); /* this non-intent call (from an ioctl) is special */ req->rq_status = rc; if (rc == 0 && lockh.cookie) @@ -1990,7 +1991,9 @@ static int mds_intent_policy(struct ldlm_namespace *ns, case IT_LOOKUP: case IT_READDIR: fixup_handle_for_resent_req(req, lock, &new_lock, &lockh); - rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh); + rep->lock_policy_res2 = mds_getattr_name(offset, req, + flags & LDLM_INHERIT_FLAGS, + &lockh); /* FIXME: LDLM can set req->rq_status. MDS sets policy_res{1,2} with disposition and status. - replay: returns 0 & req->status is old status diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 1f564f3..6fffb8b 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -103,7 +103,8 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2); int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, struct ldlm_res_id *p2_res_id, - struct lustre_handle *p2_lockh, int p2_lock_mode); + struct lustre_handle *p2_lockh, int p2_lock_mode, + int p2_lock_flags); void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error); int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle, struct ptlrpc_request *req, int rc, __u32 op_data); @@ -115,7 +116,8 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct dentry **dparentp, int parent_mode, char *name, int namelen, struct lustre_handle *child_lockh, - struct dentry **dchildp, int child_mode); + struct dentry **dchildp, int child_mode, + int child_lock_flags); int mds_lock_new_child(struct obd_device *obd, struct inode *inode, struct lustre_handle *child_lockh); int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode, diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 215140b..67a162a 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -946,12 +946,14 @@ int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2) int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, struct ldlm_res_id *p2_res_id, - struct lustre_handle *p2_lockh, int p2_lock_mode) + struct lustre_handle *p2_lockh, int p2_lock_mode, + int p2_lock_flags) { struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id }; struct lustre_handle *handles[2] = { p1_lockh, p2_lockh }; int lock_modes[2] = { p1_lock_mode, p2_lock_mode }; - int rc, flags; + int flags[2] = { LDLM_FL_LOCAL_ONLY, LDLM_FL_LOCAL_ONLY | p2_lock_flags }; + int rc; ENTRY; LASSERT(p1_res_id != NULL && p2_res_id != NULL); @@ -966,14 +968,15 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, res_id[0] = p2_res_id; lock_modes[1] = p1_lock_mode; lock_modes[0] = p2_lock_mode; + flags[1] = LDLM_FL_LOCAL_ONLY; + flags[0] = p2_lock_flags | LDLM_FL_LOCAL_ONLY; } CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0]); - flags = LDLM_FL_LOCAL_ONLY; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0], - LDLM_PLAIN, NULL, lock_modes[0], &flags, + LDLM_PLAIN, NULL, lock_modes[0], &flags[0], mds_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, handles[0]); if (rc != ELDLM_OK) @@ -984,10 +987,9 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, memcpy(handles[1], handles[0], sizeof(*(handles[1]))); ldlm_lock_addref(handles[1], lock_modes[1]); } else if (res_id[1]->name[0] != 0) { - flags = LDLM_FL_LOCAL_ONLY; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[1], LDLM_PLAIN, NULL, - lock_modes[1], &flags, mds_blocking_ast, + lock_modes[1], &flags[1],mds_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, handles[1]); if (rc != ELDLM_OK) { @@ -1178,7 +1180,8 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct dentry **dparentp, int parent_mode, char *name, int namelen, struct lustre_handle *child_lockh, - struct dentry **dchildp, int child_mode) + struct dentry **dchildp, int child_mode, + int child_lock_flags) { struct ldlm_res_id child_res_id = { .name = {0} }; struct ldlm_res_id parent_res_id = { .name = {0} }; @@ -1233,7 +1236,8 @@ retry_locks: /* Step 3: Lock parent and child in resource order. If child doesn't * exist, we still have to lock the parent and re-lookup. */ rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode, - &child_res_id, child_lockh, child_mode); + &child_res_id, child_lockh, child_mode, + child_lock_flags); if (rc) GOTO(cleanup, rc); @@ -1386,7 +1390,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, &parent_lockh, &dparent, LCK_PW, rec->ur_name, rec->ur_namelen, - &child_lockh, &dchild, LCK_EX); + &child_lockh, &dchild, LCK_EX, 0); if (rc) GOTO(cleanup, rc); @@ -1635,7 +1639,7 @@ static int mds_reint_link(struct mds_update_record *rec, int offset, tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation; rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX, - &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX); + &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX, 0); if (rc) GOTO(cleanup, rc); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index f24391a..44a6193 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -387,6 +387,62 @@ int ptlrpc_error(struct ptlrpc_request *req) RETURN(rc); } +int ptl_send_rpc_nowait(struct ptlrpc_request *request) +{ + int rc; + struct ptlrpc_connection *connection; + unsigned long flags; + ENTRY; + + LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST); + + if (request->rq_import->imp_obd && + request->rq_import->imp_obd->obd_fail) { + CDEBUG(D_HA, "muting rpc for failed imp obd %s\n", + request->rq_import->imp_obd->obd_name); + /* this prevents us from waiting in ptlrpc_queue_wait */ + request->rq_err = 1; + RETURN(-ENODEV); + } + + connection = request->rq_import->imp_connection; + + request->rq_reqmsg->handle = request->rq_import->imp_remote_handle; + request->rq_reqmsg->type = PTL_RPC_MSG_REQUEST; + request->rq_reqmsg->conn_cnt = request->rq_import->imp_conn_cnt; + + spin_lock_irqsave (&request->rq_lock, flags); + /* If the MD attach succeeds, there _will_ be a reply_in callback */ + request->rq_receiving_reply = 0; + /* Clear any flags that may be present from previous sends. */ + request->rq_replied = 0; + request->rq_err = 0; + request->rq_timedout = 0; + request->rq_net_err = 0; + request->rq_resend = 0; + request->rq_restart = 0; + spin_unlock_irqrestore (&request->rq_lock, flags); + + ptlrpc_request_addref(request); /* +1 ref for the SENT callback */ + + request->rq_sent = CURRENT_SECONDS; + ptlrpc_pinger_sending_on_import(request->rq_import); + rc = ptl_send_buf(&request->rq_req_md_h, + request->rq_reqmsg, request->rq_reqlen, + PTL_NOACK_REQ, &request->rq_req_cbid, + connection, + request->rq_request_portal, + request->rq_xid); + if (rc == 0) { + ptlrpc_lprocfs_rpc_sent(request); + } else { + ptlrpc_req_finished (request); /* drop callback ref */ + } + + return rc; +} + + int ptl_send_rpc(struct ptlrpc_request *request) { int rc; diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 4df0fe7..35d91ee 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -95,6 +95,7 @@ EXPORT_SYMBOL(ptlrpc_reply); EXPORT_SYMBOL(ptlrpc_error); EXPORT_SYMBOL(ptlrpc_resend_req); EXPORT_SYMBOL(ptl_send_rpc); +EXPORT_SYMBOL(ptl_send_rpc_nowait); /* client.c */ EXPORT_SYMBOL(ptlrpc_init_client); -- 1.8.3.1