X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=8aff2f612f83681e92214f2ed921eb28cb4bbcb7;hp=b57f1f72b6333bf495ae4223a78488db0fe533d5;hb=ec295cad998d4d5e7fc915491da5ac646dbc6af6;hpb=11e0902cb38306ccb570ae2aab6348f64bdb9825

diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index b57f1f7..8aff2f6 100644
--- a/lustre/ldlm/ldlm_request.c
+++ b/lustre/ldlm/ldlm_request.c
@@ -3,20 +3,23 @@
  *
  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
  *
- * This file is part of Lustre, http://www.lustre.org.
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
  *
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
*/ #define DEBUG_SUBSYSTEM S_LDLM @@ -25,9 +28,9 @@ #include #endif -#include -#include -#include +#include +#include +#include #include "ldlm_internal.h" @@ -37,7 +40,11 @@ static void interrupted_completion_wait(void *data) struct lock_wait_data { struct ldlm_lock *lwd_lock; - int lwd_generation; + __u32 lwd_conn_cnt; +}; + +struct ldlm_async_args { + struct lustre_handle lock_handle; }; int ldlm_expired_completion_wait(void *data) @@ -47,31 +54,35 @@ int ldlm_expired_completion_wait(void *data) struct obd_import *imp; struct obd_device *obd; + ENTRY; if (lock->l_conn_export == NULL) { - static unsigned long next_dump = 0, last_dump = 0; + static cfs_time_t next_dump = 0, last_dump = 0; if (ptlrpc_check_suspend()) RETURN(0); - LDLM_ERROR(lock, "lock timed out; not entering recovery in " - "server code, just going back to sleep"); - if (time_after(jiffies, next_dump)) { + LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago); " + "not entering recovery in server code, just going " + "back to sleep", lock->l_enqueued_time.tv_sec, + CURRENT_SECONDS - lock->l_enqueued_time.tv_sec); + if (cfs_time_after(cfs_time_current(), next_dump)) { last_dump = next_dump; - next_dump = jiffies + 300 * HZ; + next_dump = cfs_time_shift(300); ldlm_namespace_dump(D_DLMTRACE, lock->l_resource->lr_namespace); if (last_dump == 0) - portals_debug_dumplog(); + libcfs_debug_dumplog(); } RETURN(0); } obd = lock->l_conn_export->exp_obd; imp = obd->u.cli.cl_import; - ptlrpc_fail_import(imp, lwd->lwd_generation); - LDLM_ERROR(lock, "lock timed out, entering recovery for %s@%s", - imp->imp_target_uuid.uuid, - imp->imp_connection->c_remote_uuid.uuid); + ptlrpc_fail_import(imp, lwd->lwd_conn_cnt); + LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago), entering " + "recovery for %s@%s", lock->l_enqueued_time.tv_sec, + CURRENT_SECONDS - lock->l_enqueued_time.tv_sec, + obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid); RETURN(0); } @@ -80,19 +91,20 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data) { /* XXX ALLOCATE - 160 bytes */ struct lock_wait_data lwd; - unsigned long irqflags; struct obd_device *obd; struct obd_import *imp = NULL; struct l_wait_info lwi; int rc = 0; ENTRY; - if (flags == LDLM_FL_WAIT_NOREPROC) + if (flags == LDLM_FL_WAIT_NOREPROC) { + LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock"); goto noreproc; + } if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | LDLM_FL_BLOCK_CONV))) { - wake_up(&lock->l_waitq); + cfs_waitq_signal(&lock->l_waitq); RETURN(0); } @@ -111,19 +123,19 @@ noreproc: lwd.lwd_lock = lock; - if (flags & LDLM_FL_NO_TIMEOUT) { - LDLM_DEBUG(lock, "waiting indefinitely for group lock\n"); + if (lock->l_flags & LDLM_FL_NO_TIMEOUT) { + LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT"); lwi = LWI_INTR(interrupted_completion_wait, &lwd); } else { - lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout), ldlm_expired_completion_wait, interrupted_completion_wait, &lwd); } if (imp != NULL) { - spin_lock_irqsave(&imp->imp_lock, irqflags); - lwd.lwd_generation = imp->imp_generation; - spin_unlock_irqrestore(&imp->imp_lock, irqflags); + spin_lock(&imp->imp_lock); + lwd.lwd_conn_cnt = imp->imp_conn_cnt; + spin_unlock(&imp->imp_lock); } /* Go to sleep until the lock is granted or cancelled. 
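
For context, the pieces set up above are consumed by the standard Lustre l_wait_event() pattern. A minimal sketch follows, assuming the usual wait on lock->l_waitq; the helper names completion_wait_sketch() and is_granted_or_cancelled() are illustrative, and the real wait condition sits in unchanged context this hunk does not show.

/* Hedged sketch, not part of the patch: how lwd/lwi are typically used. */
static int completion_wait_sketch(struct ldlm_lock *lock,
                                  struct obd_import *imp)
{
        struct lock_wait_data lwd = { .lwd_lock = lock };
        struct l_wait_info lwi;
        int rc;

        /* Interruptible wait bounded by obd_timeout; on expiry
         * ldlm_expired_completion_wait() runs and may fail the import. */
        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
                               ldlm_expired_completion_wait,
                               interrupted_completion_wait, &lwd);

        if (imp != NULL) {
                /* Snapshot the connection count so a later timeout only
                 * fails the import if no reconnect happened in between. */
                spin_lock(&imp->imp_lock);
                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
                spin_unlock(&imp->imp_lock);
        }

        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
        return rc;
}
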
*/ @@ -146,54 +158,123 @@ noreproc: RETURN(0); } -static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, - struct ldlm_res_id res_id, - __u32 type, - ldlm_policy_data_t *policy, - ldlm_mode_t mode, - int *flags, - ldlm_blocking_callback blocking, - ldlm_completion_callback completion, - ldlm_glimpse_callback glimpse, - void *data, __u32 lvb_len, - void *lvb_swabber, - struct lustre_handle *lockh) +/* + * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs. + */ +int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) +{ + int do_ast; + ENTRY; + + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. */ + RETURN(0); + } + + lock_res_and_lock(lock); + /* Get this: if ldlm_blocking_ast is racing with intent_policy, such + * that ldlm_blocking_ast is called just before intent_policy method + * takes the ns_lock, then by the time we get the lock, we might not + * be the correct blocking function anymore. So check, and return + * early, if so. */ + if (lock->l_blocking_ast != ldlm_blocking_ast) { + unlock_res_and_lock(lock); + RETURN(0); + } + + lock->l_flags |= LDLM_FL_CBPENDING; + do_ast = (!lock->l_readers && !lock->l_writers); + unlock_res_and_lock(lock); + + if (do_ast) { + struct lustre_handle lockh; + int rc; + + LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) + CERROR("ldlm_cli_cancel: %d\n", rc); + } else { + LDLM_DEBUG(lock, "Lock still has references, will be " + "cancelled later"); + } + RETURN(0); +} + +/* + * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See + * comment in filter_intent_policy() on why you may need this. + */ +int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp) +{ + /* + * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for + * that is rather subtle: with OST-side locking, it may so happen that + * _all_ extent locks are held by the OST. If client wants to obtain + * current file size it calls ll{,u}_glimpse_size(), and (as locks are + * on the server), dummy glimpse callback fires and does + * nothing. Client still receives correct file size due to the + * following fragment in filter_intent_policy(): + * + * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB + * if (rc != 0 && res->lr_namespace->ns_lvbo && + * res->lr_namespace->ns_lvbo->lvbo_update) { + * res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1); + * } + * + * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and + * returns correct file size to the client. 
+ */ + return -ELDLM_NO_LOCK_DATA; +} + +int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, + const struct ldlm_res_id *res_id, + ldlm_type_t type, ldlm_policy_data_t *policy, + ldlm_mode_t mode, int *flags, + ldlm_blocking_callback blocking, + ldlm_completion_callback completion, + ldlm_glimpse_callback glimpse, + void *data, __u32 lvb_len, void *lvb_swabber, + struct lustre_handle *lockh) { struct ldlm_lock *lock; int err; ENTRY; - if (ns->ns_client) { + LASSERT(!(*flags & LDLM_FL_REPLAY)); + if (unlikely(ns_is_client(ns))) { CERROR("Trying to enqueue local lock in a shadow namespace\n"); LBUG(); } - lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, + lock = ldlm_lock_create(ns, res_id, type, mode, blocking, completion, glimpse, data, lvb_len); - if (!lock) + if (unlikely(!lock)) GOTO(out_nolock, err = -ENOMEM); LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); + lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_LOCAL; if (*flags & LDLM_FL_ATOMIC_CB) lock->l_flags |= LDLM_FL_ATOMIC_CB; lock->l_lvb_swabber = lvb_swabber; + unlock_res_and_lock(lock); if (policy != NULL) - memcpy(&lock->l_policy_data, policy, sizeof(*policy)); + lock->l_policy_data = *policy; if (type == LDLM_EXTENT) - memcpy(&lock->l_req_extent, &policy->l_extent, - sizeof(policy->l_extent)); + lock->l_req_extent = policy->l_extent; err = ldlm_lock_enqueue(ns, &lock, policy, flags); - if (err != ELDLM_OK) + if (unlikely(err != ELDLM_OK)) GOTO(out, err); if (policy != NULL) - memcpy(policy, &lock->l_policy_data, sizeof(*policy)); - if ((*flags) & LDLM_FL_LOCK_CHANGED) - memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id)); + *policy = lock->l_policy_data; LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", lock); @@ -228,99 +309,24 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, } } -int ldlm_cli_enqueue(struct obd_export *exp, - struct ptlrpc_request *req, - struct ldlm_namespace *ns, - struct ldlm_res_id res_id, - __u32 type, - ldlm_policy_data_t *policy, - ldlm_mode_t mode, - int *flags, - ldlm_blocking_callback blocking, - ldlm_completion_callback completion, - ldlm_glimpse_callback glimpse, - void *data, - void *lvb, - __u32 lvb_len, - void *lvb_swabber, - struct lustre_handle *lockh) +int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, + ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode, + int *flags, void *lvb, __u32 lvb_len, + void *lvb_swabber, struct lustre_handle *lockh,int rc) { + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + int is_replay = *flags & LDLM_FL_REPLAY; struct ldlm_lock *lock; - struct ldlm_request *body; struct ldlm_reply *reply; - int rc, size[3] = {0, sizeof(*body), lvb_len}, req_passed_in = 1; - int is_replay = *flags & LDLM_FL_REPLAY; - int cleanup_phase = 0; + int cleanup_phase = 1; ENTRY; - if (exp == NULL) { - LASSERT(!is_replay); - rc = ldlm_cli_enqueue_local(ns, res_id, type, policy, mode, - flags, blocking, completion, - glimpse, data, lvb_len, lvb_swabber, - lockh); - RETURN(rc); - } - - /* If we're replaying this lock, just check some invariants. - * If we're creating a new lock, get everything all setup nice. 
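
The local enqueue path above is how server-side OBDs take DLM locks in their own namespace, wiring in the generic ldlm_blocking_ast()/ldlm_completion_ast()/ldlm_glimpse_ast() handlers introduced by this patch. A minimal sketch of such a caller follows; only the ldlm_cli_enqueue_local() signature and the callback names come from the code above, the helper name and resource id are made up.

/* Hedged sketch of a server-side caller; object_id is illustrative. */
static int take_local_pw_lock(struct ldlm_namespace *ns, __u64 object_id,
                              struct lustre_handle *lockh)
{
        struct ldlm_res_id res_id = { .name = { object_id } };
        int flags = 0;

        /* Plain lock, no policy data, no LVB; the blocking, completion and
         * glimpse callbacks are the stock handlers defined in this file. */
        return ldlm_cli_enqueue_local(ns, &res_id, LDLM_PLAIN, NULL, LCK_PW,
                                      &flags, ldlm_blocking_ast,
                                      ldlm_completion_ast, ldlm_glimpse_ast,
                                      NULL, 0, NULL, lockh);
}
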
*/ - if (is_replay) { - lock = ldlm_handle2lock(lockh); - LDLM_DEBUG(lock, "client-side enqueue START"); - LASSERT(exp == lock->l_conn_export); - } else { - lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, - completion, glimpse, data, lvb_len); - if (lock == NULL) - RETURN(-ENOMEM); - /* for the local lock, add the reference */ - ldlm_lock_addref_internal(lock, mode); - ldlm_lock2handle(lock, lockh); - lock->l_lvb_swabber = lvb_swabber; - if (policy != NULL) - memcpy(&lock->l_policy_data, policy, sizeof(*policy)); - if (type == LDLM_EXTENT) - memcpy(&lock->l_req_extent, &policy->l_extent, - sizeof(policy->l_extent)); - LDLM_DEBUG(lock, "client-side enqueue START"); - } - - /* lock not sent to server yet */ - cleanup_phase = 2; - - if (req == NULL) { - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, - LDLM_ENQUEUE, 2, size, NULL); - if (req == NULL) - GOTO(cleanup, rc = -ENOMEM); - req_passed_in = 0; - } - - LASSERTF(req->rq_reqmsg->buflens[MDS_REQ_INTENT_LOCKREQ_OFF] == - sizeof(*body), "buflen[%d] = %d, not %d\n", - MDS_REQ_INTENT_LOCKREQ_OFF, - req->rq_reqmsg->buflens[MDS_REQ_INTENT_LOCKREQ_OFF], - sizeof(*body)); - - /* Dump lock data into the request buffer */ - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_INTENT_LOCKREQ_OFF, - sizeof (*body)); - ldlm_lock2desc(lock, &body->lock_desc); - body->lock_flags = *flags; - - memcpy(&body->lock_handle1, lockh, sizeof(*lockh)); - - /* Continue as normal. */ - if (!req_passed_in) { - size[0] = sizeof(*reply); - req->rq_replen = lustre_msg_size(1 + (lvb_len > 0), size); + lock = ldlm_handle2lock(lockh); + /* ldlm_cli_enqueue is holding a reference on this lock. */ + if (!lock) { + LASSERT(type == LDLM_FLOCK); + RETURN(-ENOLCK); } - lock->l_conn_export = exp; - lock->l_export = NULL; - lock->l_blocking_ast = blocking; - - LDLM_DEBUG(lock, "sending request"); - rc = ptlrpc_queue_wait(req); if (rc != ELDLM_OK) { LASSERT(!is_replay); @@ -328,7 +334,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, rc == ELDLM_LOCK_ABORTED ? 
"ABORTED" : "FAILED"); if (rc == ELDLM_LOCK_ABORTED) { /* Before we return, swab the reply */ - reply = lustre_swab_repbuf(req, 0, sizeof(*reply), + reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, + sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR("Can't unpack ldlm_reply\n"); @@ -336,7 +343,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, } if (lvb_len) { void *tmplvb; - tmplvb = lustre_swab_repbuf(req, 1, lvb_len, + tmplvb = lustre_swab_repbuf(req, + DLM_REPLY_REC_OFF, + lvb_len, lvb_swabber); if (tmplvb == NULL) GOTO(cleanup, rc = -EPROTO); @@ -347,7 +356,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, GOTO(cleanup, rc); } - reply = lustre_swab_repbuf(req, 0, sizeof(*reply), + reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR("Can't unpack ldlm_reply\n"); @@ -355,11 +364,16 @@ int ldlm_cli_enqueue(struct obd_export *exp, } /* lock enqueued on the server */ - cleanup_phase = 1; + cleanup_phase = 0; - memcpy(&lock->l_remote_handle, &reply->lock_handle, - sizeof(lock->l_remote_handle)); + lock_res_and_lock(lock); + lock->l_remote_handle = reply->lock_handle; *flags = reply->lock_flags; + lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS; + /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match() + * to wait with no timeout as well */ + lock->l_flags |= reply->lock_flags & LDLM_FL_NO_TIMEOUT; + unlock_res_and_lock(lock); CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n", lock, reply->lock_handle.cookie, *flags); @@ -376,32 +390,41 @@ int ldlm_cli_enqueue(struct obd_export *exp, lock->l_req_mode = newmode; } - if (reply->lock_desc.l_resource.lr_name.name[0] != - lock->l_resource->lr_name.name[0] || - reply->lock_desc.l_resource.lr_name.name[1] != - lock->l_resource->lr_name.name[1]) { - CDEBUG(D_INFO, "remote intent success, locking %ld " - "instead of %ld\n", + if (memcmp(reply->lock_desc.l_resource.lr_name.name, + lock->l_resource->lr_name.name, + sizeof(struct ldlm_res_id))) { + CDEBUG(D_INFO, "remote intent success, locking " + "(%ld,%ld,%ld) instead of " + "(%ld,%ld,%ld)\n", (long)reply->lock_desc.l_resource.lr_name.name[0], - (long)lock->l_resource->lr_name.name[0]); + (long)reply->lock_desc.l_resource.lr_name.name[1], + (long)reply->lock_desc.l_resource.lr_name.name[2], + (long)lock->l_resource->lr_name.name[0], + (long)lock->l_resource->lr_name.name[1], + (long)lock->l_resource->lr_name.name[2]); ldlm_lock_change_resource(ns, lock, - reply->lock_desc.l_resource.lr_name); + &reply->lock_desc.l_resource.lr_name); if (lock->l_resource == NULL) { LBUG(); GOTO(cleanup, rc = -ENOMEM); } LDLM_DEBUG(lock, "client-side enqueue, new resource"); } - if (policy != NULL) - memcpy(&lock->l_policy_data, - &reply->lock_desc.l_policy_data, - sizeof(reply->lock_desc.l_policy_data)); + if (with_policy) + if (!(type == LDLM_IBITS && !(exp->exp_connect_flags & + OBD_CONNECT_IBITS))) + lock->l_policy_data = + reply->lock_desc.l_policy_data; if (type != LDLM_PLAIN) LDLM_DEBUG(lock,"client-side enqueue, new policy data"); } - if ((*flags) & LDLM_FL_AST_SENT) { + if ((*flags) & LDLM_FL_AST_SENT || + /* Cancel extent locks as soon as possible on a liblustre client, + * because it cannot handle asynchronous ASTs robustly (see + * bug 7311). */ + (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) { lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; unlock_res_and_lock(lock); @@ -412,11 +435,10 @@ int ldlm_cli_enqueue(struct obd_export *exp, * clobber the LVB with an older one. 
*/ if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) { void *tmplvb; - tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber); - if (tmplvb == NULL) { - cleanup_phase = 2; + tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len, + lvb_swabber); + if (tmplvb == NULL) GOTO(cleanup, rc = -EPROTO); - } memcpy(lock->l_lvb_data, tmplvb, lvb_len); } @@ -426,8 +448,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, int err = lock->l_completion_ast(lock, *flags, NULL); if (!rc) rc = err; - if (rc) - cleanup_phase = 2; + if (rc && type != LDLM_FLOCK) /* bug 9425, bug 10250 */ + cleanup_phase = 1; } } @@ -440,26 +462,231 @@ int ldlm_cli_enqueue(struct obd_export *exp, LDLM_DEBUG(lock, "client-side enqueue END"); EXIT; cleanup: - switch (cleanup_phase) { - case 2: - if (rc) - failed_lock_cleanup(ns, lock, lockh, mode); - case 1: - if (!req_passed_in && req != NULL) - ptlrpc_req_finished(req); - } - + if (cleanup_phase == 1 && rc) + failed_lock_cleanup(ns, lock, lockh, mode); + /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */ + LDLM_LOCK_PUT(lock); LDLM_LOCK_PUT(lock); return rc; } +/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into + * a single page on the send/receive side. XXX: 512 should be changed + * to more adequate value. */ +static inline int ldlm_req_handles_avail(struct obd_export *exp, + int *size, int bufcount, int off) +{ + int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); + int old_size = size[DLM_LOCKREQ_OFF]; + + size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); + avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, + bufcount, size); + avail /= sizeof(struct lustre_handle); + avail += LDLM_LOCKREQ_HANDLES - off; + size[DLM_LOCKREQ_OFF] = old_size; + + return avail; +} + +static inline int ldlm_cancel_handles_avail(struct obd_export *exp) +{ + int size[2] = { sizeof(struct ptlrpc_body), + sizeof(struct ldlm_request) }; + return ldlm_req_handles_avail(exp, size, 2, 0); +} + +/* Cancel lru locks and pack them into the enqueue request. Pack there the given + * @count locks in @cancels. */ +struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, + int bufcount, int *size, + struct list_head *cancels, + int count) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_request *dlm = NULL; + struct ptlrpc_request *req; + CFS_LIST_HEAD(head); + ENTRY; + + if (cancels == NULL) + cancels = &head; + if (exp_connect_cancelset(exp)) { + /* Estimate the amount of available space in the request. */ + int avail = ldlm_req_handles_avail(exp, size, bufcount, + LDLM_ENQUEUE_CANCEL_OFF); + LASSERT(avail >= count); + + /* Cancel lru locks here _only_ if the server supports + * EARLY_CANCEL. Otherwise we have to send extra CANCEL + * rpc right on enqueue, what will make it slower, vs. + * asynchronous rpc in blocking thread. */ + count += ldlm_cancel_lru_local(ns, cancels, + exp_connect_lru_resize(exp) ? 0 : 1, + avail - count, LDLM_CANCEL_AGED); + size[DLM_LOCKREQ_OFF] = + ldlm_request_bufsize(count, LDLM_ENQUEUE); + } + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, + LDLM_ENQUEUE, bufcount, size, NULL); + if (exp_connect_cancelset(exp) && req) { + dlm = lustre_msg_buf(req->rq_reqmsg, + DLM_LOCKREQ_OFF, sizeof(*dlm)); + /* Skip first lock handler in ldlm_request_pack(), this method + * will incrment @lock_count according to the lock handle amount + * actually written to the buffer. 
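
To make the sizing logic in ldlm_req_handles_avail() concrete, here is a worked example. The shape of the computation comes from the code above, but every number below is an assumption (4 KiB pages, an 8-byte lock handle, a few hundred bytes of message header), not the real structure sizes.

/*
 * Illustrative arithmetic only:
 *
 *   avail  = min(LDLM_MAXREQSIZE, PAGE_SIZE - 512)    e.g. 4096 - 512 = 3584
 *   avail -= lustre_msg_size(magic, bufcount, size)   e.g. 3584 - 584 = 3000
 *   avail /= sizeof(struct lustre_handle)             e.g. 3000 / 8   =  375
 *   avail += LDLM_LOCKREQ_HANDLES - off               e.g.  375 + 2 - 1 = 376
 *
 * so on the order of a few hundred cancel handles can piggy-back on one
 * enqueue RPC without the request spilling past a single page.
 */
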
*/ + dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF; + ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0); + } else { + ldlm_lock_list_put(cancels, l_bl_ast, count); + } + RETURN(req); +} + +/* If a request has some specific initialisation it is passed in @reqp, + * otherwise it is created in ldlm_cli_enqueue. + * + * Supports sync and async requests, pass @async flag accordingly. If a + * request was created in ldlm_cli_enqueue and it is the async request, + * pass it to the caller in @reqp. */ +int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, + struct ldlm_enqueue_info *einfo, + const struct ldlm_res_id *res_id, + ldlm_policy_data_t *policy, int *flags, + void *lvb, __u32 lvb_len, void *lvb_swabber, + struct lustre_handle *lockh, int async) +{ + struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; + struct ldlm_lock *lock; + struct ldlm_request *body; + struct ldlm_reply *reply; + int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_LOCKREQ_OFF] = sizeof(*body), + [DLM_REPLY_REC_OFF] = lvb_len }; + int is_replay = *flags & LDLM_FL_REPLAY; + int req_passed_in = 1, rc, err; + struct ptlrpc_request *req; + ENTRY; + + LASSERT(exp != NULL); + + /* If we're replaying this lock, just check some invariants. + * If we're creating a new lock, get everything all setup nice. */ + if (is_replay) { + lock = ldlm_handle2lock(lockh); + LASSERT(lock != NULL); + LDLM_DEBUG(lock, "client-side enqueue START"); + LASSERT(exp == lock->l_conn_export); + } else { + lock = ldlm_lock_create(ns, res_id, einfo->ei_type, + einfo->ei_mode, einfo->ei_cb_bl, + einfo->ei_cb_cp, einfo->ei_cb_gl, + einfo->ei_cbdata, lvb_len); + if (lock == NULL) + RETURN(-ENOMEM); + /* for the local lock, add the reference */ + ldlm_lock_addref_internal(lock, einfo->ei_mode); + ldlm_lock2handle(lock, lockh); + lock->l_lvb_swabber = lvb_swabber; + if (policy != NULL) { + /* INODEBITS_INTEROP: If the server does not support + * inodebits, we will request a plain lock in the + * descriptor (ldlm_lock2desc() below) but use an + * inodebits lock internally with both bits set. + */ + if (einfo->ei_type == LDLM_IBITS && + !(exp->exp_connect_flags & OBD_CONNECT_IBITS)) + lock->l_policy_data.l_inodebits.bits = + MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE; + else + lock->l_policy_data = *policy; + } + + if (einfo->ei_type == LDLM_EXTENT) + lock->l_req_extent = policy->l_extent; + LDLM_DEBUG(lock, "client-side enqueue START"); + } + + /* lock not sent to server yet */ + + if (reqp == NULL || *reqp == NULL) { + req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0); + if (req == NULL) { + failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode); + LDLM_LOCK_PUT(lock); + RETURN(-ENOMEM); + } + req_passed_in = 0; + if (reqp) + *reqp = req; + } else { + req = *reqp; + LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >= + sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n", + DLM_LOCKREQ_OFF, + lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF), + sizeof(*body)); + } + + lock->l_conn_export = exp; + lock->l_export = NULL; + lock->l_blocking_ast = einfo->ei_cb_bl; + + /* Dump lock data into the request buffer */ + body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); + ldlm_lock2desc(lock, &body->lock_desc); + body->lock_flags = *flags; + body->lock_handle[0] = *lockh; + + /* Continue as normal. 
*/ + if (!req_passed_in) { + size[DLM_LOCKREPLY_OFF] = sizeof(*reply); + ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size); + } + + /* + * Liblustre client doesn't get extent locks, except for O_APPEND case + * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where + * [i_size, OBD_OBJECT_EOF] lock is taken. + */ + LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT || + policy->l_extent.end == OBD_OBJECT_EOF)); + + if (async) { + LASSERT(reqp != NULL); + RETURN(0); + } + + LDLM_DEBUG(lock, "sending request"); + rc = ptlrpc_queue_wait(req); + err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0, + einfo->ei_mode, flags, lvb, lvb_len, + lvb_swabber, lockh, rc); + + /* If ldlm_cli_enqueue_fini did not find the lock, we need to free + * one reference that we took */ + if (err == -ENOLCK) + LDLM_LOCK_PUT(lock); + else + rc = err; + + if (!req_passed_in && req != NULL) { + ptlrpc_req_finished(req); + if (reqp) + *reqp = NULL; + } + + RETURN(rc); +} + static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, int *flags) { struct ldlm_resource *res; int rc; ENTRY; - if (lock->l_resource->lr_namespace->ns_client) { + if (ns_is_client(lock->l_resource->lr_namespace)) { CERROR("Trying to cancel local lock\n"); LBUG(); } @@ -487,8 +714,10 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) struct ldlm_reply *reply; struct ldlm_lock *lock; struct ldlm_resource *res; - struct ptlrpc_request *req = NULL; - int rc, size = sizeof(*body); + struct ptlrpc_request *req; + int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_LOCKREQ_OFF] = sizeof(*body) }; + int rc; ENTRY; lock = ldlm_handle2lock(lockh); @@ -504,25 +733,24 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) LDLM_DEBUG(lock, "client-side convert"); req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export), - LUSTRE_DLM_VERSION, LDLM_CONVERT, 1, &size, NULL); + LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); - body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); + body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); + body->lock_handle[0] = lock->l_remote_handle; body->lock_desc.l_req_mode = new_mode; body->lock_flags = *flags; - size = sizeof(*reply); - req->rq_replen = lustre_msg_size(1, &size); + size[DLM_LOCKREPLY_OFF] = sizeof(*reply); + ptlrpc_req_set_repsize(req, 2, size); rc = ptlrpc_queue_wait(req); if (rc != ELDLM_OK) GOTO(out, rc); - reply = lustre_swab_repbuf(req, 0, sizeof (*reply), + reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR ("Can't unpack ldlm_reply\n"); @@ -532,8 +760,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) if (req->rq_status) GOTO(out, rc = req->rq_status); - res = ldlm_lock_convert(lock, new_mode, - (int *)&reply->lock_flags); + res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); if (res != NULL) { ldlm_reprocess_all(res); /* Go to sleep until the lock is granted. */ @@ -554,225 +781,592 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) return rc; } -int ldlm_cli_cancel(struct lustre_handle *lockh) +/* Cancel locks locally. + * Returns: + * LDLM_FL_LOCAL_ONLY if tere is no need in a CANCEL rpc to the server; + * LDLM_FL_CANCELING otherwise; + * LDLM_FL_BL_AST if there is a need in a separate CANCEL rpc. 
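
Callers of the reworked ldlm_cli_enqueue() now hand their callbacks over in a struct ldlm_enqueue_info rather than as separate arguments. A minimal synchronous sketch follows; the ei_* field names and MDS_INODELOCK_LOOKUP are taken from the code above, while the helper name, the chosen mode and the assumption that the struct is declared in the DLM headers are illustrative.

/* Hedged sketch of a synchronous caller of the new enqueue entry point. */
static int enqueue_ibits_lock_sketch(struct obd_export *exp,
                                     const struct ldlm_res_id *res_id,
                                     struct lustre_handle *lockh)
{
        struct ldlm_enqueue_info einfo = {
                .ei_type   = LDLM_IBITS,
                .ei_mode   = LCK_CR,
                .ei_cb_bl  = ldlm_blocking_ast,
                .ei_cb_cp  = ldlm_completion_ast,
                .ei_cb_gl  = NULL,
                .ei_cbdata = NULL,
        };
        ldlm_policy_data_t policy = {
                .l_inodebits = { .bits = MDS_INODELOCK_LOOKUP },
        };
        int flags = 0;

        /* reqp == NULL lets ldlm_cli_enqueue() build the RPC itself (via
         * ldlm_prep_enqueue_req above); async == 0 waits for the reply. */
        return ldlm_cli_enqueue(exp, NULL, &einfo, res_id, &policy, &flags,
                                NULL, 0, NULL, lockh, 0);
}
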
*/ +static int ldlm_cli_cancel_local(struct ldlm_lock *lock) { - struct ptlrpc_request *req; - struct ldlm_lock *lock; - struct ldlm_request *body; - int rc = 0, size = sizeof(*body); + int rc = LDLM_FL_LOCAL_ONLY; ENTRY; - - /* concurrent cancels on the same handle can happen */ - lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); - if (lock == NULL) - RETURN(0); - + if (lock->l_conn_export) { int local_only; - struct obd_import *imp; LDLM_DEBUG(lock, "client-side cancel"); /* Set this flag to prevent others from getting new references*/ lock_res_and_lock(lock); lock->l_flags |= LDLM_FL_CBPENDING; - local_only = lock->l_flags & LDLM_FL_LOCAL_ONLY; + local_only = (lock->l_flags & + (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); ldlm_cancel_callback(lock); + rc = (lock->l_flags & LDLM_FL_BL_AST) ? + LDLM_FL_BL_AST : LDLM_FL_CANCELING; unlock_res_and_lock(lock); if (local_only) { - CDEBUG(D_INFO, "not sending request (at caller's " + CDEBUG(D_DLMTRACE, "not sending request (at caller's " "instruction)\n"); - goto local_cancel; + rc = LDLM_FL_LOCAL_ONLY; + } + ldlm_lock_cancel(lock); + } else { + if (ns_is_client(lock->l_resource->lr_namespace)) { + LDLM_ERROR(lock, "Trying to cancel local lock"); + LBUG(); } + LDLM_DEBUG(lock, "server-side local cancel"); + ldlm_lock_cancel(lock); + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "server-side local cancel handler END"); + } + + RETURN(rc); +} + +/* Pack @count locks in @head into ldlm_request buffer at the offset @off, + of the request @req. */ +static void ldlm_cancel_pack(struct ptlrpc_request *req, int off, + struct list_head *head, int count) +{ + struct ldlm_request *dlm; + struct ldlm_lock *lock; + int max, packed = 0; + ENTRY; + + dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm)); + LASSERT(dlm != NULL); + + /* Check the room in the request buffer. */ + max = lustre_msg_buflen(req->rq_reqmsg, off) - + sizeof(struct ldlm_request); + max /= sizeof(struct lustre_handle); + max += LDLM_LOCKREQ_HANDLES; + LASSERT(max >= dlm->lock_count + count); + + /* XXX: it would be better to pack lock handles grouped by resource. + * so that the server cancel would call filter_lvbo_update() less + * frequently. */ + list_for_each_entry(lock, head, l_bl_ast) { + if (!count--) + break; + LASSERT(lock->l_conn_export); + /* Pack the lock handle to the given request buffer. */ + LDLM_DEBUG(lock, "packing"); + dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle; + packed++; + } + CDEBUG(D_DLMTRACE, "%d locks packed\n", packed); + EXIT; +} + +/* Prepare and send a batched cancel rpc, it will include count lock handles + * of locks given in @head. 
*/ +int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, + int count, int flags) +{ + struct ptlrpc_request *req = NULL; + struct ldlm_request *body; + int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_LOCKREQ_OFF] = sizeof(*body) }; + struct obd_import *imp; + int free, sent = 0; + int rc = 0; + ENTRY; + + LASSERT(exp != NULL); + LASSERT(count > 0); - restart: - imp = class_exp2cliimp(lock->l_conn_export); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE)) + RETURN(count); + + free = ldlm_req_handles_avail(exp, size, 2, 0); + if (count > free) + count = free; + + size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL); + while (1) { + imp = class_exp2cliimp(exp); if (imp == NULL || imp->imp_invalid) { - CDEBUG(D_HA, "skipping cancel on invalid import %p\n", - imp); - goto local_cancel; + CDEBUG(D_DLMTRACE, + "skipping cancel on invalid import %p\n", imp); + RETURN(count); } - req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, - 1, &size, NULL); + req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2, + size, NULL); if (!req) GOTO(out, rc = -ENOMEM); + req->rq_no_resend = 1; + req->rq_no_delay = 1; /* XXX FIXME bug 249 */ req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; - body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body)); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); - - req->rq_replen = lustre_msg_size(0, NULL); - - rc = ptlrpc_queue_wait(req); + body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, + sizeof(*body)); + ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count); + ptlrpc_req_set_repsize(req, 1, NULL); + if (flags & LDLM_FL_ASYNC) { + ptlrpcd_add_req(req); + sent = count; + GOTO(out, 0); + } else { + rc = ptlrpc_queue_wait(req); + } if (rc == ESTALE) { - char str[PTL_NALFMT_SIZE]; - CERROR("client/server (nid %s) out of sync" - " -- not fatal\n", - ptlrpc_peernid2str(&req->rq_import-> - imp_connection->c_peer, str)); - } else if (rc == -ETIMEDOUT) { + CDEBUG(D_DLMTRACE, "client/server (nid %s) " + "out of sync -- not fatal\n", + libcfs_nid2str(req->rq_import-> + imp_connection->c_peer.nid)); + rc = 0; + } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/ + req->rq_import_generation == imp->imp_generation) { ptlrpc_req_finished(req); - GOTO(restart, rc); + continue; } else if (rc != ELDLM_OK) { CERROR("Got rc %d from cancel RPC: canceling " "anyway\n", rc); + break; } - - ptlrpc_req_finished(req); - local_cancel: - ldlm_lock_cancel(lock); - } else { - if (lock->l_resource->lr_namespace->ns_client) { - LDLM_ERROR(lock, "Trying to cancel local lock\n"); - LBUG(); - } - LDLM_DEBUG(lock, "client-side local cancel"); - ldlm_lock_cancel(lock); - ldlm_reprocess_all(lock->l_resource); - LDLM_DEBUG(lock, "client-side local cancel handler END"); + sent = count; + break; } + ptlrpc_req_finished(req); EXIT; - out: - LDLM_LOCK_PUT(lock); - return rc; +out: + return sent ? sent : rc; } -/* when called with LDLM_ASYNC the blocking callback will be handled - * in a thread and this function will return after the thread has been - * asked to call the callback. when called with LDLM_SYNC the blocking - * callback will be performed in this function. 
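
As a usage illustration, the typical producer of such a batch is the LRU: unused locks are cancelled locally, collected on a list through their l_bl_ast entry, and then shipped together. The sketch below mirrors what the new ldlm_cancel_lru() further down does; the helper name is illustrative.

/* Hedged sketch: drain up to @max aged, unused locks from the LRU and send
 * their handles to the server in batched CANCEL RPCs. */
static int cancel_lru_batch_sketch(struct ldlm_namespace *ns, int max)
{
        CFS_LIST_HEAD(cancels);
        int count;

        /* Locks picked here are cancelled locally and end up on @cancels. */
        count = ldlm_cancel_lru_local(ns, &cancels, 0, max, LDLM_CANCEL_AGED);

        /* req == NULL: build stand-alone CANCEL RPCs; the helper also drops
         * the list references taken above. */
        return ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
}
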
*/ -int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) +static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp) { - struct ldlm_lock *lock, *next; - int count, rc = 0; - LIST_HEAD(cblist); + LASSERT(imp != NULL); + return &imp->imp_obd->obd_namespace->ns_pool; +} + +int ldlm_cli_update_pool(struct ptlrpc_request *req) +{ + struct ldlm_pool *pl; ENTRY; + + if (!imp_connect_lru_resize(req->rq_import)) + RETURN(0); -#ifndef __KERNEL__ - sync = LDLM_SYNC; /* force to be sync in user space */ + if (lustre_msg_get_slv(req->rq_repmsg) == 0 || + lustre_msg_get_limit(req->rq_repmsg) == 0) + RETURN(0); + + pl = ldlm_imp2pl(req->rq_import); + + spin_lock(&pl->pl_lock); +#ifdef __KERNEL__ + { + __u64 old_slv, fast_slv_change; + + old_slv = ldlm_pool_get_slv(pl); + fast_slv_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE; + do_div(fast_slv_change, 100); +#endif + pl->pl_update_time = cfs_time_current(); + ldlm_pool_set_slv(pl, lustre_msg_get_slv(req->rq_repmsg)); + ldlm_pool_set_limit(pl, lustre_msg_get_limit(req->rq_repmsg)); +#ifdef __KERNEL__ + /* Wake up pools thread only if SLV has changed more than + * 5% since last update. In this case we want to react asap. + * Otherwise it is no sense to wake up pools as they are + * re-calculated every 1s anyways. */ + if (old_slv > ldlm_pool_get_slv(pl) && + old_slv - ldlm_pool_get_slv(pl) > fast_slv_change) + ldlm_pools_wakeup(); + } #endif + spin_unlock(&pl->pl_lock); - spin_lock(&ns->ns_unused_lock); - count = ns->ns_nr_unused - ns->ns_max_unused; + RETURN(0); +} +EXPORT_SYMBOL(ldlm_cli_update_pool); - if (count <= 0) { - spin_unlock(&ns->ns_unused_lock); +int ldlm_cli_cancel(struct lustre_handle *lockh) +{ + struct ldlm_lock *lock; + CFS_LIST_HEAD(cancels); + int rc = 0; + ENTRY; + + /* concurrent cancels on the same handle can happen */ + lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING); + if (lock == NULL) { + LDLM_DEBUG_NOLOCK("lock is already being destroyed\n"); RETURN(0); } + + rc = ldlm_cli_cancel_local(lock); + list_add(&lock->l_bl_ast, &cancels); + + if (rc == LDLM_FL_BL_AST) { + rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0); + } else if (rc == LDLM_FL_CANCELING) { + int avail = ldlm_cancel_handles_avail(lock->l_conn_export); + int count = 1; + LASSERT(avail > 0); + count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace, + &cancels, 0, avail - 1, + LDLM_CANCEL_AGED); + ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0); + } + if (rc != LDLM_FL_CANCELING) + LDLM_LOCK_PUT(lock); + RETURN(rc < 0 ? rc : 0); +} + +/* - Free space in lru for @count new locks, + * redundant unused locks are canceled locally; + * - also cancel locally unused aged locks; + * - do not cancel more than @max locks; + * - GET the found locks and add them into the @cancels list. + * + * A client lock can be added to the l_bl_ast list only when it is + * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing CANCEL. + * There are the following use cases: ldlm_cancel_resource_local(), + * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check&set this + * flag properly. As any attempt to cancel a lock rely on this flag, + * l_bl_ast list is accessed later without any special locking. 
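
The 5% wakeup threshold in ldlm_cli_update_pool() works out as follows; the numbers are illustrative and LDLM_POOLS_FAST_SLV_CHANGE is presumed to be the 5 from the comment above.

/*
 * Worked example (illustrative values): with an old SLV of 1,000,000,
 *
 *   fast_slv_change = 1,000,000 * 5 / 100 = 50,000
 *
 * so a reply carrying an SLV of 950,000 or less wakes the pools thread at
 * once, while a smaller drop (or any increase) simply waits for the normal
 * once-per-second recalculation.
 */
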
*/ +int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, + int count, int max, int flags) +{ + cfs_time_t cur = cfs_time_current(); + int rc, added = 0, left, unused; + struct ldlm_lock *lock, *next; + __u64 slv, lvf, lv; + ENTRY; + + spin_lock(&ns->ns_unused_lock); + unused = ns->ns_nr_unused; + + if (!ns_connect_lru_resize(ns)) + count += unused - ns->ns_max_unused; while (!list_empty(&ns->ns_unused_list)) { - struct list_head *tmp = ns->ns_unused_list.next; - lock = list_entry(tmp, struct ldlm_lock, l_lru); - LASSERT(!lock->l_readers && !lock->l_writers); + struct ldlm_pool *pl = &ns->ns_pool; + + LASSERT(unused >= 0); + + if (max && added >= max) + break; + + list_for_each_entry(lock, &ns->ns_unused_list, l_lru) { + /* somebody is already doing CANCEL or there is a + * blocking request will send cancel. */ + if (!(lock->l_flags & LDLM_FL_CANCELING) && + !(lock->l_flags & LDLM_FL_BL_AST)) + break; + } + if (&lock->l_lru == &ns->ns_unused_list) + break; + + if (ns_connect_lru_resize(ns)) { + cfs_time_t la; + + /* Take into account SLV only if cpount == 0. */ + if (count == 0) { + /* Calculate lv for every lock. */ + spin_lock(&pl->pl_lock); + slv = ldlm_pool_get_slv(pl); + lvf = atomic_read(&pl->pl_lock_volume_factor); + spin_unlock(&pl->pl_lock); + + la = cfs_duration_sec(cfs_time_sub(cur, + lock->l_last_used)); + if (la == 0) + la = 1; + + /* Stop when slv is not yet come from server + * or lv is smaller than it is. */ + lv = lvf * la * unused; + if (slv == 1 || lv < slv) + break; + } else { + if (added >= count) + break; + } + } else { + if ((added >= count) && + (!(flags & LDLM_CANCEL_AGED) || + cfs_time_before_64(cur, ns->ns_max_age + + lock->l_last_used))) + break; + } LDLM_LOCK_GET(lock); /* dropped by bl thread */ spin_unlock(&ns->ns_unused_lock); lock_res_and_lock(lock); - ldlm_lock_remove_from_lru(lock); + /* Check flags again under the lock. */ + if ((lock->l_flags & LDLM_FL_CANCELING) || + (lock->l_flags & LDLM_FL_BL_AST) || + (ldlm_lock_remove_from_lru(lock) == 0)) { + /* other thread is removing lock from lru or + * somebody is already doing CANCEL or + * there is a blocking request which will send + * cancel by itseft. */ + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); + spin_lock(&ns->ns_unused_lock); + continue; + } + LASSERT(!lock->l_readers && !lock->l_writers); + + /* If we have chosen to canecl this lock voluntarily, we better + send cancel notification to server, so that it frees + appropriate state. This might lead to a race where while + we are doing cancel here, server is also silently + cancelling this lock. */ + lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK; /* Setting the CBPENDING flag is a little misleading, but * prevents an important race; namely, once CBPENDING is set, * the lock can accumulate no more readers/writers. Since * readers and writers are already zero here, ldlm_lock_decref * won't see this flag and call l_blocking_ast */ - lock->l_flags |= LDLM_FL_CBPENDING; - + lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING; /* We can't re-add to l_lru as it confuses the refcounting in * ldlm_lock_remove_from_lru() if an AST arrives after we drop - * ns_lock below. We use l_tmp and can't use l_pending_chain as - * it is used both on server and client nevertheles bug 5666 + * ns_lock below. We use l_bl_ast and can't use l_pending_chain + * as it is used both on server and client nevertheles bug 5666 * says it is used only on server. 
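
The LRU-resize branch above decides per lock whether to keep caching it by comparing the lock volume lv = lvf * la * unused against the server-supplied SLV (only when count == 0). A worked example with illustrative numbers:

/*
 * Suppose slv = 10,000,000, lvf = 1 and unused = 5,000.  A lock last used
 * la = 100 seconds ago gives
 *
 *   lv = lvf * la * unused = 1 * 100 * 5,000 = 500,000  <  slv
 *
 * so the scan stops and the lock stays cached.  With unused = 500,000 the
 * same lock gives lv = 50,000,000 >= slv and is cancelled; the LRU keeps
 * shrinking until the aggregate pressure falls back under the server limit.
 */
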
--umka */ - if (sync != LDLM_ASYNC || ldlm_bl_to_thread(ns, NULL, lock)) - list_add(&lock->l_tmp, &cblist); + LASSERT(list_empty(&lock->l_bl_ast)); + list_add(&lock->l_bl_ast, cancels); unlock_res_and_lock(lock); - spin_lock(&ns->ns_unused_lock); - - if (--count == 0) - break; + added++; + unused--; } spin_unlock(&ns->ns_unused_lock); - list_for_each_entry_safe(lock, next, &cblist, l_tmp) { - list_del_init(&lock->l_tmp); - ldlm_handle_bl_callback(ns, NULL, lock); - } + /* Handle only @added inserted locks. */ + left = added; + list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { + if (left-- == 0) + break; - RETURN(rc); + rc = ldlm_cli_cancel_local(lock); + if (rc == LDLM_FL_BL_AST) { + CFS_LIST_HEAD(head); + + LDLM_DEBUG(lock, "Cancel lock separately"); + list_del_init(&lock->l_bl_ast); + list_add(&lock->l_bl_ast, &head); + ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0); + rc = LDLM_FL_LOCAL_ONLY; + } + if (rc == LDLM_FL_LOCAL_ONLY) { + /* CANCEL RPC should not be sent to server. */ + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); + added--; + } + + } + RETURN(added); } -static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, - struct ldlm_res_id res_id, int flags, - void *opaque) +/* when called with LDLM_ASYNC the blocking callback will be handled + * in a thread and this function will return after the thread has been + * asked to call the callback. when called with LDLM_SYNC the blocking + * callback will be performed in this function. */ +int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync) { - struct list_head *tmp, *next, list = LIST_HEAD_INIT(list); - struct ldlm_resource *res; - struct ldlm_lock *lock; + CFS_LIST_HEAD(cancels); + int count, rc; ENTRY; - res = ldlm_resource_get(ns, NULL, res_id, 0, 0); - if (res == NULL) { - /* This is not a problem. */ - CDEBUG(D_INFO, "No resource "LPU64"\n", res_id.name[0]); - RETURN(0); +#ifndef __KERNEL__ + sync = LDLM_SYNC; /* force to be sync in user space */ +#endif + count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0); + if (sync == LDLM_ASYNC) { + rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count); + if (rc == 0) + RETURN(count); } - lock_res(res); - list_for_each(tmp, &res->lr_granted) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + /* If an error occured in ASYNC mode, or + * this is SYNC mode, cancel the list. */ + ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0); + RETURN(count); +} +/* Find and cancel locally unused locks found on resource, matched to the + * given policy, mode. GET the found locks and add them into the @cancels + * list. */ +int ldlm_cancel_resource_local(struct ldlm_resource *res, + struct list_head *cancels, + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int lock_flags, + int flags, void *opaque) +{ + struct ldlm_lock *lock, *next; + int count = 0, left; + ENTRY; + + lock_res(res); + list_for_each_entry(lock, &res->lr_granted, l_res_link) { if (opaque != NULL && lock->l_ast_data != opaque) { LDLM_ERROR(lock, "data %p doesn't match opaque %p", lock->l_ast_data, opaque); + //LBUG(); continue; } if (lock->l_readers || lock->l_writers) { - if (flags & LDLM_FL_CONFIG_CHANGE) - lock->l_flags |= LDLM_FL_CBPENDING; - else if (flags & LDLM_FL_WARN) + if (flags & LDLM_FL_WARN) { LDLM_ERROR(lock, "lock in use"); + //LBUG(); + } continue; } + /* If somebody is already doing CANCEL, or blocking ast came, + * skip this lock. 
*/ + if (lock->l_flags & LDLM_FL_BL_AST || + lock->l_flags & LDLM_FL_CANCELING) + continue; + + if (lockmode_compat(lock->l_granted_mode, mode)) + continue; + + /* If policy is given and this is IBITS lock, add to list only + * those locks that match by policy. */ + if (policy && (lock->l_resource->lr_type == LDLM_IBITS) && + !(lock->l_policy_data.l_inodebits.bits & + policy->l_inodebits.bits)) + continue; + /* See CBPENDING comment in ldlm_cancel_lru */ - lock->l_flags |= LDLM_FL_CBPENDING; + lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING | + lock_flags; LASSERT(list_empty(&lock->l_bl_ast)); - list_add(&lock->l_bl_ast, &list); + list_add(&lock->l_bl_ast, cancels); LDLM_LOCK_GET(lock); + count++; } unlock_res(res); - list_for_each_safe(tmp, next, &list) { - struct lustre_handle lockh; - int rc; - lock = list_entry(tmp, struct ldlm_lock, l_bl_ast); + /* Handle only @count inserted locks. */ + left = count; + list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { + int rc = LDLM_FL_LOCAL_ONLY; - if (flags & LDLM_FL_LOCAL_ONLY) { + if (left-- == 0) + break; + if (flags & LDLM_FL_LOCAL_ONLY) ldlm_lock_cancel(lock); + else + rc = ldlm_cli_cancel_local(lock); + + if (rc == LDLM_FL_BL_AST) { + CFS_LIST_HEAD(head); + + LDLM_DEBUG(lock, "Cancel lock separately"); + list_del_init(&lock->l_bl_ast); + list_add(&lock->l_bl_ast, &head); + ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0); + rc = LDLM_FL_LOCAL_ONLY; + } + if (rc == LDLM_FL_LOCAL_ONLY) { + /* CANCEL RPC should not be sent to server. */ + list_del_init(&lock->l_bl_ast); + LDLM_LOCK_PUT(lock); + count--; + } + } + RETURN(count); +} + +/* If @req is NULL, send CANCEL request to server with handles of locks + * in the @cancels. If EARLY_CANCEL is not supported, send CANCEL requests + * separately per lock. + * If @req is not NULL, put handles of locks in @cancels into the request + * buffer at the offset @off. + * Destroy @cancels at the end. */ +int ldlm_cli_cancel_list(struct list_head *cancels, int count, + struct ptlrpc_request *req, int off, int flags) +{ + struct ldlm_lock *lock; + int res = 0; + ENTRY; + + if (list_empty(cancels) || count == 0) + RETURN(0); + + /* XXX: requests (both batched and not) could be sent in parallel. + * Usually it is enough to have just 1 RPC, but it is possible that + * there are to many locks to be cancelled in LRU or on a resource. + * It would also speed up the case when the server does not support + * the feature. 
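
Together, ldlm_cancel_resource_local() and ldlm_prep_enqueue_req() implement the early-cancel optimisation: conflicting unused locks on the target resource are cancelled locally and their handles ride along in the ENQUEUE request instead of a separate CANCEL RPC. A hedged sketch of a caller follows; the helper name is illustrative and the bufcount/size layout is whatever the caller's enqueue normally uses.

/* Hedged sketch of the EARLY_CANCEL flow for one resource. */
static struct ptlrpc_request *
enqueue_with_early_cancel_sketch(struct obd_export *exp,
                                 struct ldlm_resource *res,
                                 ldlm_policy_data_t *policy,
                                 int bufcount, int *size)
{
        CFS_LIST_HEAD(cancels);
        int count;

        /* Collect unused locks on @res that conflict with an LCK_PW request
         * and match @policy; they are marked CANCELING and cancelled
         * locally. */
        count = ldlm_cancel_resource_local(res, &cancels, policy, LCK_PW,
                                           0, 0, NULL);

        /* Pack the collected handles after the enqueue body; if they cannot
         * be packed the helper releases the list references itself. */
        return ldlm_prep_enqueue_req(exp, bufcount, size, &cancels, count);
}
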
*/ + while (count > 0) { + LASSERT(!list_empty(cancels)); + lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast); + LASSERT(lock->l_conn_export); + + if (exp_connect_cancelset(lock->l_conn_export)) { + res = count; + if (req) + ldlm_cancel_pack(req, off, cancels, count); + else + res = ldlm_cli_cancel_req(lock->l_conn_export, + cancels, count, + flags); } else { - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc != ELDLM_OK) - CERROR("ldlm_cli_cancel: %d\n", rc); + res = ldlm_cli_cancel_req(lock->l_conn_export, + cancels, 1, flags); } - list_del_init(&lock->l_bl_ast); - LDLM_LOCK_PUT(lock); + + if (res < 0) { + CERROR("ldlm_cli_cancel_list: %d\n", res); + res = count; + } + + count -= res; + ldlm_lock_list_put(cancels, l_bl_ast, res); } + LASSERT(list_empty(cancels)); + LASSERT(count == 0); + RETURN(0); +} - ldlm_resource_putref(res); +int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, + const struct ldlm_res_id *res_id, + ldlm_policy_data_t *policy, + ldlm_mode_t mode, int flags, void *opaque) +{ + struct ldlm_resource *res; + CFS_LIST_HEAD(cancels); + int count; + int rc; + ENTRY; + res = ldlm_resource_get(ns, NULL, res_id, 0, 0); + if (res == NULL) { + /* This is not a problem. */ + CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]); + RETURN(0); + } + + count = ldlm_cancel_resource_local(res, &cancels, policy, mode, + 0, flags, opaque); + rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc); + + ldlm_resource_putref(res); RETURN(0); } @@ -792,23 +1386,20 @@ static inline int have_no_nsresource(struct ldlm_namespace *ns) * that have 0 readers/writers. * * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying - * to notify the server. - * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback. - * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. - * If flags & LDLM_FL_CONFIG_CHANGE, mark all locks as having a pending callback - */ + * to notify the server. 
*/ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, - struct ldlm_res_id *res_id, int flags, void *opaque) + const struct ldlm_res_id *res_id, + int flags, void *opaque) { int i; - struct l_wait_info lwi = { 0 }; ENTRY; if (ns == NULL) RETURN(ELDLM_OK); if (res_id) - RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags, + RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL, + LCK_MINMODE, flags, opaque)); spin_lock(&ns->ns_hash_lock); @@ -823,10 +1414,12 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, ldlm_resource_getref(res); spin_unlock(&ns->ns_hash_lock); - rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name, + rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name, + NULL, LCK_MINMODE, flags, opaque); + if (rc) - CERROR("cancel_unused_res ("LPU64"): %d\n", + CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n", res->lr_name.name[0], rc); spin_lock(&ns->ns_hash_lock); @@ -836,12 +1429,58 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, } spin_unlock(&ns->ns_hash_lock); - if (flags & LDLM_FL_CONFIG_CHANGE) - l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi); - RETURN(ELDLM_OK); } +/* join/split resource locks to/from lru list */ +int ldlm_cli_join_lru(struct ldlm_namespace *ns, + const struct ldlm_res_id *res_id, int join) +{ + struct ldlm_resource *res; + struct ldlm_lock *lock, *n; + int count = 0; + ENTRY; + + LASSERT(ns_is_client(ns)); + + res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0); + if (res == NULL) + RETURN(count); + LASSERT(res->lr_type == LDLM_EXTENT); + + lock_res(res); + if (!join) + goto split; + + list_for_each_entry_safe (lock, n, &res->lr_granted, l_res_link) { + if (list_empty(&lock->l_lru) && + !lock->l_readers && !lock->l_writers && + !(lock->l_flags & LDLM_FL_LOCAL) && + !(lock->l_flags & LDLM_FL_CBPENDING)) { + ldlm_lock_add_to_lru(lock); + lock->l_flags &= ~LDLM_FL_NO_LRU; + LDLM_DEBUG(lock, "join lock to lru"); + count++; + } + } + goto unlock; +split: + spin_lock(&ns->ns_unused_lock); + list_for_each_entry_safe (lock, n, &ns->ns_unused_list, l_lru) { + if (lock->l_resource == res) { + ldlm_lock_remove_from_lru_nolock(lock); + lock->l_flags |= LDLM_FL_NO_LRU; + LDLM_DEBUG(lock, "split lock from lru"); + count++; + } + } + spin_unlock(&ns->ns_unused_lock); +unlock: + unlock_res(res); + ldlm_resource_putref(res); + RETURN(count); +} + /* Lock iterators. */ int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter, @@ -912,6 +1551,7 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns, struct ldlm_resource *res; struct list_head *tmp; + ENTRY; spin_lock(&ns->ns_hash_lock); for (i = 0; i < RES_HASH_SIZE; i++) { tmp = ns->ns_hash[i].next; @@ -935,10 +1575,9 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns, } /* non-blocking function to manipulate a lock whose cb_data is being put away.*/ -void ldlm_change_cbdata(struct ldlm_namespace *ns, - struct ldlm_res_id *res_id, - ldlm_iterator_t iter, - void *data) +void ldlm_resource_iterate(struct ldlm_namespace *ns, + const struct ldlm_res_id *res_id, + ldlm_iterator_t iter, void *data) { struct ldlm_resource *res; ENTRY; @@ -948,7 +1587,7 @@ void ldlm_change_cbdata(struct ldlm_namespace *ns, LBUG(); } - res = ldlm_resource_get(ns, NULL, *res_id, 0, 0); + res = ldlm_resource_get(ns, NULL, res_id, 0, 0); if (res == NULL) { EXIT; return; @@ -966,38 +1605,52 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure) struct list_head *list = closure; /* we use l_pending_chain here, because it's unused on clients. 
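
ldlm_resource_iterate() is the renamed, resource-id based iterator that replaces ldlm_change_cbdata(); callers pass an iterator of the same shape as ldlm_chain_lock_for_replay() in this file. A hedged sketch with illustrative helper names, clearing the ast data of every lock on one resource:

/* Hedged sketch: detach @data from all locks on a resource, e.g. while the
 * inode or object that the locks point at is being torn down. */
static int clear_data_iter_sketch(struct ldlm_lock *lock, void *data)
{
        if (lock->l_ast_data == data)
                lock->l_ast_data = NULL;
        return LDLM_ITER_CONTINUE;
}

static void drop_lock_data_sketch(struct ldlm_namespace *ns,
                                  const struct ldlm_res_id *res_id,
                                  void *data)
{
        ldlm_resource_iterate(ns, res_id, clear_data_iter_sketch, data);
}
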
*/ - list_add(&lock->l_pending_chain, list); + LASSERTF(list_empty(&lock->l_pending_chain),"lock %p next %p prev %p\n", + lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev); + /* bug 9573: don't replay locks left after eviction */ + if (!(lock->l_flags & LDLM_FL_FAILED)) + list_add(&lock->l_pending_chain, list); return LDLM_ITER_CONTINUE; } static int replay_lock_interpret(struct ptlrpc_request *req, - void * data, int rc) + struct ldlm_async_args *aa, int rc) { struct ldlm_lock *lock; struct ldlm_reply *reply; + ENTRY; atomic_dec(&req->rq_import->imp_replay_inflight); if (rc != ELDLM_OK) GOTO(out, rc); - lock = req->rq_async_args.pointer_arg[0]; - LASSERT(lock != NULL); - reply = lustre_swab_repbuf(req, 0, sizeof (*reply), + reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR("Can't unpack ldlm_reply\n"); GOTO (out, rc = -EPROTO); } - memcpy(&lock->l_remote_handle, &reply->lock_handle, - sizeof(lock->l_remote_handle)); + lock = ldlm_handle2lock(&aa->lock_handle); + if (!lock) { + CERROR("received replay ack for unknown local cookie "LPX64 + " remote cookie "LPX64 " from server %s id %s\n", + aa->lock_handle.cookie, reply->lock_handle.cookie, + req->rq_export->exp_client_uuid.uuid, + libcfs_id2str(req->rq_peer)); + GOTO(out, rc = -ESTALE); + } + + lock->l_remote_handle = reply->lock_handle; LDLM_DEBUG(lock, "replayed lock:"); ptlrpc_import_recovery_state_machine(req->rq_import); + LDLM_LOCK_PUT(lock); out: if (rc != ELDLM_OK) ptlrpc_connect_import(req->rq_import, NULL); + RETURN(rc); } @@ -1006,10 +1659,27 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) struct ptlrpc_request *req; struct ldlm_request *body; struct ldlm_reply *reply; - int buffers = 1; - int size[2] = {0, sizeof(*body)}; + struct ldlm_async_args *aa; + int buffers = 2; + int size[3] = { sizeof(struct ptlrpc_body) }; int flags; + ENTRY; + + + /* Bug 11974: Do not replay a lock which is actively being canceled */ + if (lock->l_flags & LDLM_FL_CANCELING) { + LDLM_DEBUG(lock, "Not replaying canceled lock:"); + RETURN(0); + } + /* If this is reply-less callback lock, we cannot replay it, since + * server might have long dropped it, but notification of that event was + * lost by network. (and server granted conflicting lock already) */ + if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) { + LDLM_DEBUG(lock, "Not replaying reply-less lock:"); + ldlm_lock_cancel(lock); + RETURN(0); + } /* * If granted mode matches the requested mode, this lock is granted. * @@ -1033,25 +1703,26 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) else flags = LDLM_FL_REPLAY; - req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, - 2, size, NULL); + size[DLM_LOCKREQ_OFF] = sizeof(*body); + req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 2, size, + NULL); if (!req) RETURN(-ENOMEM); /* We're part of recovery, so don't wait for it. 
*/ req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS; - body = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*body)); + body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body)); ldlm_lock2desc(lock, &body->lock_desc); body->lock_flags = flags; - ldlm_lock2handle(lock, &body->lock_handle1); - size[0] = sizeof(*reply); + ldlm_lock2handle(lock, &body->lock_handle[0]); + size[DLM_LOCKREPLY_OFF] = sizeof(*reply); if (lock->l_lvb_len != 0) { - buffers = 2; - size[1] = lock->l_lvb_len; + buffers = 3; + size[DLM_REPLY_REC_OFF] = lock->l_lvb_len; } - req->rq_replen = lustre_msg_size(buffers, size); + ptlrpc_req_set_repsize(req, buffers, size); /* notify the server we've replayed all requests. * also, we mark the request to be put on a dedicated * queue to be processed after all request replayes. @@ -1060,9 +1731,10 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) LDLM_DEBUG(lock, "replaying lock:"); - imp->imp_locks_replayed++; atomic_inc(&req->rq_import->imp_replay_inflight); - req->rq_async_args.pointer_arg[0] = lock; + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = (struct ldlm_async_args *)&req->rq_async_args; + aa->lock_handle = body->lock_handle[0]; req->rq_interpret_reply = replay_lock_interpret; ptlrpcd_add_req(req); @@ -1072,26 +1744,25 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) int ldlm_replay_locks(struct obd_import *imp) { struct ldlm_namespace *ns = imp->imp_obd->obd_namespace; - struct list_head list, *pos, *next; - struct ldlm_lock *lock; + struct list_head list; + struct ldlm_lock *lock, *next; int rc = 0; ENTRY; - INIT_LIST_HEAD(&list); + CFS_INIT_LIST_HEAD(&list); LASSERT(atomic_read(&imp->imp_replay_inflight) == 0); - LASSERT(ns != NULL); /* ensure this doesn't fall to 0 before all have been queued */ atomic_inc(&imp->imp_replay_inflight); (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list); - list_for_each_safe(pos, next, &list) { - lock = list_entry(pos, struct ldlm_lock, l_pending_chain); - rc = replay_one_lock(imp, lock); + list_for_each_entry_safe(lock, next, &list, l_pending_chain) { + list_del_init(&lock->l_pending_chain); if (rc) - break; /* or try to do the rest? */ + continue; /* or try to do the rest? */ + rc = replay_one_lock(imp, lock); } atomic_dec(&imp->imp_replay_inflight);