From ce84143cd2882e03fe828c3b5fb4d331f8a92669 Mon Sep 17 00:00:00 2001 From: vitaly Date: Mon, 10 Dec 2007 15:05:28 +0000 Subject: [PATCH] Branch b1_6 b=13563 i=green i=shadow 1) cancel lru locks in ldlm_cli_cancel() and send 1 batched cancel RPC; 2) pass the whole list of canceled locks to the async blocking thread, not lock-by-lock, and send cancels in batched cancel RPCs. --- lustre/include/lustre_dlm.h | 2 +- lustre/ldlm/ldlm_internal.h | 9 ++- lustre/ldlm/ldlm_lock.c | 2 +- lustre/ldlm/ldlm_lockd.c | 62 +++++++++++------ lustre/ldlm/ldlm_pool.c | 2 +- lustre/ldlm/ldlm_request.c | 163 ++++++++++++++++++++++---------------------- 6 files changed, 131 insertions(+), 109 deletions(-) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index a9f03ed..e1cf930 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -766,7 +766,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *, struct ldlm_res_id *, int join); int ldlm_cancel_resource_local(struct ldlm_resource *res, struct list_head *cancels, ldlm_policy_data_t *policy, ldlm_mode_t mode, - int lock_flags, int flags, void *opaque); + int lock_flags, int cancel_flags, void *opaque); int ldlm_cli_cancel_list(struct list_head *head, int count, struct ptlrpc_request *req, int off); diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 5966d4c..49a1cf5 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -46,7 +46,7 @@ enum { int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, int flags); int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, - int count, int max, int flags); + int count, int max, int cancel_flags, int flags); extern int ldlm_enqueue_min; int ldlm_get_enq_timeout(struct ldlm_lock *lock); @@ -95,8 +95,11 @@ void ldlm_lock_touch_in_lru(struct ldlm_lock *lock); void ldlm_lock_destroy_nolock(struct ldlm_lock *lock); /* ldlm_lockd.c */ -int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock, int flags); +int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct ldlm_lock *lock); +int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct list_head *cancels, int count); + void ldlm_handle_bl_callback(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, struct ldlm_lock *lock); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 198b5aa..a999626 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -634,7 +634,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_lock_remove_from_lru(lock); unlock_res_and_lock(lock); if ((lock->l_flags & LDLM_FL_ATOMIC_CB) || - ldlm_bl_to_thread(ns, NULL, lock, 0) != 0) + ldlm_bl_to_thread_lock(ns, NULL, lock) != 0) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns_is_client(ns) && !lock->l_readers && !lock->l_writers && diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 72240f8..024a1e2 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -103,7 +103,8 @@ struct ldlm_bl_work_item { struct ldlm_namespace *blwi_ns; struct ldlm_lock_desc blwi_ld; struct ldlm_lock *blwi_lock; - int blwi_flags; + struct list_head blwi_head; + int blwi_count; }; #ifdef __KERNEL__ @@ -1347,7 +1348,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req, cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) { unlock_res_and_lock(lock); - if (ldlm_bl_to_thread(ns, NULL, lock, 
0)) + if (ldlm_bl_to_thread_lock(ns, NULL, lock)) ldlm_handle_bl_callback(ns, NULL, lock); EXIT; @@ -1369,14 +1370,18 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) return ptlrpc_reply(req); } -int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, - struct ldlm_lock *lock, int flags) -{ #ifdef __KERNEL__ +static int ldlm_bl_to_thread(struct ldlm_namespace *ns, + struct ldlm_lock_desc *ld, struct ldlm_lock *lock, + struct list_head *cancels, int count) +{ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; struct ldlm_bl_work_item *blwi; ENTRY; + if (cancels && count == 0) + RETURN(0); + OBD_ALLOC(blwi, sizeof(*blwi)); if (blwi == NULL) RETURN(-ENOMEM); @@ -1384,15 +1389,37 @@ int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, blwi->blwi_ns = ns; if (ld != NULL) blwi->blwi_ld = *ld; - blwi->blwi_lock = lock; - blwi->blwi_flags = flags; - + if (count) { + list_add(&blwi->blwi_head, cancels); + list_del_init(cancels); + blwi->blwi_count = count; + } else { + blwi->blwi_lock = lock; + } spin_lock(&blp->blp_lock); list_add_tail(&blwi->blwi_entry, &blp->blp_list); cfs_waitq_signal(&blp->blp_waitq); spin_unlock(&blp->blp_lock); RETURN(0); +} +#endif + +int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct ldlm_lock *lock) +{ +#ifdef __KERNEL__ + RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0)); +#else + RETURN(-ENOSYS); +#endif +} + +int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld, + struct list_head *cancels, int count) +{ +#ifdef __KERNEL__ + RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count)); #else RETURN(-ENOSYS); #endif @@ -1541,7 +1568,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req) CDEBUG(D_INODE, "blocking ast\n"); if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) ldlm_callback_reply(req, 0); - if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0)) + if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock)) ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock); break; case LDLM_CP_CALLBACK: @@ -1707,18 +1734,13 @@ static int ldlm_bl_thread_main(void *arg) break; } - if (blwi->blwi_flags == LDLM_FL_CANCELING) { + if (blwi->blwi_count) { /* The special case when we cancel locks in lru - * asynchronously, then we first remove the lock from - * l_bl_ast explicitely in ldlm_cancel_lru before - * sending it to this thread. Thus lock is marked - * LDLM_FL_CANCELING, and already cancelled locally. */ - CFS_LIST_HEAD(head); - LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast)); - list_add(&blwi->blwi_lock->l_bl_ast, &head); - ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export, - &head, 1); - LDLM_LOCK_PUT(blwi->blwi_lock); + * asynchronously, we pass the list of locks here. + * Thus lock is marked LDLM_FL_CANCELING, and already + * canceled locally. */ + ldlm_cli_cancel_list(&blwi->blwi_head, + blwi->blwi_count, NULL, 0); } else { ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld, blwi->blwi_lock); diff --git a/lustre/ldlm/ldlm_pool.c b/lustre/ldlm/ldlm_pool.c index 979de47..7aca642 100644 --- a/lustre/ldlm/ldlm_pool.c +++ b/lustre/ldlm/ldlm_pool.c @@ -340,7 +340,7 @@ static int ldlm_cli_pool_shrink(struct ldlm_pool *pl, * policy. 
*/ if (nr == 0) RETURN(ldlm_cancel_lru_local(ldlm_pl2ns(pl), NULL, 0, - 0, LDLM_CANCEL_SHRINK)); + 0, 0, LDLM_CANCEL_SHRINK)); /* Cancel @nr locks accoding to shrink policy */ RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC, diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 1293378..84871a8 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -497,18 +497,28 @@ cleanup: /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into * a single page on the send/receive side. XXX: 512 should be changed * to more adequate value. */ -#define ldlm_req_handles_avail(exp, size, bufcount, off) \ -({ \ - int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \ - int _s = size[DLM_LOCKREQ_OFF]; \ - size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \ - _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \ - bufcount, size); \ - _avail /= sizeof(struct lustre_handle); \ - _avail += LDLM_LOCKREQ_HANDLES - off; \ - size[DLM_LOCKREQ_OFF] = _s; \ - _avail; \ -}) +static inline int ldlm_req_handles_avail(struct obd_export *exp, + int *size, int bufcount, int off) +{ + int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); + int old_size = size[DLM_LOCKREQ_OFF]; + + size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); + avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, + bufcount, size); + avail /= sizeof(struct lustre_handle); + avail += LDLM_LOCKREQ_HANDLES - off; + size[DLM_LOCKREQ_OFF] = old_size; + + return avail; +} + +static inline int ldlm_cancel_handles_avail(struct obd_export *exp) +{ + int size[2] = { sizeof(struct ptlrpc_body), + sizeof(struct ldlm_request) }; + return ldlm_req_handles_avail(exp, size, 2, 0); +} /* Cancel lru locks and pack them into the enqueue request. Pack there the given * @count locks in @cancels. */ @@ -529,20 +539,20 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, /* Estimate the amount of free space in the request. */ int avail = ldlm_req_handles_avail(exp, size, bufcount, LDLM_ENQUEUE_CANCEL_OFF); - int flags, cancel; + int flags, to_free; LASSERT(avail >= count); flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED; - cancel = ns_connect_lru_resize(ns) ? 0 : 1; + to_free = ns_connect_lru_resize(ns) ? 0 : 1; /* Cancel lru locks here _only_ if the server supports * EARLY_CANCEL. Otherwise we have to send extra CANCEL * rpc right on enqueue, what will make it slower, vs. * asynchronous rpc in blocking thread. 
*/ - count += ldlm_cancel_lru_local(ns, cancels, cancel, - avail - count, flags); + count += ldlm_cancel_lru_local(ns, cancels, to_free, + avail - count, 0, flags); size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_ENQUEUE); } @@ -938,6 +948,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, "out of sync -- not fatal\n", libcfs_nid2str(req->rq_import-> imp_connection->c_peer.nid)); + rc = 0; } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/ req->rq_import_generation == imp->imp_generation) { ptlrpc_req_finished(req); @@ -1018,9 +1029,10 @@ EXPORT_SYMBOL(ldlm_cli_update_pool); int ldlm_cli_cancel(struct lustre_handle *lockh) { + struct ldlm_namespace *ns; + int avail, flags, count = 1, rc = 0; struct ldlm_lock *lock; CFS_LIST_HEAD(cancels); - int rc = 0; ENTRY; /* concurrent cancels on the same handle can happen */ @@ -1031,22 +1043,30 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) } rc = ldlm_cli_cancel_local(lock); - - if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) - GOTO(out, rc); - + if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) { + LDLM_LOCK_PUT(lock); + RETURN(rc < 0 ? rc : 0); + } + /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL + * rpc which goes to canceld portal, so we can cancel other lru locks + * here and send them all as one LDLM_CANCEL rpc. */ + LASSERT(list_empty(&lock->l_bl_ast)); list_add(&lock->l_bl_ast, &cancels); - rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1); - EXIT; - out: - LDLM_LOCK_PUT(lock); - return rc < 0 ? rc : 0; + avail = ldlm_cancel_handles_avail(lock->l_conn_export); + LASSERT(avail > 0); + + ns = lock->l_resource->lr_namespace; + flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED; + count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1, + LDLM_FL_BL_AST, flags); + ldlm_cli_cancel_list(&cancels, count, NULL, 0); + RETURN(0); } /* XXX until we will have compound requests and can cut cancels from generic rpc * we need send cancels with LDLM_FL_BL_AST flag as separate rpc */ -static int ldlm_cancel_list(struct list_head *cancels, int count) +static int ldlm_cancel_list(struct list_head *cancels, int count, int flags) { CFS_LIST_HEAD(head); struct ldlm_lock *lock, *next; @@ -1057,8 +1077,13 @@ static int ldlm_cancel_list(struct list_head *cancels, int count) if (left-- == 0) break; - rc = ldlm_cli_cancel_local(lock); - if (rc == LDLM_FL_BL_AST) { + if (flags & LDLM_FL_LOCAL_ONLY) { + rc = LDLM_FL_LOCAL_ONLY; + ldlm_lock_cancel(lock); + } else { + rc = ldlm_cli_cancel_local(lock); + } + if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) { LDLM_DEBUG(lock, "Cancel lock separately"); list_del_init(&lock->l_bl_ast); list_add(&lock->l_bl_ast, &head); @@ -1073,7 +1098,7 @@ static int ldlm_cancel_list(struct list_head *cancels, int count) } } - if(bl_ast > 0) { + if (bl_ast > 0) { count -= bl_ast; ldlm_cli_cancel_list(&head, bl_ast, NULL, 0); } @@ -1081,25 +1106,6 @@ static int ldlm_cancel_list(struct list_head *cancels, int count) RETURN(count); } -/* cancel lock list without sending rpc to server*/ -static int ldlm_cancel_list_local(struct list_head *cancels, int count) -{ - struct ldlm_lock *lock, *next; - int left = 0; - - left = count; - list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { - if (left-- == 0) - break; - ldlm_lock_cancel(lock); - /* CANCEL RPC should not be sent to server. */ - list_del_init(&lock->l_bl_ast); - LDLM_LOCK_PUT(lock); - count--; - } - RETURN(count); -} - /* Return 1 if @lock should be canceled according to shrinker policy. 
* Return zero otherwise. */ static int ldlm_cancel_shrink_policy(struct ldlm_namespace *ns, @@ -1242,7 +1248,7 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags) * memory pressre policy function. */ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, - int count, int max, int flags) + int count, int max, int cancel_flags, int flags) { ldlm_cancel_lru_policy_t cancel_lru_policy_func; int added = 0, unused, cancel; @@ -1303,25 +1309,26 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, LASSERT(!lock->l_readers && !lock->l_writers); /* If we have chosen to cancel this lock voluntarily, we - * better send cancel notification to server, so that it + * better send cancel notification to server, so that it * frees appropriate state. This might lead to a race * where while we are doing cancel here, server is also * silently cancelling this lock. */ lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK; - /* Setting the CBPENDING flag is a little misleading, but - * prevents an important race; namely, once CBPENDING is - * set, the lock can accumulate no more readers/writers. - * Since readers and writers are already zero here, - * ldlm_lock_decref() won't see this flag and call - * l_blocking_ast */ + /* Setting the CBPENDING flag is a little misleading, + * but prevents an important race; namely, once + * CBPENDING is set, the lock can accumulate no more + * readers/writers. Since readers and writers are + * already zero here, ldlm_lock_decref() won't see + * this flag and call l_blocking_ast */ lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING; - /* We can't re-add to l_lru as it confuses the refcounting - * in ldlm_lock_remove_from_lru() if an AST arrives after - * we drop ns_lock below. We use l_bl_ast and can't use - * l_pending_chain as it is used both on server and client - * nevertheless bug 5666 says it is used only on server */ + /* We can't re-add to l_lru as it confuses the + * refcounting in ldlm_lock_remove_from_lru() if an AST + * arrives after we drop ns_lock below. We use l_bl_ast + * and can't use l_pending_chain as it is used both on + * server and client nevertheless bug 5666 says it is + * used only on server */ LASSERT(list_empty(&lock->l_bl_ast)); list_add(&lock->l_bl_ast, cancels); unlock_res_and_lock(lock); @@ -1335,7 +1342,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, if (cancels == NULL) RETURN(added); - RETURN(ldlm_cancel_list(cancels, added)); + RETURN(ldlm_cancel_list(cancels, added, cancel_flags)); } /* when called with LDLM_ASYNC the blocking callback will be handled @@ -1352,23 +1359,16 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, #ifndef __KERNEL__ sync = LDLM_SYNC; /* force to be sync in user space */ #endif - count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, flags); + count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags); if (sync == LDLM_ASYNC) { - struct ldlm_lock *lock, *next; - list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) { - /* Remove from the list to allow blocking thread to - * re-use l_bl_ast. */ - list_del_init(&lock->l_bl_ast); - rc = ldlm_bl_to_thread(ns, NULL, lock, - LDLM_FL_CANCELING); - if (rc) - list_add_tail(&lock->l_bl_ast, &next->l_bl_ast); - } + rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count); + if (rc == 0) + RETURN(count); } - /* If some locks are left in the list in ASYNC mode, or + /* If an error occured in ASYNC mode, or * this is SYNC mode, cancel the list. 
*/ - ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF); + ldlm_cli_cancel_list(&cancels, count, NULL, 0); RETURN(count); } @@ -1379,7 +1379,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, struct list_head *cancels, ldlm_policy_data_t *policy, ldlm_mode_t mode, int lock_flags, - int flags, void *opaque) + int cancel_flags, void *opaque) { struct ldlm_lock *lock; int count = 0; @@ -1395,7 +1395,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, } if (lock->l_readers || lock->l_writers) { - if (flags & LDLM_FL_WARN) { + if (cancel_flags & LDLM_FL_WARN) { LDLM_ERROR(lock, "lock in use"); //LBUG(); } @@ -1430,10 +1430,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res, unlock_res(res); /* Handle only @count inserted locks. */ - if ((flags & LDLM_FL_LOCAL_ONLY)) - RETURN(ldlm_cancel_list_local(cancels, count)); - - RETURN(ldlm_cancel_list(cancels, count)); + RETURN(ldlm_cancel_list(cancels, count, cancel_flags)); } /* If @req is NULL, send CANCEL request to server with handles of locks @@ -1500,7 +1497,7 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE, 0, flags, opaque); - rc = ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF); + rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0); if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc); -- 1.8.3.1
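
Review note, appended after the patch and not part of the diff above: the batching added in ldlm_prep_enqueue_req() and in the new ldlm_cli_cancel() is bounded by how many lustre_handle slots fit into one CANCEL rpc, which is what ldlm_req_handles_avail()/ldlm_cancel_handles_avail() compute. The stand-alone sketch below only illustrates the shape of that budget calculation; every constant in it is an assumption made for illustration (the kernel derives the real values from lustre_msg_size() and the actual struct sizes), and the helper name cancel_handles_avail() is illustrative, not the kernel symbol.

    /*
     * Stand-alone sketch of the per-RPC handle budget.  All constants are
     * assumptions for illustration only; the point is the arithmetic:
     * usable request size, minus fixed message overhead, divided by the
     * per-handle size, plus the handles already embedded in
     * struct ldlm_request.
     */
    #include <stdio.h>

    #define PAGE_SIZE_ASSUMED        4096  /* assumed page size */
    #define LDLM_MAXREQSIZE_ASSUMED  4096  /* assumed LDLM_MAXREQSIZE */
    #define NET_OVERHEAD_ASSUMED      512  /* TCP/IP + LNET headers, per the comment */
    #define FIXED_MSG_SIZE_ASSUMED    328  /* assumed ptlrpc_body + ldlm_request part */
    #define HANDLE_SIZE_ASSUMED         8  /* assumed sizeof(struct lustre_handle) */
    #define LOCKREQ_HANDLES_ASSUMED     2  /* handles already inside ldlm_request */

    static int cancel_handles_avail(int off)
    {
            int limit = PAGE_SIZE_ASSUMED - NET_OVERHEAD_ASSUMED;
            int avail = LDLM_MAXREQSIZE_ASSUMED < limit ?
                        LDLM_MAXREQSIZE_ASSUMED : limit;

            avail -= FIXED_MSG_SIZE_ASSUMED;        /* fixed part of the CANCEL rpc */
            avail /= HANDLE_SIZE_ASSUMED;           /* extra handles that still fit */
            avail += LOCKREQ_HANDLES_ASSUMED - off; /* handles reserved by the caller */
            return avail;
    }

    int main(void)
    {
            /* off = 0 mirrors ldlm_cancel_handles_avail(); off = 1 would mirror an
             * enqueue that keeps the first slot for its own lock handle. */
            printf("handles per batched CANCEL rpc (sketch): %d\n",
                   cancel_handles_avail(0));
            return 0;
    }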
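
A second review note, also outside the diff: ldlm_bl_to_thread() hands the caller's cancel list to the blocking thread by linking blwi_head into the chain and then unlinking the caller's head (list_add() followed by list_del_init()), so the whole batch is transferred in O(1) without walking the locks. The user-space sketch below reproduces that hand-off with a stripped-down stand-in for the kernel list macros; the types and names are illustrative only.

    /*
     * Minimal sketch of the list hand-off done in ldlm_bl_to_thread():
     * splice the work item's head into the caller's cancel list, then
     * detach the caller's head, leaving the locks chained on blwi_head
     * and the caller's list empty.  The list code is a stand-in for
     * <linux/list.h>, not the kernel implementation.
     */
    #include <stdio.h>

    struct list_head { struct list_head *next, *prev; };

    static void INIT_LIST_HEAD(struct list_head *h) { h->next = h->prev = h; }

    static void list_add(struct list_head *n, struct list_head *head)
    {
            n->next = head->next;
            n->prev = head;
            head->next->prev = n;
            head->next = n;
    }

    static void list_del_init(struct list_head *e)
    {
            e->prev->next = e->next;
            e->next->prev = e->prev;
            INIT_LIST_HEAD(e);
    }

    static int list_empty(const struct list_head *h) { return h->next == h; }

    int main(void)
    {
            struct list_head cancels, blwi_head, lock_a, lock_b;

            INIT_LIST_HEAD(&cancels);
            INIT_LIST_HEAD(&blwi_head);

            /* caller builds its cancel list (stand-ins for l_bl_ast links) */
            list_add(&lock_a, &cancels);
            list_add(&lock_b, &cancels);

            /* the hand-off, as in ldlm_bl_to_thread(): */
            list_add(&blwi_head, &cancels);   /* splice work-item head into the chain */
            list_del_init(&cancels);          /* detach the caller's head */

            printf("caller list empty: %d, work item has locks: %d\n",
                   list_empty(&cancels), !list_empty(&blwi_head));
            return 0;
    }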