/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_class.h>
#include <libcfs/list.h>
#include "ldlm_internal.h"
static int ldlm_num_threads;
CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
                "number of DLM service threads to start");

extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;

static struct semaphore ldlm_ref_sem;
static int ldlm_refcount;

static struct ldlm_state *ldlm_state;
inline cfs_time_t round_timeout(cfs_time_t timeout)
{
        return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}
/* timeout for initial callback (AST) reply (bz10399) */
static inline unsigned int ldlm_get_rq_timeout(void)
{
        /* Non-AT value */
        unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);

        return timeout < 1 ? 1 : timeout;
}
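
/*
 * Rough illustration of the clamp above: with, say, obd_timeout = 100 and
 * ldlm_timeout = 20, ldlm_get_rq_timeout() yields min(20, 100 / 3) = 20
 * seconds.  With adaptive timeouts (AT) enabled this value is only the
 * AT-off fallback used when filling in req->rq_timeout below.
 */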
#ifdef __KERNEL__
/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
static spinlock_t waiting_locks_spinlock;   /* BH lock (timer) */
static struct list_head waiting_locks_list;
static cfs_timer_t waiting_locks_timer;

static struct expired_lock_thread {
        cfs_waitq_t               elt_waitq;
        int                       elt_state;
        int                       elt_dump;
        struct list_head          elt_expired_locks;
} expired_lock_thread;
#endif

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2
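
/*
 * State machine for the expired-lock thread: it is started in ELT_STOPPED
 * state, expired_lock_main() advertises ELT_READY once it is running, and
 * ldlm_cleanup() requests ELT_TERMINATE and then waits for the thread to
 * drop back to ELT_STOPPED before tearing down global state.
 */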
struct ldlm_bl_pool {
        spinlock_t              blp_lock;

        /*
         * blp_prio_list is used for callbacks that should be handled
         * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
         */
        struct list_head        blp_prio_list;

        /*
         * blp_list is used for all other callbacks which are likely
         * to take longer to process.
         */
        struct list_head        blp_list;

        cfs_waitq_t             blp_waitq;
        struct completion       blp_comp;
        atomic_t                blp_num_threads;
        atomic_t                blp_busy_threads;
        int                     blp_min_threads;
        int                     blp_max_threads;
};
struct ldlm_bl_work_item {
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
        struct list_head        blwi_head;
        int                     blwi_count;
        struct completion       blwi_comp;
        atomic_t                blwi_ref_count;
};

static inline void ldlm_bl_work_item_get(struct ldlm_bl_work_item *blwi)
{
        atomic_inc(&blwi->blwi_ref_count);
}

static inline void ldlm_bl_work_item_put(struct ldlm_bl_work_item *blwi)
{
        if (atomic_dec_and_test(&blwi->blwi_ref_count))
                OBD_FREE(blwi, sizeof(*blwi));
}
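
/*
 * The reference count lets a work item live either on a caller's stack
 * (LDLM_SYNC: __ldlm_bl_to_thread() takes an extra reference and the
 * caller waits on blwi_comp) or on the heap (LDLM_ASYNC: the last
 * ldlm_bl_work_item_put() in the worker thread frees it).
 */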
#ifdef __KERNEL__

static inline int have_expired_locks(void)
{
        int need_to_run;

        spin_lock_bh(&waiting_locks_spinlock);
        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
        spin_unlock_bh(&waiting_locks_spinlock);

        return need_to_run;
}
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        int do_dump;
        ENTRY;

        cfs_daemonize("ldlm_elt");

        expired_lock_thread.elt_state = ELT_READY;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);

        while (1) {
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                             &lwi);

                spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_thread.elt_dump) {
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* from waiting_locks_callback, but not in timer */
                        libcfs_debug_dumplog();
                        libcfs_run_lbug_upcall(__FILE__,
                                               "waiting_locks_callback",
                                               expired_lock_thread.elt_dump);

                        spin_lock_bh(&waiting_locks_spinlock);
                        expired_lock_thread.elt_dump = 0;
                }

                do_dump = 0;

                while (!list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;

                        lock = list_entry(expired->next, struct ldlm_lock,
                                          l_pending_chain);
                        if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
                                LBUG();
                        }
                        list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                                /* release extra ref grabbed by
                                 * ldlm_add_waiting_lock() or
                                 * ldlm_failed_ast() */
                                LDLM_LOCK_PUT(lock);
                                continue;
                        }
                        export = class_export_get(lock->l_export);
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* release extra ref grabbed by ldlm_add_waiting_lock()
                         * or ldlm_failed_ast() */
                        LDLM_LOCK_PUT(lock);

                        do_dump++;
                        class_fail_export(export);
                        class_export_put(export);
                        spin_lock_bh(&waiting_locks_spinlock);
                }
                spin_unlock_bh(&waiting_locks_spinlock);

                if (do_dump && obd_dump_on_eviction) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }

                if (expired_lock_thread.elt_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread.elt_state = ELT_STOPPED;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        RETURN(0);
}
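
/*
 * Eviction pipeline: waiting_locks_callback() runs in timer (BH) context
 * and may only move timed-out locks onto elt_expired_locks and wake this
 * thread; the blocking work, dumping logs and calling class_fail_export(),
 * happens here in process context where sleeping is allowed.
 */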
/**
 * Check if there is a request in the export request list
 * which prevents the lock canceling.
 */
static int ldlm_lock_busy(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        int match = 0;
        ENTRY;

        if (lock->l_export == NULL)
                RETURN(0);

        spin_lock(&lock->l_export->exp_lock);
        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
                if (req->rq_ops->hpreq_lock_match) {
                        match = req->rq_ops->hpreq_lock_match(req, lock);
                        if (match)
                                break;
                }
        }
        spin_unlock(&lock->l_export->exp_lock);
        RETURN(match);
}
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
        struct ldlm_lock *lock, *last = NULL;

        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);

                if (cfs_time_after(lock->l_callback_timeout, cfs_time_current())
                    || (lock->l_req_mode == LCK_GROUP))
                        break;

                /* Check if we need to prolong timeout */
                if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
                    ldlm_lock_busy(lock)) {
                        int cont = 1;

                        if (lock->l_pending_chain.next == &waiting_locks_list)
                                cont = 0;

                        LDLM_LOCK_GET(lock);
                        spin_unlock_bh(&waiting_locks_spinlock);
                        LDLM_DEBUG(lock, "prolong the busy lock");
                        ldlm_refresh_waiting_lock(lock,
                                                  ldlm_get_enq_timeout(lock));
                        spin_lock_bh(&waiting_locks_spinlock);

                        if (!cont) {
                                LDLM_LOCK_PUT(lock);
                                break;
                        }

                        LDLM_LOCK_PUT(lock);
                        continue;
                }
                lock->l_resource->lr_namespace->ns_timeouts++;
                LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                           "evicting client at %s ",
                           cfs_time_current_sec() - lock->l_last_activity,
                           libcfs_nid2str(
                                   lock->l_export->exp_connection->c_peer.nid));

                if (lock == last) {
                        LDLM_ERROR(lock, "waiting on lock multiple times");
                        CERROR("wll %p n/p %p/%p, l_pending %p n/p %p/%p\n",
                               &waiting_locks_list,
                               waiting_locks_list.next, waiting_locks_list.prev,
                               &lock->l_pending_chain,
                               lock->l_pending_chain.next,
                               lock->l_pending_chain.prev);

                        CFS_INIT_LIST_HEAD(&waiting_locks_list);    /* HACK */
                        expired_lock_thread.elt_dump = __LINE__;

                        CEMERG("would be an LBUG, but isn't (bug 5653)\n");
                        libcfs_debug_dumpstack(NULL);
                        /*blocks* libcfs_debug_dumplog(); */
                        /*blocks* libcfs_run_lbug_upcall(file, func, line); */
                        break;
                }
                last = lock;

                /* no need to take an extra ref on the lock since it was in
                 * the waiting_locks_list and ldlm_add_waiting_lock()
                 * already grabbed a ref */
                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain,
                         &expired_lock_thread.elt_expired_locks);
        }

        if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
                if (obd_dump_on_timeout)
                        expired_lock_thread.elt_dump = __LINE__;

                cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        }

        /*
         * Make sure the timer will fire again if we have any locks
         * left.
         */
        if (!list_empty(&waiting_locks_list)) {
                cfs_time_t timeout_rounded;
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        spin_unlock_bh(&waiting_locks_spinlock);
}
/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 * As done by ldlm_add_waiting_lock(), the caller must grab a lock reference
 * if it has been added to the waiting list (1 is returned).
 *
 * Called with the namespace lock held.
 */
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
{
        cfs_time_t timeout;
        cfs_time_t timeout_rounded;

        if (!list_empty(&lock->l_pending_chain))
                return 0;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
                seconds = 1;

        timeout = cfs_time_shift(seconds);
        if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
                lock->l_callback_timeout = timeout;

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        if (cfs_time_before(timeout_rounded,
                            cfs_timer_deadline(&waiting_locks_timer)) ||
            !cfs_timer_is_armed(&waiting_locks_timer)) {
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        /* if the new lock has a shorter timeout than something earlier on
           the list, we'll wait the longer amount of time; no big deal. */
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
        return 1;
}
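
/*
 * Example: for seconds = 30 the deadline becomes "now + 30s", and
 * round_timeout() rounds it up to a whole second before arming the timer,
 * so a burst of enqueues shares timer ticks instead of arming one timer
 * per lock.  Because expiry times grow (approximately) monotonically, the
 * list stays FIFO and the timer only ever needs to track the list head.
 */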
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        int ret;
        int timeout = ldlm_get_enq_timeout(lock);

        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));

        spin_lock_bh(&waiting_locks_spinlock);
        if (lock->l_destroyed) {
                static cfs_time_t next;
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
                if (cfs_time_after(cfs_time_current(), next)) {
                        next = cfs_time_shift(14400);
                        libcfs_debug_dumpstack(NULL);
                }
                return 0;
        }

        ret = __ldlm_add_waiting_lock(lock, timeout);
        if (ret)
                /* grab ref on the lock if it has been added to the
                 * waiting list */
                LDLM_LOCK_GET(lock);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
                   ret == 0 ? "not re-" : "", timeout,
                   AT_OFF ? "off" : "on");
        return ret;
}
/*
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 * As done by ldlm_del_waiting_lock(), the caller must release the lock
 * reference when the lock is removed from any list (1 is returned).
 *
 * Called with namespace lock held.
 */
static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (list_empty(&lock->l_pending_chain))
                return 0;

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        cfs_timer_disarm(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        cfs_timer_arm(&waiting_locks_timer,
                                      round_timeout(next->l_callback_timeout));
                }
        }
        list_del_init(&lock->l_pending_chain);

        return 1;
}
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        int ret;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock);
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);
        ret = __ldlm_del_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);
        if (ret)
                /* release lock ref if it has indeed been removed
                 * from a list */
                LDLM_LOCK_PUT(lock);

        return ret;
}
/*
 * Called with namespace lock held.
 */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        /* we remove/add the lock to the waiting list, so no need to
         * release/take a lock reference */
        __ldlm_del_waiting_lock(lock);
        __ldlm_add_waiting_lock(lock, timeout);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "refreshed");
        return 1;
}
#else /* !__KERNEL__ */

static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
        RETURN(1);
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}

int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
        RETURN(0);
}
#endif /* __KERNEL__ */
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                            const char *ast_type)
{
        struct ptlrpc_connection *conn = lock->l_export->exp_connection;
        char *str = libcfs_nid2str(conn->c_peer.nid);

        LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due "
                           "to a lock %s callback to %s timed out: rc %d\n",
                           lock->l_export->exp_obd->obd_name, str,
                           ast_type, obd_export_nid2str(lock->l_export), rc);

        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();

#ifdef __KERNEL__
        spin_lock_bh(&waiting_locks_spinlock);
        if (__ldlm_del_waiting_lock(lock) == 0)
                /* the lock was not in any list, grab an extra ref before adding
                 * the lock to the expired list */
                LDLM_LOCK_GET(lock);
        list_add(&lock->l_pending_chain, &expired_lock_thread.elt_expired_locks);
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        spin_unlock_bh(&waiting_locks_spinlock);
#else
        class_fail_export(lock->l_export);
#endif
}
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
{
        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;

        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                LASSERT(lock->l_export);
                if (lock->l_export->exp_libclient) {
                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
                                   " timeout, just cancelling lock", ast_type,
                                   libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else if (lock->l_flags & LDLM_FL_CANCEL) {
                        LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
                                   "cancel was received (AST reply lost?)",
                                   ast_type, libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else {
                        ldlm_del_waiting_lock(lock);
                        ldlm_failed_ast(lock, rc, ast_type);
                }
        } else if (rc) {
                if (rc == -EINVAL)
                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
                                   " from %s AST - normal race",
                                   libcfs_nid2str(peer.nid),
                                   lustre_msg_get_status(req->rq_repmsg),
                                   ast_type);
                else
                        LDLM_ERROR(lock, "client (nid %s) returned %d "
                                   "from %s AST", libcfs_nid2str(peer.nid),
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
                                   ast_type);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }

        return rc;
}
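
/*
 * Return convention: 0 means the AST completed (or failed benignly);
 * -ERESTART means the lock was cancelled here and the caller must restart
 * reprocessing: ldlm_cb_interpret() turns it into arg->restart, and
 * ldlm_server_glimpse_ast() reruns ldlm_reprocess_all().
 */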
static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct ldlm_cb_set_arg *arg;
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(data != NULL);

        arg = req->rq_async_args.pointer_arg[0];
        lock = req->rq_async_args.pointer_arg[1];
        LASSERT(lock != NULL);
        if (rc != 0) {
                /* If client canceled the lock but the cancel has not
                 * been received yet, we need to update lvbo to have the
                 * proper attributes cached. */
                if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
                        ldlm_res_lvbo_update(lock->l_resource, NULL,
                                             0, 1);
                rc = ldlm_handle_ast_error(lock, req, rc,
                                           arg->type == LDLM_BL_CALLBACK
                                           ? "blocking" : "completion");
        }

        LDLM_LOCK_PUT(lock);

        if (rc == -ERESTART)
                atomic_set(&arg->restart, 1);

        RETURN(0);
}
static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
                                          struct ldlm_cb_set_arg *arg,
                                          struct ldlm_lock *lock,
                                          int instant_cancel)
{
        int rc = 0;
        ENTRY;

        if (unlikely(instant_cancel)) {
                rc = ptl_send_rpc(req, 1);
                ptlrpc_req_finished(req);
                if (rc == 0)
                        /* If we cancelled the lock, we need to restart
                         * ldlm_reprocess_queue */
                        atomic_set(&arg->restart, 1);
        } else {
                LDLM_LOCK_GET(lock);
                ptlrpc_set_add_req(arg->set, req);
        }

        RETURN(rc);
}
/**
 * Check if there are requests in the export request list which prevent
 * the lock canceling and make these requests high priority ones.
 */
static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        ENTRY;

        if (lock->l_export == NULL) {
                LDLM_DEBUG(lock, "client lock: no-op");
                EXIT;
                return;
        }

        spin_lock(&lock->l_export->exp_lock);
        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
                if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
                    req->rq_ops->hpreq_lock_match(req, lock))
                        ptlrpc_hpreq_reorder(req);
        }
        spin_unlock(&lock->l_export->exp_lock);
        EXIT;
}
/*
 * ->l_blocking_ast() method for server-side locks. This is invoked when newly
 * enqueued server lock conflicts with given one.
 *
 * Sends blocking ast rpc to the client owning that lock; arms timeout timer
 * to wait for client response.
 */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        __u32 size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int instant_cancel = 0, rc;
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        LASSERT(data != NULL);

        ldlm_lock_reorder_req(lock);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,
                              NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_async_args.pointer_arg[0] = arg;
        req->rq_async_args.pointer_arg[1] = lock;
        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;

        lock_res(lock->l_resource);
        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* What's the point? */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                RETURN(0);
        }

#if 0
        if (CURRENT_SECONDS - lock->l_export->exp_last_request_time > 30){
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
                RETURN(-ETIMEDOUT);
        }
#endif

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                instant_cancel = 1;

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_desc = *desc;
        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

        LDLM_DEBUG(lock, "server preparing blocking AST");

        lock->l_last_activity = cfs_time_current_sec();

        ptlrpc_req_set_repsize(req, 1, NULL);
        if (instant_cancel) {
                unlock_res(lock->l_resource);
                ldlm_lock_cancel(lock);
        } else {
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
                ldlm_add_waiting_lock(lock);
                unlock_res(lock->l_resource);
        }

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
        }

        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);

        RETURN(rc);
}
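
/*
 * Two send paths end up above: for LDLM_FL_CANCEL_ON_BLOCK locks the lock
 * is cancelled immediately and the AST goes out without waiting for a
 * reply (instant_cancel), while ordinary locks are put on the waiting
 * list first, so a client that never answers the blocking AST is evicted
 * by the waiting-lock timer.
 */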
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        long total_enqueue_wait;
        __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int rc, buffers = 2, instant_cancel = 0;
        ENTRY;

        LASSERT(lock != NULL);
        LASSERT(data != NULL);

        total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
                                          lock->l_last_activity);

        lock_res_and_lock(lock);
        if (lock->l_resource->lr_lvb_len) {
                size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len;
                buffers = 3;
        }
        unlock_res_and_lock(lock);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers,
                              size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_async_args.pointer_arg[0] = arg;
        req->rq_async_args.pointer_arg[1] = lock;
        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);

        if (buffers == 3) {
                void *lvb;

                lvb = lustre_msg_buf(req->rq_reqmsg, DLM_REQ_REC_OFF,
                                     lock->l_resource->lr_lvb_len);
                lock_res_and_lock(lock);
                memcpy(lvb, lock->l_resource->lr_lvb_data,
                       lock->l_resource->lr_lvb_len);
                unlock_res_and_lock(lock);
        }

        LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
                   total_enqueue_wait);

        /* Server-side enqueue wait time estimate, used in
           __ldlm_add_waiting_lock to set future enqueue timers */
        if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
                at_measured(&lock->l_resource->lr_namespace->ns_at_estimate,
                            total_enqueue_wait);
        else
                /* bz18618. Don't add lock enqueue time we spend waiting for a
                   previous callback to fail. Locks waiting legitimately will
                   get extended by ldlm_refresh_waiting_lock regardless of the
                   estimate, so it's okay to underestimate here. */
                LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
                           "It is likely that a previous callback timed out.",
                           total_enqueue_wait,
                           at_get(&lock->l_resource->lr_namespace->ns_at_estimate));

        ptlrpc_req_set_repsize(req, 1, NULL);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        /* We only send real blocking ASTs after the lock is granted */
        lock_res_and_lock(lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                body->lock_flags |= LDLM_FL_AST_SENT;
                /* copy ast flags like LDLM_FL_DISCARD_DATA */
                body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

                /* We might get here prior to ldlm_handle_enqueue setting
                 * LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
                 * into waiting list, but this is safe and similar code in
                 * ldlm_handle_enqueue will call ldlm_lock_cancel() still,
                 * that would not only cancel the lock, but will also remove
                 * it from waiting list */
                if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                        unlock_res_and_lock(lock);
                        ldlm_lock_cancel(lock);
                        instant_cancel = 1;
                        lock_res_and_lock(lock);
                } else {
                        ldlm_add_waiting_lock(lock); /* start the lock-timeout
                                                        clock */
                }
        }
        unlock_res_and_lock(lock);

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
        }

        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);

        RETURN(rc);
}
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        __u32 size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int rc;
        ENTRY;

        LASSERT(lock != NULL && lock->l_export != NULL);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK, 2, size,
                              NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        ldlm_lock2desc(lock, &body->lock_desc);

        lock_res_and_lock(lock);
        size[REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
        unlock_res_and_lock(lock);
        res = lock->l_resource;
        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
        }

        rc = ptlrpc_queue_wait(req);
        if (rc == -ELDLM_NO_LOCK_DATA)
                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
        else if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
        else
                rc = ldlm_res_lvbo_update(res, req,
                                          REPLY_REC_OFF, 1);
        ptlrpc_req_finished(req);
        if (rc == -ERESTART)
                ldlm_reprocess_all(res);

        RETURN(rc);
}
static void ldlm_svc_get_eopc(struct ldlm_request *dlm_req,
                              struct lprocfs_stats *srv_stats)
{
        int lock_type = 0, op = 0;

        lock_type = dlm_req->lock_desc.l_resource.lr_type;

        switch (lock_type) {
        case LDLM_PLAIN:
                op = PTLRPC_LAST_CNTR + LDLM_PLAIN_ENQUEUE;
                break;
        case LDLM_EXTENT:
                if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT)
                        op = PTLRPC_LAST_CNTR + LDLM_GLIMPSE_ENQUEUE;
                else
                        op = PTLRPC_LAST_CNTR + LDLM_EXTENT_ENQUEUE;
                break;
        case LDLM_FLOCK:
                op = PTLRPC_LAST_CNTR + LDLM_FLOCK_ENQUEUE;
                break;
        case LDLM_IBITS:
                op = PTLRPC_LAST_CNTR + LDLM_IBITS_ENQUEUE;
                break;
        default:
                op = 0;
                break;
        }

        if (op)
                lprocfs_counter_incr(srv_stats, op);
}
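
/*
 * This only attributes the enqueue to a per-type lprocfs counter (plain,
 * extent, flock or ibits, with intent extent enqueues counted as glimpse
 * enqueues); it has no effect on the locking decision itself.
 */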
/*
 * Main server-side entry point into LDLM. This is called by ptlrpc service
 * threads to carry out client lock enqueueing requests.
 */
int ldlm_handle_enqueue(struct ptlrpc_request *req,
                        ldlm_completion_callback completion_callback,
                        ldlm_blocking_callback blocking_callback,
                        ldlm_glimpse_callback glimpse_callback)
{
        struct obd_device *obddev = req->rq_export->exp_obd;
        struct ldlm_reply *dlm_rep;
        struct ldlm_request *dlm_req;
        __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
        int rc = 0;
        __u32 flags;
        ldlm_error_t err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("Can't unpack dlm_req\n");
                GOTO(out, rc = -EFAULT);
        }

        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
        flags = dlm_req->lock_flags;

        LASSERT(req->rq_export);

        if (req->rq_rqbd->rqbd_service->srv_stats)
                ldlm_svc_get_eopc(dlm_req,
                                  req->rq_rqbd->rqbd_service->srv_stats);

        if (req->rq_export && req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);
        }

        if (dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
            dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
                          dlm_req->lock_desc.l_resource.lr_type);
                GOTO(out, rc = -EFAULT);
        }

        if (dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
            dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
            dlm_req->lock_desc.l_req_mode & (dlm_req->lock_desc.l_req_mode-1)) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
                          dlm_req->lock_desc.l_req_mode);
                GOTO(out, rc = -EFAULT);
        }

        if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
                if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN) {
                        DEBUG_REQ(D_ERROR, req,
                                  "PLAIN lock request from IBITS client?");
                        GOTO(out, rc = -EPROTO);
                }
        } else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) {
                DEBUG_REQ(D_ERROR, req,
                          "IBITS lock request from unaware client?");
                GOTO(out, rc = -EPROTO);
        }

#if 0
        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
           against server's _CONNECT_SUPPORTED flags? (I don't want to use
           ibits for mgc/mgs) */

        /* INODEBITS_INTEROP: Perform conversion from plain lock to
         * inodebits lock if client does not support them. */
        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
                        dlm_req->lock_desc.l_req_mode = LCK_CR;
        }
#endif

        if (flags & LDLM_FL_REPLAY) {
                /* Find an existing lock in the per-export lock hash */
                lock = lustre_hash_lookup(req->rq_export->exp_lock_hash,
                                          (void *)&dlm_req->lock_handle[0]);
                if (lock != NULL) {
                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                  LPX64, lock->l_handle.h_cookie);
                        GOTO(existing_lock, rc = 0);
                }
        }

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(obddev->obd_namespace,
                                dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                blocking_callback, completion_callback,
                                glimpse_callback, NULL, 0);
        if (!lock)
                GOTO(out, rc = -ENOMEM);

        lock->l_last_activity = cfs_time_current_sec();
        lock->l_remote_handle = dlm_req->lock_handle[0];
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        /* Don't enqueue a lock onto the export if it has already
         * been evicted. Cancel it now instead. (bug 3822) */
        if (req->rq_export->exp_failed) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                GOTO(out, rc = -ENOTCONN);
        }
        lock->l_export = class_export_get(req->rq_export);

        if (lock->l_export->exp_lock_hash)
                lustre_hash_add(lock->l_export->exp_lock_hash,
                                &lock->l_remote_handle, &lock->l_exp_hash);

existing_lock:

        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
        } else {
                int buffers = 2;

                lock_res_and_lock(lock);
                if (lock->l_resource->lr_lvb_len) {
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        buffers = 3;
                }
                unlock_res_and_lock(lock);

                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);

                rc = lustre_pack_reply(req, buffers, size, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;

        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, (int *)&flags);
        if (err)
                GOTO(out, err);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*dlm_rep));
        dlm_rep->lock_flags = flags;

        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);

        /* We never send a blocking AST until the lock is granted, but
         * we can tell it right now */
        lock_res_and_lock(lock);

        /* Now take into account flags to be inherited from original lock
           request both in reply to client and in our own lock flags. */
        dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
        lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;

        /* Don't move a pending lock onto the export if it has already
         * been evicted. Cancel it now instead. (bug 5683) */
        if (req->rq_export->exp_failed ||
            OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT)) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                rc = -ENOTCONN;
        } else if (lock->l_flags & LDLM_FL_AST_SENT) {
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode) {
                        /* Only cancel the lock if it was granted, because it
                         * would be destroyed immediately and would never be
                         * granted in the future, causing timeouts on the
                         * client.  A lock that was not granted is cancelled
                         * immediately after sending the completion AST.
                         */
                        if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                                unlock_res_and_lock(lock);
                                ldlm_lock_cancel(lock);
                                lock_res_and_lock(lock);
                        } else
                                ldlm_add_waiting_lock(lock);
                }
        }
        /* Make sure we never ever grant usual metadata locks to liblustre
           clients */
        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
             req->rq_export->exp_libclient) {
                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
                    !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
                        CERROR("Granting sync lock to libclient. "
                               "req fl %d, rep fl %d, lock fl "LPX64"\n",
                               dlm_req->lock_flags, dlm_rep->lock_flags,
                               lock->l_flags);
                        LDLM_ERROR(lock, "sync lock");
                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
                                struct ldlm_intent *it;
                                it = lustre_msg_buf(req->rq_reqmsg,
                                                    DLM_INTENT_IT_OFF,
                                                    sizeof(*it));
                                if (it != NULL) {
                                        CERROR("This is intent %s ("LPU64")\n",
                                               ldlm_it2str(it->opc), it->opc);
                                }
                        }
                }
        }

        unlock_res_and_lock(lock);

        EXIT;
 out:
        req->rq_status = rc ?: err; /* return either error - bug 11190 */
        if (!req->rq_packed_final) {
                err = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc == 0)
                        rc = err;
        }

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
        if (lock) {
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                           "(err=%d, rc=%d)", err, rc);

                if (rc == 0 && obddev->obd_fail)
                        ; /* (body elided) */

                if (rc == 0) {
                        lock_res_and_lock(lock);
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        if (size[DLM_REPLY_REC_OFF] > 0) {
                                void *lvb = lustre_msg_buf(req->rq_repmsg,
                                                       DLM_REPLY_REC_OFF,
                                                       size[DLM_REPLY_REC_OFF]);
                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                         req, lock);

                                memcpy(lvb, lock->l_resource->lr_lvb_data,
                                       size[DLM_REPLY_REC_OFF]);
                        }
                        unlock_res_and_lock(lock);
                } else {
                        lock_res_and_lock(lock);
                        ldlm_resource_unlink_lock(lock);
                        ldlm_lock_destroy_nolock(lock);
                        unlock_res_and_lock(lock);
                }

                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);

                LDLM_LOCK_PUT(lock);
        }

        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
                          lock, rc);

        return rc;
}
int ldlm_handle_convert(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int rc;
        __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
        ENTRY;

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("Can't unpack dlm_req\n");
                RETURN(-EFAULT);
        }

        if (req->rq_export && req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_CONVERT - LDLM_FIRST_OPC);
        }

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc)
                RETURN(rc);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*dlm_rep));
        dlm_rep->lock_flags = dlm_req->lock_flags;

        lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
        if (!lock) {
                req->rq_status = EINVAL;
        } else {
                void *res = NULL;

                LDLM_DEBUG(lock, "server-side convert handler START");

                lock->l_last_activity = cfs_time_current_sec();
                res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                        &dlm_rep->lock_flags);
                if (res) {
                        if (ldlm_del_waiting_lock(lock))
                                LDLM_DEBUG(lock, "converted waiting lock");
                        req->rq_status = 0;
                } else {
                        req->rq_status = EDEADLOCK;
                }
        }

        if (lock) {
                if (!req->rq_status)
                        ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side convert handler END");
                LDLM_LOCK_PUT(lock);
        } else
                LDLM_DEBUG_NOLOCK("server-side convert handler END");

        RETURN(0);
}
/* Cancel all the locks whose handles are packed into ldlm_request */
int ldlm_request_cancel(struct ptlrpc_request *req,
                        struct ldlm_request *dlm_req, int first)
{
        struct ldlm_resource *res, *pres = NULL;
        struct ldlm_lock *lock;
        int i, count, done = 0;
        ENTRY;

        count = dlm_req->lock_count ? dlm_req->lock_count : 1;
        if (first >= count)
                RETURN(0);

        /* There is no lock on the server at the replay time,
         * skip lock cancelling to make replay tests pass. */
        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
                RETURN(0);

        for (i = first; i < count; i++) {
                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                if (!lock) {
                        LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
                                          "lock (cookie "LPU64")",
                                          dlm_req->lock_handle[i].cookie);
                        continue;
                }

                res = lock->l_resource;
                done++;

                if (res != pres) {
                        if (pres != NULL) {
                                ldlm_reprocess_all(pres);
                                ldlm_resource_putref(pres);
                        }
                        if (res != NULL) {
                                ldlm_resource_getref(res);
                                ldlm_res_lvbo_update(res, NULL, 0, 1);
                        }
                        pres = res;
                }
                ldlm_lock_cancel(lock);
                LDLM_LOCK_PUT(lock);
        }
        if (pres != NULL) {
                ldlm_reprocess_all(pres);
                ldlm_resource_putref(pres);
        }
        RETURN(done);
}
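
/*
 * Since the handles packed into one cancel request usually refer to the
 * same resource, the loop above caches the current resource in 'pres' and
 * only calls ldlm_reprocess_all()/ldlm_resource_putref() when the
 * resource changes, plus once more at the end, instead of reprocessing
 * after every single lock.
 */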
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        int rc;
        ENTRY;

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for cancel\n");
                RETURN(-EFAULT);
        }

        if (req->rq_export && req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_CANCEL - LDLM_FIRST_OPC);
        }

        rc = lustre_pack_reply(req, 1, NULL, NULL);
        if (rc)
                RETURN(rc);

        if (!ldlm_request_cancel(req, dlm_req, 0))
                req->rq_status = ESTALE;

        if (ptlrpc_reply(req) != 0)
                LBUG();

        RETURN(0);
}
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        LDLM_DEBUG(lock, "client blocking AST callback handler");

        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_CBPENDING;

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                lock->l_flags |= LDLM_FL_CANCEL;

        do_ast = (!lock->l_readers && !lock->l_writers);
        unlock_res_and_lock(lock);

        if (do_ast) {
                CDEBUG(D_DLMTRACE, "Lock %p is already unused, calling callback (%p)\n",
                       lock, lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL)
                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
        } else {
                CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
                       lock);
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        LDLM_LOCK_PUT(lock);
        EXIT;
}
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        CFS_LIST_HEAD(ast_list);
        ENTRY;

        LDLM_DEBUG(lock, "client completion callback handler START");

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
                int to = cfs_time_seconds(1);
                while (to > 0) {
                        to = schedule_timeout(to);
                        if (lock->l_granted_mode == lock->l_req_mode ||
                            lock->l_destroyed)
                                break;
                }
        }

        lock_res_and_lock(lock);
        if (lock->l_destroyed ||
            lock->l_granted_mode == lock->l_req_mode) {
                /* bug 11300: the lock has already been granted */
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "Double grant race happened");
                LDLM_LOCK_PUT(lock);
                EXIT;
                return;
        }

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents. */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }

        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
                LDLM_DEBUG(lock, "completion AST, new policy data");
        }

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                unlock_res_and_lock(lock);
                if (ldlm_lock_change_resource(ns, lock,
                                dlm_req->lock_desc.l_resource.lr_name)) {
                        LDLM_ERROR(lock, "Failed to allocate resource");
                        LDLM_LOCK_PUT(lock);
                        EXIT;
                        return;
                }
                LDLM_DEBUG(lock, "completion AST, new resource");
                CERROR("change resource!\n");
                lock_res_and_lock(lock);
        }

        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                /* BL_AST locks are not needed in lru.
                 * let ldlm_cancel_lru() be fast. */
                ldlm_lock_remove_from_lru(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }

        if (lock->l_lvb_len) {
                void *lvb;
                lvb = lustre_swab_reqbuf(req, DLM_REQ_REC_OFF, lock->l_lvb_len,
                                         lock->l_lvb_swabber);
                if (lvb == NULL) {
                        LDLM_ERROR(lock, "completion AST did not contain "
                                   "expected LVB!");
                } else {
                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
                }
        }

        ldlm_grant_lock(lock, &ast_list);
        unlock_res_and_lock(lock);

        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");

        ldlm_run_cp_ast_work(&ast_list);

        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int rc = -ENOSYS;
        ENTRY;

        LDLM_DEBUG(lock, "client glimpse AST callback handler");

        if (lock->l_glimpse_ast != NULL)
                rc = lock->l_glimpse_ast(lock, req);

        if (req->rq_repmsg != NULL) {
                ptlrpc_reply(req);
        } else {
                req->rq_status = rc;
                ptlrpc_error(req);
        }

        lock_res_and_lock(lock);
        if (lock->l_granted_mode == LCK_PW &&
            !lock->l_readers && !lock->l_writers &&
            cfs_time_after(cfs_time_current(),
                           cfs_time_add(lock->l_last_used,
                                        cfs_time_seconds(10)))) {
                unlock_res_and_lock(lock);
                if (ldlm_bl_to_thread_lock(ns, NULL, lock))
                        ldlm_handle_bl_callback(ns, NULL, lock);

                EXIT;
                return;
        }
        unlock_res_and_lock(lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
        req->rq_status = rc;
        if (!req->rq_packed_final) {
                rc = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc)
                        return rc;
        }
        return ptlrpc_reply(req);
}
#ifdef __KERNEL__
static int __ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_bl_work_item *blwi,
                               struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
                               struct list_head *cancels, int count, int mode)
{
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
        ENTRY;

        if (cancels && count == 0) {
                if (mode == LDLM_ASYNC)
                        OBD_FREE(blwi, sizeof(*blwi));
                RETURN(0);
        }

        init_completion(&blwi->blwi_comp);
        atomic_set(&blwi->blwi_ref_count, 1);

        blwi->blwi_ns = ns;
        if (ld != NULL)
                blwi->blwi_ld = *ld;
        if (count) {
                list_add(&blwi->blwi_head, cancels);
                list_del_init(cancels);
                blwi->blwi_count = count;
        } else {
                blwi->blwi_lock = lock;
        }

        spin_lock(&blp->blp_lock);
        if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
                /* add LDLM_FL_DISCARD_DATA requests to the priority list */
                list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
        } else {
                /* other blocking callbacks are added to the regular list */
                list_add_tail(&blwi->blwi_entry, &blp->blp_list);
        }
        spin_unlock(&blp->blp_lock);

        if (mode == LDLM_SYNC) {
                /* keep ref count as object is on this stack for SYNC call */
                ldlm_bl_work_item_get(blwi);
                cfs_waitq_signal(&blp->blp_waitq);
                wait_for_completion(&blwi->blwi_comp);
        } else {
                cfs_waitq_signal(&blp->blp_waitq);
        }

        RETURN(0);
}
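
/*
 * Typical callers, sketched: the lru cancellation path (which may run
 * under memory pressure) uses a stack work item,
 *
 *     ldlm_bl_to_thread_list(ns, NULL, &cancels, count, LDLM_SYNC);
 *
 * and blocks on blwi_comp, while blocking-AST handling queues with
 * ldlm_bl_to_thread_lock(..., LDLM_ASYNC) and returns as soon as the
 * item is on the pool's list.
 */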
static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
                             struct list_head *cancels, int count, int mode)
{
        ENTRY;

        if (mode == LDLM_SYNC) {
                /* if it is synchronous call do minimum mem alloc, as it could
                 * be triggered from kernel shrinker
                 */
                struct ldlm_bl_work_item blwi;
                memset(&blwi, 0, sizeof(blwi));
                /* have extra ref as this obj is on stack */
                RETURN(__ldlm_bl_to_thread(ns, &blwi, ld, lock, cancels, count, mode));
        } else {
                struct ldlm_bl_work_item *blwi;
                OBD_ALLOC(blwi, sizeof(*blwi));
                if (blwi == NULL)
                        RETURN(-ENOMEM);

                RETURN(__ldlm_bl_to_thread(ns, blwi, ld, lock, cancels, count, mode));
        }
}
#endif

int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct ldlm_lock *lock)
{
#ifdef __KERNEL__
        RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
#else
        RETURN(-ENOSYS);
#endif
}

int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                           struct list_head *cancels, int count, int mode)
{
#ifdef __KERNEL__
        RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
#else
        RETURN(-ENOSYS);
#endif
}
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        int rc;
        ENTRY;

        /* Requests arrive in sender's byte order. The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        LASSERT(req->rq_export != NULL);
        LASSERT(req->rq_export->exp_obd != NULL);

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                break;
        case LDLM_CP_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
                break;
        case LDLM_GL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
                break;
        case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
                rc = llog_origin_handle_cancel(req);
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_REP, 0);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case OBD_QC_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_QC_CALLBACK_NET, 0);
                rc = target_handle_qc_callback(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case QUOTA_DQACQ:
        case QUOTA_DQREL:
                /* reply in handler */
                rc = target_handle_dqacq_callback(req);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_CREATE:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_create(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_next_block(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_READ_HEADER:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_read_header(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_CLOSE:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_close(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        default:
                CERROR("unknown opcode %u\n",
                       lustre_msg_get_opc(req->rq_reqmsg));
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns != NULL);

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("can't unpack dlm_req\n");
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        /* Force a known safe race, send a cancel to the server for a lock
         * which the server has already started a blocking callback on. */
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                rc = ldlm_cli_cancel(&dlm_req->lock_handle[0]);
                if (rc < 0)
                        CERROR("ldlm_cli_cancel: %d\n", rc);
        }

        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
        if (!lock) {
                CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
                       "disappeared\n", dlm_req->lock_handle[0].cookie);
                ldlm_callback_reply(req, -EINVAL);
                RETURN(0);
        }

        if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
                OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);

        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
        lock_res_and_lock(lock);
        lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
        if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                /* If somebody cancels lock and cache is already dropped,
                 * or lock is failed before cp_ast received on client,
                 * we can tell the server we have no lock. Otherwise, we
                 * should send cancel after dropping the cache. */
                if (((lock->l_flags & LDLM_FL_CANCELING) &&
                    (lock->l_flags & LDLM_FL_BL_DONE)) ||
                    (lock->l_flags & LDLM_FL_FAILED)) {
                        LDLM_DEBUG(lock, "callback on lock "
                                   LPX64" - lock disappeared\n",
                                   dlm_req->lock_handle[0].cookie);
                        unlock_res_and_lock(lock);
                        LDLM_LOCK_PUT(lock);
                        ldlm_callback_reply(req, -EINVAL);
                        RETURN(0);
                }
                /* BL_AST locks are not needed in lru.
                 * let ldlm_cancel_lru() be fast. */
                ldlm_lock_remove_from_lru(lock);
                lock->l_flags |= LDLM_FL_BL_AST;
        }
        unlock_res_and_lock(lock);

        /* We want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback.
         *
         * But we'd also like to be able to indicate in the reply that we're
         * cancelling right now, because it's unused, or have an intent result
         * in the reply, so we might have to push the responsibility for sending
         * the reply down into the AST handlers, alas. */

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
                        ldlm_callback_reply(req, 0);
                if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                ldlm_callback_reply(req, 0);
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_GL_CALLBACK:
                CDEBUG(D_INODE, "glimpse ast\n");
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                         /* checked above */
        }

        RETURN(0);
}
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        /* Requests arrive in sender's byte order. The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;

                CERROR("operation %d from %s with bad export cookie "LPU64"\n",
                       lustre_msg_get_opc(req->rq_reqmsg),
                       libcfs_id2str(req->rq_peer),
                       lustre_msg_get_handle(req->rq_reqmsg)->cookie);

                if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
                        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
                                                     sizeof(*dlm_req),
                                                     lustre_swab_ldlm_request);
                        if (dlm_req != NULL)
                                ldlm_lock_dump_handle(D_ERROR,
                                                      &dlm_req->lock_handle[0]);
                }

                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        switch (lustre_msg_get_opc(req->rq_reqmsg)) {

        /* XXX FIXME move this back to mds/handler.c, bug 249 */
        case LDLM_CANCEL:
                CDEBUG(D_INODE, "cancel\n");
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
                rc = ldlm_handle_cancel(req);
                if (rc)
                        break;
                RETURN(0);
        case OBD_LOG_CANCEL:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
                rc = llog_origin_handle_cancel(req);
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_REP, 0);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        default:
                CERROR("invalid opcode %d\n",
                       lustre_msg_get_opc(req->rq_reqmsg));
                ldlm_callback_reply(req, -EINVAL);
        }

        RETURN(0);
}
#ifdef __KERNEL__
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_work_item *blwi = NULL;
        static unsigned int num_bl = 0;

        spin_lock(&blp->blp_lock);
        /* process a request from the blp_list at least every blp_num_threads */
        if (!list_empty(&blp->blp_list) &&
            (list_empty(&blp->blp_prio_list) || num_bl == 0))
                blwi = list_entry(blp->blp_list.next,
                                  struct ldlm_bl_work_item, blwi_entry);
        else
                if (!list_empty(&blp->blp_prio_list))
                        blwi = list_entry(blp->blp_prio_list.next,
                                          struct ldlm_bl_work_item, blwi_entry);

        if (blwi) {
                if (++num_bl >= atomic_read(&blp->blp_num_threads))
                        num_bl = 0;
                list_del(&blwi->blwi_entry);
        }
        spin_unlock(&blp->blp_lock);

        return blwi;
}
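
/*
 * num_bl round-robins across the worker threads so that the priority list
 * (LDLM_FL_DISCARD_DATA callbacks) cannot starve regular callbacks: at
 * least every blp_num_threads-th dequeue is taken from blp_list.
 */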
/* This only contains temporary data until the thread starts */
struct ldlm_bl_thread_data {
        char                    bltd_name[CFS_CURPROC_COMM_MAX];
        struct ldlm_bl_pool     *bltd_blp;
        struct completion       bltd_comp;
        int                     bltd_num;
};

static int ldlm_bl_thread_main(void *arg);

static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
        int rc;

        init_completion(&bltd.bltd_comp);
        rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
        if (rc < 0) {
                CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
                       atomic_read(&blp->blp_num_threads), rc);
                return rc;
        }
        wait_for_completion(&bltd.bltd_comp);

        return 0;
}
static int ldlm_bl_thread_main(void *arg)
{
        struct ldlm_bl_pool *blp;
        ENTRY;

        {
                struct ldlm_bl_thread_data *bltd = arg;

                blp = bltd->bltd_blp;

                bltd->bltd_num = atomic_inc_return(&blp->blp_num_threads) - 1;
                atomic_inc(&blp->blp_busy_threads);

                snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
                        "ldlm_bl_%02d", bltd->bltd_num);
                cfs_daemonize(bltd->bltd_name);

                complete(&bltd->bltd_comp);
                /* cannot use bltd after this, it is only on caller's stack */
        }

        while (1) {
                struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;

                blwi = ldlm_bl_get_work(blp);

                if (blwi == NULL) {
                        int busy;

                        atomic_dec(&blp->blp_busy_threads);
                        l_wait_event_exclusive(blp->blp_waitq,
                                         (blwi = ldlm_bl_get_work(blp)) != NULL,
                                         &lwi);
                        busy = atomic_inc_return(&blp->blp_busy_threads);

                        if (blwi->blwi_ns == NULL)
                                /* added by ldlm_cleanup() */
                                break;

                        /* Not fatal if racy and have a few too many threads */
                        if (unlikely(busy < blp->blp_max_threads &&
                                     busy >= atomic_read(&blp->blp_num_threads)))
                                /* discard the return value, we tried */
                                ldlm_bl_thread_start(blp);
                }

                if (blwi->blwi_ns == NULL)
                        /* added by ldlm_cleanup() */
                        break;

                if (blwi->blwi_count) {
                        /* The special case when we cancel locks in lru
                         * asynchronously, we pass the list of locks here.
                         * Thus locks are marked LDLM_FL_CANCELING, but NOT
                         * canceled locally yet. */
                        ldlm_cli_cancel_list_local(&blwi->blwi_head,
                                                   blwi->blwi_count, 0);
                        ldlm_cli_cancel_list(&blwi->blwi_head,
                                             blwi->blwi_count, NULL, 0);
                } else {
                        ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                                blwi->blwi_lock);
                }
                complete(&blwi->blwi_comp);
                ldlm_bl_work_item_put(blwi);
        }

        atomic_dec(&blp->blp_busy_threads);
        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        RETURN(0);
}

#endif
/*
 * Export handle<->lock hash operations.
 */
static unsigned
ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask)
{
        return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}

static void *
ldlm_export_lock_key(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        RETURN(&lock->l_remote_handle);
}

static int
ldlm_export_lock_compare(void *key, struct hlist_node *hnode)
{
        ENTRY;
        RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
}

static void *
ldlm_export_lock_get(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        LDLM_LOCK_GET(lock);

        RETURN(lock);
}

static void *
ldlm_export_lock_put(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        LDLM_LOCK_PUT(lock);

        RETURN(lock);
}

static lustre_hash_ops_t ldlm_export_lock_ops = {
        .lh_hash    = ldlm_export_lock_hash,
        .lh_key     = ldlm_export_lock_key,
        .lh_compare = ldlm_export_lock_compare,
        .lh_get     = ldlm_export_lock_get,
        .lh_put     = ldlm_export_lock_put
};
int ldlm_init_export(struct obd_export *exp)
{
        ENTRY;

        exp->exp_lock_hash =
                lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
                                 7, 16, &ldlm_export_lock_ops, LH_REHASH);

        if (!exp->exp_lock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_export);

void ldlm_destroy_export(struct obd_export *exp)
{
        ENTRY;
        lustre_hash_exit(exp->exp_lock_hash);
        exp->exp_lock_hash = NULL;
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_export);
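
/*
 * exp_lock_hash maps the client-side lock handle (cookie) to the
 * server-side lock, which is what lets a replayed enqueue find its
 * original lock, as in ldlm_handle_enqueue() above:
 *
 *     lock = lustre_hash_lookup(req->rq_export->exp_lock_hash,
 *                               (void *)&dlm_req->lock_handle[0]);
 */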
static int ldlm_setup(void);
static int ldlm_cleanup(void);

int ldlm_get_ref(void)
{
        int rc = 0;
        ENTRY;
        mutex_down(&ldlm_ref_sem);
        if (++ldlm_refcount == 1) {
                rc = ldlm_setup();
                if (rc)
                        ldlm_refcount--;
        }
        mutex_up(&ldlm_ref_sem);

        RETURN(rc);
}

void ldlm_put_ref(void)
{
        ENTRY;
        mutex_down(&ldlm_ref_sem);
        if (ldlm_refcount == 1) {
                int rc = ldlm_cleanup();
                if (rc)
                        CERROR("ldlm_cleanup failed: %d\n", rc);
                else
                        ldlm_refcount--;
        } else {
                ldlm_refcount--;
        }
        mutex_up(&ldlm_ref_sem);

        EXIT;
}
static int ldlm_setup(void)
{
        struct ldlm_bl_pool *blp;
        int rc = 0;
        int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
        int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
#ifdef __KERNEL__
        int i;
#endif
        ENTRY;

        if (ldlm_state != NULL)
                RETURN(-EALREADY);

        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
        if (ldlm_state == NULL)
                RETURN(-ENOMEM);

#ifdef LPROCFS
        rc = ldlm_proc_setup();
        if (rc != 0)
                GOTO(out_free, rc);
#endif

#ifdef __KERNEL__
        if (ldlm_num_threads) {
                /* If ldlm_num_threads is set, it is the min and the max. */
                if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
                        ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
                if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
                        ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
                ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
        }
#endif

        ldlm_state->ldlm_cb_service =
                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
                                LDLM_CB_REPLY_PORTAL, 2,
                                ldlm_callback_handler, "ldlm_cbd",
                                ldlm_svc_proc_dir, NULL,
                                ldlm_min_threads, ldlm_max_threads,
                                "ldlm_cb");

        if (!ldlm_state->ldlm_cb_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        ldlm_state->ldlm_cancel_service =
                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
                                LDLM_CANCEL_REPLY_PORTAL, 6,
                                ldlm_cancel_handler, "ldlm_canceld",
                                ldlm_svc_proc_dir, NULL,
                                ldlm_min_threads, ldlm_max_threads,
                                "ldlm_cn");

        if (!ldlm_state->ldlm_cancel_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        OBD_ALLOC(blp, sizeof(*blp));
        if (blp == NULL)
                GOTO(out_proc, rc = -ENOMEM);
        ldlm_state->ldlm_bl_pool = blp;

        spin_lock_init(&blp->blp_lock);
        CFS_INIT_LIST_HEAD(&blp->blp_list);
        CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
        cfs_waitq_init(&blp->blp_waitq);
        atomic_set(&blp->blp_num_threads, 0);
        atomic_set(&blp->blp_busy_threads, 0);
        blp->blp_min_threads = ldlm_min_threads;
        blp->blp_max_threads = ldlm_max_threads;

#ifdef __KERNEL__
        for (i = 0; i < blp->blp_min_threads; i++) {
                rc = ldlm_bl_thread_start(blp);
                if (rc < 0)
                        GOTO(out_thread, rc);
        }

        rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
        if (rc)
                GOTO(out_thread, rc);

        rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service);
        if (rc)
                GOTO(out_thread, rc);

        CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
        expired_lock_thread.elt_state = ELT_STOPPED;
        cfs_waitq_init(&expired_lock_thread.elt_waitq);

        CFS_INIT_LIST_HEAD(&waiting_locks_list);
        spin_lock_init(&waiting_locks_spinlock);
        cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);

        rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
        if (rc < 0) {
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out_thread, rc);
        }

        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_READY);

        rc = ldlm_pools_init();
        if (rc)
                GOTO(out_thread, rc);
#endif
        RETURN(0);

#ifdef __KERNEL__
 out_thread:
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
#endif

 out_proc:
#ifdef LPROCFS
        ldlm_proc_cleanup();
 out_free:
#endif
        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;
        RETURN(rc);
}
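
/*
 * Editorial note: the unwind order in ldlm_setup() mirrors construction.
 * Thread-start failures fall through out_thread, which unregisters both
 * services; out_proc/out_free then release the /proc entries and the
 * ldlm_state allocation, so a failure at any GOTO leaves no partial state.
 */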

static int ldlm_cleanup(void)
{
#ifdef __KERNEL__
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
#endif
        ENTRY;

        if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
            !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
                RETURN(-EBUSY);
        }

#ifdef __KERNEL__
        ldlm_pools_fini();

        while (atomic_read(&blp->blp_num_threads) > 0) {
                struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

                init_completion(&blp->blp_comp);

                spin_lock(&blp->blp_lock);
                list_add_tail(&blwi.blwi_entry, &blp->blp_list);
                cfs_waitq_signal(&blp->blp_waitq);
                spin_unlock(&blp->blp_lock);

                wait_for_completion(&blp->blp_comp);
        }
        OBD_FREE(blp, sizeof(*blp));

        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
        ldlm_proc_cleanup();

        expired_lock_thread.elt_state = ELT_TERMINATE;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_STOPPED);
#else /* !__KERNEL__ */
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
#endif /* __KERNEL__ */

        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;

        RETURN(0);
}
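
/*
 * Editorial note: the dummy work item queued above (blwi_ns == NULL) is the
 * pool's shutdown sentinel.  A worker that dequeues it breaks out of its
 * service loop, decrements blp_num_threads and completes blp_comp, so the
 * while loop in ldlm_cleanup() reaps exactly one thread per iteration.
 */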

int __init ldlm_init(void)
{
        init_mutex(&ldlm_ref_sem);
        init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
        init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
        ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               SLAB_HWCACHE_ALIGN);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;

        ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
                                      sizeof(struct ldlm_lock), 0,
                                      SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
        if (ldlm_lock_slab == NULL) {
                cfs_mem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }

        ldlm_interval_slab = cfs_mem_cache_create("interval_node",
                                        sizeof(struct ldlm_interval),
                                        0, SLAB_HWCACHE_ALIGN);
        if (ldlm_interval_slab == NULL) {
                cfs_mem_cache_destroy(ldlm_resource_slab);
                cfs_mem_cache_destroy(ldlm_lock_slab);
                return -ENOMEM;
        }

        return 0;
}
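
/*
 * Editorial note (hedged): ldlm_lock_slab is created with
 * SLAB_DESTROY_BY_RCU so its memory is not returned to the page allocator
 * until an RCU grace period has passed; ldlm_exit() below pairs this with
 * synchronize_rcu() before destroying the slab.
 */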

void __exit ldlm_exit(void)
{
        int rc;

        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
        rc = cfs_mem_cache_destroy(ldlm_resource_slab);
        LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
#ifdef __KERNEL__
        /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
         * synchronize_rcu() to wait until a grace period has elapsed and
         * ldlm_lock_free() has had a chance to run. */
        synchronize_rcu();
#endif
        rc = cfs_mem_cache_destroy(ldlm_lock_slab);
        LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
        rc = cfs_mem_cache_destroy(ldlm_interval_slab);
        LASSERTF(rc == 0, "couldn't free interval node slab\n");
}

/* ldlm_extent.c */
EXPORT_SYMBOL(ldlm_extent_shift_kms);

/* ldlm_lock.c */
EXPORT_SYMBOL(ldlm_get_processing_policy);
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_fast_match);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_blocking_ast);
EXPORT_SYMBOL(ldlm_glimpse_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_prep_enqueue_req);
EXPORT_SYMBOL(ldlm_prep_elc_req);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
EXPORT_SYMBOL(ldlm_cli_enqueue_local);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_cli_cancel_req);
EXPORT_SYMBOL(ldlm_cli_join_lru);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_resource_iterate);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list);

/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_request_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_get_ref);
EXPORT_SYMBOL(ldlm_put_ref);
EXPORT_SYMBOL(ldlm_refresh_waiting_lock);

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);
EXPORT_SYMBOL(ldlm_dump_all_namespaces);
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);
EXPORT_SYMBOL(ldlm_resource_unlink_lock);

/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_add_conn);
EXPORT_SYMBOL(client_import_del_conn);
EXPORT_SYMBOL(client_obd_setup);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
EXPORT_SYMBOL(server_disconnect_export);
EXPORT_SYMBOL(target_abort_recovery);
EXPORT_SYMBOL(target_cleanup_recovery);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_destroy_export);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_pack_pool_reply);
EXPORT_SYMBOL(target_handle_disconnect);
EXPORT_SYMBOL(target_handle_reply);

/* l_lock.c */
EXPORT_SYMBOL(lock_res_and_lock);
EXPORT_SYMBOL(unlock_res_and_lock);