1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002-2004 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of the Lustre file system, http://www.lustre.org
9 * Lustre is a trademark of Cluster File Systems, Inc.
11 * You may have signed or agreed to another license before downloading
12 * this software. If so, you are bound by the terms and conditions
13 * of that agreement, and the following does not apply to you. See the
14 * LICENSE file included with this distribution for more information.
16 * If you did not agree to a different license, then this copy of Lustre
17 * is open source software; you can redistribute it and/or modify it
18 * under the terms of version 2 of the GNU General Public License as
19 * published by the Free Software Foundation.
21 * In either case, Lustre is distributed in the hope that it will be
22 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
23 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 * license text for more details.
28 # define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_LDLM
33 # include <libcfs/libcfs.h>
35 # include <liblustre.h>
38 #include <lustre_dlm.h>
39 #include <obd_class.h>
40 #include <libcfs/list.h>
41 #include "ldlm_internal.h"
/* slab caches and the handle lock, defined elsewhere in the ldlm module */
extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
/* serializes changes to ldlm_refcount / ldlm_state */
static struct semaphore ldlm_ref_sem;
static int ldlm_refcount;
/* module-wide LDLM service state */
static struct ldlm_state *ldlm_state;
/* Round an absolute timeout up to the next whole second, so nearby
 * deadlines share a single timer firing. */
inline cfs_time_t round_timeout(cfs_time_t timeout)
        return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
/* timeout for initial callback (AST) reply (bz10399) */
static inline unsigned int ldlm_get_rq_timeout(void)
        unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
        /* never allow a zero timeout; clamp to at least one second */
        return timeout < 1 ? 1 : timeout;
/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
static spinlock_t waiting_locks_spinlock; /* BH lock (timer) */
static struct list_head waiting_locks_list;     /* locks awaiting a client callback reply */
static cfs_timer_t waiting_locks_timer;         /* fires waiting_locks_callback() */
/* state shared with the "ldlm_elt" thread that evicts clients whose
 * callback replies timed out; fed by waiting_locks_callback() */
static struct expired_lock_thread {
        cfs_waitq_t elt_waitq;                  /* the thread sleeps here */
        struct list_head elt_expired_locks;     /* timed-out locks handed over by the timer */
} expired_lock_thread;
/* elt_state value requesting the thread to exit */
#define ELT_TERMINATE 2
        /* NOTE(review): these are members of the blocking-AST thread pool
         * struct, whose opening line is outside this view */
        struct list_head blp_list;      /* queued ldlm_bl_work_item's */
        cfs_waitq_t blp_waitq;          /* pool threads wait for work here */
        atomic_t blp_num_threads;       /* number of live pool threads */
        struct completion blp_comp;     /* signalled on thread exit */
/* one queued blocking-AST request, consumed by a pool thread */
struct ldlm_bl_work_item {
        struct list_head blwi_entry;    /* linkage into blp_list */
        struct ldlm_namespace *blwi_ns; /* namespace of blwi_lock */
        struct ldlm_lock_desc blwi_ld;  /* descriptor passed to the blocking AST */
        struct ldlm_lock *blwi_lock;    /* the lock the AST applies to */
/* Nonzero when the expired-lock list is non-empty; checked under the BH
 * spinlock because the timer callback feeds that list. */
static inline int have_expired_locks(void)
        spin_lock_bh(&waiting_locks_spinlock);
        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
        spin_unlock_bh(&waiting_locks_spinlock);
/* Body of the "ldlm_elt" thread: waits until waiting_locks_callback()
 * moves timed-out locks onto elt_expired_locks, then fails the owning
 * client exports.  Runs until elt_state becomes ELT_TERMINATE. */
static int expired_lock_main(void *arg)
        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        cfs_daemonize("ldlm_elt");
        /* announce that the thread is up and running */
        expired_lock_thread.elt_state = ELT_READY;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_thread.elt_dump) {
                        /* a debug-log dump was requested; drop the BH lock
                         * first because the dump can block */
                        spin_unlock_bh(&waiting_locks_spinlock);
                        /* from waiting_locks_callback, but not in timer */
                        libcfs_debug_dumplog();
                        libcfs_run_lbug_upcall(__FILE__,
                                               "waiting_locks_callback",
                                               expired_lock_thread.elt_dump);
                        spin_lock_bh(&waiting_locks_spinlock);
                        expired_lock_thread.elt_dump = 0;
                while (!list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;
                        lock = list_entry(expired->next, struct ldlm_lock,
                        /* sanity check: a freed lock would sit in poisoned
                         * memory */
                        if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
                        list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                        /* hold an export ref across the (blocking) eviction;
                         * the BH lock must be dropped for it */
                        export = class_export_get(lock->l_export);
                        spin_unlock_bh(&waiting_locks_spinlock);
                        class_fail_export(export);
                        class_export_put(export);
                        spin_lock_bh(&waiting_locks_spinlock);
                spin_unlock_bh(&waiting_locks_spinlock);
                if (do_dump && obd_dump_on_eviction) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                if (expired_lock_thread.elt_state == ELT_TERMINATE)
        /* tell the waiter in ldlm cleanup that we are gone */
        expired_lock_thread.elt_state = ELT_STOPPED;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
        struct ldlm_lock *lock, *last = NULL;
        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                /* stop at the first lock whose deadline has not yet passed;
                 * GROUP locks are never timed out here */
                if (cfs_time_after(lock->l_callback_timeout, cfs_time_current())
                    || (lock->l_req_mode == LCK_GROUP))
                LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                           "evicting client at %s ",
                           cfs_time_current_sec()- lock->l_enqueued_time.tv_sec,
                           lock->l_export->exp_connection->c_peer.nid));
                        /* same lock seen twice in a row: list corruption */
                        LDLM_ERROR(lock, "waiting on lock multiple times");
                        CERROR("wll %p n/p %p/%p, l_pending %p n/p %p/%p\n",
                               waiting_locks_list.next, waiting_locks_list.prev,
                               &lock->l_pending_chain,
                               lock->l_pending_chain.next,
                               lock->l_pending_chain.prev);
                        CFS_INIT_LIST_HEAD(&waiting_locks_list); /* HACK */
                        expired_lock_thread.elt_dump = __LINE__;
                        CEMERG("would be an LBUG, but isn't (bug 5653)\n");
                        libcfs_debug_dumpstack(NULL);
                        /*blocks* libcfs_debug_dumplog(); */
                        /*blocks* libcfs_run_lbug_upcall(file, func, line); */
                /* hand the lock to the expired-lock thread; eviction cannot
                 * be done here in timer (BH) context */
                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain,
                         &expired_lock_thread.elt_expired_locks);
        if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
                if (obd_dump_on_timeout)
                        expired_lock_thread.elt_dump = __LINE__;
                cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        /*
         * Make sure the timer will fire again if we have any locks
         */
        if (!list_empty(&waiting_locks_list)) {
                cfs_time_t timeout_rounded;
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        spin_unlock_bh(&waiting_locks_spinlock);
/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 *
 * Called with the namespace lock held.
 */
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
        cfs_time_t timeout_rounded;
        /* already on the waiting list: nothing to do */
        if (!list_empty(&lock->l_pending_chain))
        timeout = ldlm_get_enq_timeout(lock);
        lock->l_callback_timeout = cfs_time_shift(timeout);
        timeout_rounded = round_timeout(lock->l_callback_timeout);
        /* only re-arm if this deadline is earlier than the armed one,
         * or no timer is armed at all */
        if (cfs_time_before(timeout_rounded,
                            cfs_timer_deadline(&waiting_locks_timer)) ||
            !cfs_timer_is_armed(&waiting_locks_timer)) {
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        /* if the new lock has a shorter timeout than something earlier on
           the list, we'll wait the longer amount of time; no big deal. */
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
/* Locked front-end to __ldlm_add_waiting_lock(): refuses destroyed locks
 * (bug 5653), rate-limiting the diagnostic stack dump. */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
        spin_lock_bh(&waiting_locks_spinlock);
        if (lock->l_destroyed) {
                static cfs_time_t next;
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
                /* dump the stack at most once per 14400s window */
                if (cfs_time_after(cfs_time_current(), next)) {
                        next = cfs_time_shift(14400);
                        libcfs_debug_dumpstack(NULL);
        ret = __ldlm_add_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);
        LDLM_DEBUG(lock, "%sadding to wait list",
                   ret == 0 ? "not re-" : "");
/*
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 *
 * Called with namespace lock held.
 */
int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
        struct list_head *list_next;
        if (list_empty(&lock->l_pending_chain))
        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        cfs_timer_disarm(&waiting_locks_timer);
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                        /* re-arm for the new head's deadline */
                        cfs_timer_arm(&waiting_locks_timer,
                                      round_timeout(next->l_callback_timeout));
        list_del_init(&lock->l_pending_chain);
/* Locked wrapper around __ldlm_del_waiting_lock(); no-op for client-side
 * locks, which have no export. */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
        spin_lock_bh(&waiting_locks_spinlock);
        ret = __ldlm_del_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);
        LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
/*
 * Restart the callback-timeout clock for a lock that is still being waited
 * on, by deleting and re-adding it on the waiting list.
 *
 * Called with namespace lock held.
 */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
        spin_lock_bh(&waiting_locks_spinlock);
        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
        __ldlm_del_waiting_lock(lock);
        __ldlm_add_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);
        LDLM_DEBUG(lock, "refreshed");
#else /* !__KERNEL__ */
/* Userspace (liblustre) builds have no waiting-locks machinery; these are
 * stand-ins for the kernel versions above. */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
#endif /* __KERNEL__ */
/* Evict the client holding @lock because an AST of type @ast_type failed
 * with error @rc: log to the console, optionally dump the debug log, and
 * fail the export. */
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                            const char *ast_type)
        struct ptlrpc_connection *conn = lock->l_export->exp_connection;
        char *str = libcfs_nid2str(conn->c_peer.nid);
        LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due "
                           "to a lock %s callback to %s timed out: rc %d\n",
                           lock->l_export->exp_obd->obd_name, str,
                           ast_type, obd_export_nid2str(lock->l_export), rc);
        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();
        class_fail_export(lock->l_export);
/* Decide how to react to a failed AST RPC: cancel the lock (liblustre
 * client, or a cancel already arrived), evict the client on communication
 * failure, or treat an application-level status as a race/protocol issue. */
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;
        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                LASSERT(lock->l_export);
                if (lock->l_export->exp_libclient) {
                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
                                   " timeout, just cancelling lock", ast_type,
                                   libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                } else if (lock->l_flags & LDLM_FL_CANCEL) {
                        LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
                                   "cancel was received (AST reply lost?)",
                                   ast_type, libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        /* genuine communication failure: evict the client */
                        ldlm_del_waiting_lock(lock);
                        ldlm_failed_ast(lock, rc, ast_type);
                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
                                   " from %s AST - normal race",
                                   libcfs_nid2str(peer.nid),
                                   lustre_msg_get_status(req->rq_repmsg),
                        LDLM_ERROR(lock, "client (nid %s) returned %d "
                                   "from %s AST", libcfs_nid2str(peer.nid),
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
                        ldlm_lock_cancel(lock);
                        /* Server-side AST functions are called from ldlm_reprocess_all,
                         * which needs to be told to please restart its reprocessing. */
/* Interpret callback for async blocking/completion AST RPCs queued on the
 * shared request set: maps AST errors and flags a reprocess restart. */
static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
        struct ldlm_cb_set_arg *arg;
        struct ldlm_lock *lock;
        LASSERT(data != NULL);
        /* arg and lock were stashed by the AST sender */
        arg = req->rq_async_args.pointer_arg[0];
        lock = req->rq_async_args.pointer_arg[1];
        LASSERT(lock != NULL);
        /* If client canceled the lock but the cancel has not
         * been received yet, we need to update lvbo to have the
         * proper attributes cached. */
        if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
                ldlm_res_lvbo_update(lock->l_resource, NULL,
        rc = ldlm_handle_ast_error(lock, req, rc,
                                   arg->type == LDLM_BL_CALLBACK
                                   ? "blocking" : "completion");
                /* AST failed: ldlm_reprocess_queue must be restarted */
                atomic_set(&arg->restart, 1);
/* Common tail for sending a blocking/completion AST: fire the RPC
 * immediately for an instant cancel, otherwise queue it on the shared
 * request set carried in @arg. */
static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
                                          struct ldlm_cb_set_arg *arg,
                                          struct ldlm_lock *lock,
        if (unlikely(instant_cancel)) {
                rc = ptl_send_rpc(req, 1);
                ptlrpc_req_finished(req);
                        /* If we cancelled the lock, we need to restart
                         * ldlm_reprocess_queue */
                        atomic_set(&arg->restart, 1);
                ptlrpc_set_add_req(arg->set, req);
/*
 * ->l_blocking_ast() method for server-side locks.  This is invoked when newly
 * enqueued server lock conflicts with given one.
 *
 * Sends blocking ast rpc to the client owning that lock; arms timeout timer
 * to wait for client response.
 */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                       [DLM_LOCKREQ_OFF] = sizeof(*body) };
        int instant_cancel = 0, rc;
        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
        LASSERT(data != NULL);
        /* ASTs travel to the client over the reverse import */
        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,
        req->rq_async_args.pointer_arg[0] = arg;
        req->rq_async_args.pointer_arg[1] = lock;
        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;
        lock_res(lock->l_resource);
        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
        if (lock->l_destroyed) {
                /* What's the point? */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
        /* client silent for over 30s: fail the AST up front rather than
         * waiting out the RPC timeout */
        if (CURRENT_SECONDS - lock->l_export->exp_last_request_time > 30){
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_desc = *desc;
        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
        LDLM_DEBUG(lock, "server preparing blocking AST");
        ptlrpc_req_set_repsize(req, 1, NULL);
        if (instant_cancel) {
                unlock_res(lock->l_resource);
                ldlm_lock_cancel(lock);
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
                /* start the cancellation timer for this lock */
                ldlm_add_waiting_lock(lock);
                unlock_res(lock->l_resource);
        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
                req->rq_timeout = ldlm_get_rq_timeout();
        if (lock->l_export && lock->l_export->exp_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                     LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
/* ->l_completion_ast() for server-side locks: informs the client that its
 * lock has been granted (possibly with changed mode/policy), including the
 * resource's LVB when one exists. */
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        struct timeval granted_time;
        long total_enqueue_wait;
        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF] = sizeof(*body) };
        int rc, buffers = 2, instant_cancel = 0;
        LASSERT(lock != NULL);
        LASSERT(data != NULL);
        /* measure how long the client waited for this grant */
        do_gettimeofday(&granted_time);
        total_enqueue_wait = cfs_timeval_sub(&granted_time,
                                             &lock->l_enqueued_time, NULL);
        if (total_enqueue_wait / ONE_MILLION > obd_timeout)
                /* non-fatal with AT - change to LDLM_DEBUG? */
                LDLM_ERROR(lock, "enqueue wait took %luus from %lu",
                           total_enqueue_wait, lock->l_enqueued_time.tv_sec);
        lock_res_and_lock(lock);
        if (lock->l_resource->lr_lvb_len) {
                /* reserve an extra request buffer for the LVB payload */
                size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len;
        unlock_res_and_lock(lock);
        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers,
        req->rq_async_args.pointer_arg[0] = arg;
        req->rq_async_args.pointer_arg[1] = lock;
        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;
        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);
                /* copy the current LVB into the request under the lock */
                lvb = lustre_msg_buf(req->rq_reqmsg, DLM_REQ_REC_OFF,
                                     lock->l_resource->lr_lvb_len);
                lock_res_and_lock(lock);
                memcpy(lvb, lock->l_resource->lr_lvb_data,
                       lock->l_resource->lr_lvb_len);
                unlock_res_and_lock(lock);
        LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
        /* Server-side enqueue wait time estimate, used in
           __ldlm_add_waiting_lock to set future enqueue timers */
        at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
               total_enqueue_wait / ONE_MILLION);
        ptlrpc_req_set_repsize(req, 1, NULL);
        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
                req->rq_timeout = ldlm_get_rq_timeout();
        /* We only send real blocking ASTs after the lock is granted */
        lock_res_and_lock(lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                body->lock_flags |= LDLM_FL_AST_SENT;
                /* We might get here prior to ldlm_handle_enqueue setting
                 * LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
                 * into waiting list, but this is safe and similar code in
                 * ldlm_handle_enqueue will call ldlm_lock_cancel() still,
                 * that would not only cancel the lock, but will also remove
                 * it from waiting list */
                if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                        unlock_res_and_lock(lock);
                        ldlm_lock_cancel(lock);
                        lock_res_and_lock(lock);
                        ldlm_add_waiting_lock(lock); /* start the lock-timeout
                                                        clock */
        unlock_res_and_lock(lock);
        if (lock->l_export && lock->l_export->exp_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                     LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
/* Send a glimpse AST to the client holding @lock and refresh the server's
 * LVB from the reply.  Unlike the blocking/completion ASTs above, this is
 * sent synchronously via ptlrpc_queue_wait(). */
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                       [DLM_LOCKREQ_OFF] = sizeof(*body) };
        LASSERT(lock != NULL);
        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK, 2, size,
        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        ldlm_lock2desc(lock, &body->lock_desc);
        /* the reply will carry the client's (possibly updated) LVB */
        lock_res_and_lock(lock);
        size[REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
        unlock_res_and_lock(lock);
        res = lock->l_resource;
        ptlrpc_req_set_repsize(req, 2, size);
        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
                req->rq_timeout = ldlm_get_rq_timeout();
        if (lock->l_export && lock->l_export->exp_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,
                                     LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
        rc = ptlrpc_queue_wait(req);
        if (rc == -ELDLM_NO_LOCK_DATA)
                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
                rc = ldlm_res_lvbo_update(res, req->rq_repmsg,
        ptlrpc_req_finished(req);
/* Look up a lock on @exp whose remote handle matches @remote_hdl; used to
 * re-find the server lock during LDLM_FL_REPLAY. */
static struct ldlm_lock *
find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
        struct list_head *iter;
        spin_lock(&exp->exp_ldlm_data.led_lock);
        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
                struct ldlm_lock *lock;
                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
                if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
                        spin_unlock(&exp->exp_ldlm_data.led_lock);
        spin_unlock(&exp->exp_ldlm_data.led_lock);
/*
 * Main server-side entry point into LDLM.  This is called by ptlrpc service
 * threads to carry out client lock enqueueing requests.
 */
int ldlm_handle_enqueue(struct ptlrpc_request *req,
                        ldlm_completion_callback completion_callback,
                        ldlm_blocking_callback blocking_callback,
                        ldlm_glimpse_callback glimpse_callback)
        struct obd_device *obddev = req->rq_export->exp_obd;
        struct ldlm_reply *dlm_rep;
        struct ldlm_request *dlm_req;
        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREPLY_OFF] = sizeof(*dlm_rep) };
        ldlm_error_t err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                GOTO(out, rc = -EFAULT);
        /* first process any cancels piggy-backed on the enqueue */
        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
        flags = dlm_req->lock_flags;
        LASSERT(req->rq_export);
        if (req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);
        /* validate wire-supplied type and mode before using them */
        if (dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
            dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
                          dlm_req->lock_desc.l_resource.lr_type);
                GOTO(out, rc = -EFAULT);
        /* mode must be in range and a power of two (a single mode bit) */
        if (dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
            dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
            dlm_req->lock_desc.l_req_mode & (dlm_req->lock_desc.l_req_mode-1)) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
                          dlm_req->lock_desc.l_req_mode);
                GOTO(out, rc = -EFAULT);
        if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
                if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN) {
                        DEBUG_REQ(D_ERROR, req,
                                  "PLAIN lock request from IBITS client?");
                        GOTO(out, rc = -EPROTO);
        } else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) {
                DEBUG_REQ(D_ERROR, req,
                          "IBITS lock request from unaware client?");
                GOTO(out, rc = -EPROTO);
        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
           against server's _CONNECT_SUPPORTED flags? (I don't want to use
           ibits for mgc/mgs) */
        /* INODEBITS_INTEROP: Perform conversion from plain lock to
         * inodebits lock if client does not support them. */
        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
                        dlm_req->lock_desc.l_req_mode = LCK_CR;
        if (flags & LDLM_FL_REPLAY) {
                /* on replay, re-use the lock made by the original enqueue */
                lock = find_existing_lock(req->rq_export,
                                          &dlm_req->lock_handle[0]);
                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                  LPX64, lock->l_handle.h_cookie);
                        GOTO(existing_lock, rc = 0);
        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(obddev->obd_namespace,
                                dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                blocking_callback, completion_callback,
                                glimpse_callback, NULL, 0);
                GOTO(out, rc = -ENOMEM);
        do_gettimeofday(&lock->l_enqueued_time);
        lock->l_remote_handle = dlm_req->lock_handle[0];
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        /* Don't enqueue a lock onto the export if it has already
         * been evicted. Cancel it now instead. (bug 3822) */
        if (req->rq_export->exp_failed) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                GOTO(out, rc = -ENOTCONN);
        /* attach the lock to the export's held-locks list */
        lock->l_export = class_export_get(req->rq_export);
        spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
        list_add(&lock->l_export_chain,
                 &lock->l_export->exp_ldlm_data.led_held_locks);
        spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
        lock_res_and_lock(lock);
        if (lock->l_resource->lr_lvb_len) {
                /* reserve a reply buffer for the LVB */
                size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
        unlock_res_and_lock(lock);
        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                GOTO(out, rc = -ENOMEM);
        rc = lustre_pack_reply(req, buffers, size, NULL);
        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;
        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
        dlm_rep->lock_flags = flags;
        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);
        /* We never send a blocking AST until the lock is granted, but
         * we can tell it right now */
        lock_res_and_lock(lock);
        /* Now take into account flags to be inherited from original lock
           request both in reply to client and in our own lock flags. */
        dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
        lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
        /* Don't move a pending lock onto the export if it has already
         * been evicted. Cancel it now instead. (bug 5683) */
        if (req->rq_export->exp_failed ||
            OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT)) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
        } else if (lock->l_flags & LDLM_FL_AST_SENT) {
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode) {
                        /* Only cancel lock if it was granted, because it
                         * would be destroyed immediatelly and would never
                         * be granted in the future, causing timeouts on client.
                         * Not granted lock will be cancelled immediatelly after
                         * sending completion AST.
                         */
                        if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                                unlock_res_and_lock(lock);
                                ldlm_lock_cancel(lock);
                                lock_res_and_lock(lock);
                                ldlm_add_waiting_lock(lock);
        /* Make sure we never ever grant usual metadata locks to liblustre
           clients */
        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
             req->rq_export->exp_libclient) {
                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
                    !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
                        CERROR("Granting sync lock to libclient. "
                               "req fl %d, rep fl %d, lock fl %d\n",
                               dlm_req->lock_flags, dlm_rep->lock_flags,
                        LDLM_ERROR(lock, "sync lock");
                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
                                struct ldlm_intent *it;
                                it = lustre_msg_buf(req->rq_reqmsg,
                                        CERROR("This is intent %s ("LPU64")\n",
                                               ldlm_it2str(it->opc), it->opc);
        unlock_res_and_lock(lock);
        req->rq_status = rc ?: err; /* return either error - bug 11190 */
        if (!req->rq_packed_final) {
                err = lustre_pack_reply(req, 1, NULL, NULL);
        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
                           "(err=%d, rc=%d)", err, rc);
                        /* on success, copy the LVB into the reply */
                        lock_res_and_lock(lock);
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        if (size[DLM_REPLY_REC_OFF] > 0) {
                                void *lvb = lustre_msg_buf(req->rq_repmsg,
                                                           size[DLM_REPLY_REC_OFF]);
                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                memcpy(lvb, lock->l_resource->lr_lvb_data,
                                       size[DLM_REPLY_REC_OFF]);
                        unlock_res_and_lock(lock);
                        /* on failure, tear the lock down */
                        lock_res_and_lock(lock);
                        ldlm_resource_unlink_lock(lock);
                        ldlm_lock_destroy_nolock(lock);
                        unlock_res_and_lock(lock);
                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);
                LDLM_LOCK_PUT(lock);
        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
/* Server-side handler for LDLM_CONVERT: change the mode of an existing
 * lock identified by the handle in the request. */
int ldlm_handle_convert(struct ptlrpc_request *req)
        struct ldlm_request *dlm_req;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREPLY_OFF] = sizeof(*dlm_rep) };
        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
        if (req->rq_export && req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_CONVERT - LDLM_FIRST_OPC);
        rc = lustre_pack_reply(req, 2, size, NULL);
        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
        dlm_rep->lock_flags = dlm_req->lock_flags;
        lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
                /* stale/unknown handle */
                req->rq_status = EINVAL;
                LDLM_DEBUG(lock, "server-side convert handler START");
                do_gettimeofday(&lock->l_enqueued_time);
                res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                        &dlm_rep->lock_flags);
                        if (ldlm_del_waiting_lock(lock))
                                LDLM_DEBUG(lock, "converted waiting lock");
                        req->rq_status = EDEADLOCK;
        if (!req->rq_status)
                ldlm_reprocess_all(lock->l_resource);
        LDLM_DEBUG(lock, "server-side convert handler END");
        LDLM_LOCK_PUT(lock);
        LDLM_DEBUG_NOLOCK("server-side convert handler END");
/* Cancel all the locks whose handles are packed into ldlm_request */
int ldlm_request_cancel(struct ptlrpc_request *req,
                        struct ldlm_request *dlm_req, int first)
        struct ldlm_resource *res, *pres = NULL;
        struct ldlm_lock *lock;
        int i, count, done = 0;
        /* an old-style single cancel carries no lock_count */
        count = dlm_req->lock_count ? dlm_req->lock_count : 1;
        /* There is no lock on the server at the replay time,
         * skip lock cancelling to make replay tests to pass. */
        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
        LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks",
        for (i = first; i < count; i++) {
                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                        LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
                                          "lock (cookie "LPU64")",
                                          dlm_req->lock_handle[i].cookie);
                res = lock->l_resource;
                        /* resource changed: finish off the previous one */
                        ldlm_reprocess_all(pres);
                        ldlm_resource_putref(pres);
                        ldlm_resource_getref(res);
                        ldlm_res_lvbo_update(res, NULL, 0, 1);
                ldlm_lock_cancel(lock);
                LDLM_LOCK_PUT(lock);
                ldlm_reprocess_all(pres);
                ldlm_resource_putref(pres);
        LDLM_DEBUG_NOLOCK("server-side cancel handler END");
/* Server-side handler for LDLM_CANCEL RPCs: unpack the request, cancel
 * all packed locks, and reply. */
int ldlm_handle_cancel(struct ptlrpc_request *req)
        struct ldlm_request *dlm_req;
        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for cancel\n");
        if (req->rq_export && req->rq_export->exp_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,
                                     LDLM_CANCEL - LDLM_FIRST_OPC);
        rc = lustre_pack_reply(req, 1, NULL, NULL);
        /* no lock was cancelled: report ESTALE to the client */
        if (!ldlm_request_cancel(req, dlm_req, 0))
                req->rq_status = ESTALE;
        if (ptlrpc_reply(req) != 0)
/* Client-side handler for a server blocking AST on @lock.
 *
 * Marks the lock LDLM_FL_CBPENDING (and LDLM_FL_CANCEL when the lock was
 * taken with cancel-on-block semantics).  If the lock is already unused
 * (no readers and no writers) the blocking AST callback is invoked now;
 * otherwise cancellation is deferred until the last user releases it.
 * Consumes the caller's lock reference via LDLM_LOCK_PUT(). */
1215 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
1216 struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
1221 LDLM_DEBUG(lock, "client blocking AST callback handler START");
1223 lock_res_and_lock(lock);
1224 lock->l_flags |= LDLM_FL_CBPENDING;
1226 if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
1227 lock->l_flags |= LDLM_FL_CANCEL;
/* sample the use count under the res lock; act on it after unlocking */
1229 do_ast = (!lock->l_readers && !lock->l_writers);
1230 unlock_res_and_lock(lock);
1233 LDLM_DEBUG(lock, "already unused, calling "
1234 "callback (%p)", lock->l_blocking_ast);
1235 if (lock->l_blocking_ast != NULL)
1236 lock->l_blocking_ast(lock, ld, lock->l_ast_data,
1239 LDLM_DEBUG(lock, "Lock still has references, will be"
1240 " cancelled later");
1243 LDLM_DEBUG(lock, "client blocking callback handler END");
1244 LDLM_LOCK_PUT(lock);
/* Client-side handler for a server completion AST: the enqueue has been
 * granted (possibly with changed mode / policy / resource relative to what
 * the client asked for, if the completion raced the enqueue reply).
 * Applies any such changes, copies the server-provided LVB, grants the
 * lock locally and runs the resulting completion-AST work list.
 * Consumes the caller's lock reference via LDLM_LOCK_PUT(). */
1248 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
1249 struct ldlm_namespace *ns,
1250 struct ldlm_request *dlm_req,
1251 struct ldlm_lock *lock)
1253 CFS_LIST_HEAD(ast_list);
1256 LDLM_DEBUG(lock, "client completion callback handler START");
1258 lock_res_and_lock(lock);
1260 /* If we receive the completion AST before the actual enqueue returned,
1261 * then we might need to switch lock modes, resources, or extents. */
1262 if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
1263 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
1264 LDLM_DEBUG(lock, "completion AST, new lock mode");
/* PLAIN locks carry no policy data; everything else takes the server's */
1267 if (lock->l_resource->lr_type != LDLM_PLAIN) {
1268 lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
1269 LDLM_DEBUG(lock, "completion AST, new policy data");
1272 ldlm_resource_unlink_lock(lock);
1273 if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
1274 &lock->l_resource->lr_name,
1275 sizeof(lock->l_resource->lr_name)) != 0) {
/* resource switch must happen without the res lock held */
1276 unlock_res_and_lock(lock);
1277 ldlm_lock_change_resource(ns, lock,
1278 dlm_req->lock_desc.l_resource.lr_name);
1279 LDLM_DEBUG(lock, "completion AST, new resource");
1280 CERROR("change resource!\n");
1281 lock_res_and_lock(lock);
1284 if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
/* server piggy-backed a blocking AST on the completion */
1285 lock->l_flags |= LDLM_FL_CBPENDING;
1286 LDLM_DEBUG(lock, "completion AST includes blocking AST");
1289 if (lock->l_lvb_len) {
/* unpack/swab the lock value block sent in the request body */
1291 lvb = lustre_swab_reqbuf(req, DLM_REQ_REC_OFF, lock->l_lvb_len,
1292 lock->l_lvb_swabber);
1294 LDLM_ERROR(lock, "completion AST did not contain "
1297 memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
/* grant locally; completion ASTs to run are collected on ast_list */
1301 ldlm_grant_lock(lock, &ast_list);
1302 unlock_res_and_lock(lock);
1304 LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
1306 ldlm_run_cp_ast_work(&ast_list);
1308 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
1310 LDLM_LOCK_PUT(lock);
/* Client-side handler for a server glimpse AST: invoke the lock's glimpse
 * callback (which typically packs current attributes into the reply) and
 * send its return code back as rq_status.  As an optimization, an unused
 * PW lock that has been idle for more than 10 seconds is cancelled
 * opportunistically via the blocking-AST path.
 * Consumes the caller's lock reference via LDLM_LOCK_PUT(). */
1314 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
1315 struct ldlm_namespace *ns,
1316 struct ldlm_request *dlm_req,
1317 struct ldlm_lock *lock)
1322 LDLM_DEBUG(lock, "client glimpse AST callback handler");
1324 if (lock->l_glimpse_ast != NULL)
1325 rc = lock->l_glimpse_ast(lock, req);
1327 if (req->rq_repmsg != NULL) {
1330 req->rq_status = rc;
1334 lock_res_and_lock(lock);
/* idle, unused PW lock: cancel it now rather than waiting for a
 * blocking AST from the server */
1335 if (lock->l_granted_mode == LCK_PW &&
1336 !lock->l_readers && !lock->l_writers &&
1337 cfs_time_after(cfs_time_current(),
1338 cfs_time_add(lock->l_last_used,
1339 cfs_time_seconds(10)))) {
1340 unlock_res_and_lock(lock);
/* hand off to a blocking thread; on failure handle it inline */
1341 if (ldlm_bl_to_thread(ns, NULL, lock, 0))
1342 ldlm_handle_bl_callback(ns, NULL, lock);
1347 unlock_res_and_lock(lock);
1348 LDLM_LOCK_PUT(lock);
/* Set rq_status to @rc and send the reply, packing one first unless a
 * final reply was already packed for this request. */
1352 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
1354 req->rq_status = rc;
1355 if (!req->rq_packed_final) {
1356 rc = lustre_pack_reply(req, 1, NULL, NULL);
1360 return ptlrpc_reply(req);
/* Queue a blocking-AST work item (@ld, @lock, @flags) for the ldlm_bl
 * thread pool and wake one worker.  Returns non-zero on failure (e.g.
 * allocation failure), in which case the caller handles the AST inline.
 * NOTE(review): callers pass ld == NULL, but the visible line below
 * dereferences *ld unconditionally — a NULL guard line appears to be
 * missing from this extract; confirm against the full source. */
1362 int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
1363 struct ldlm_lock *lock, int flags)
1366 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1367 struct ldlm_bl_work_item *blwi;
1370 OBD_ALLOC(blwi, sizeof(*blwi));
1376 blwi->blwi_ld = *ld;
1377 blwi->blwi_lock = lock;
1378 blwi->blwi_flags = flags;
/* publish under blp_lock and wake one exclusive waiter */
1380 spin_lock(&blp->blp_lock);
1381 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
1382 cfs_waitq_signal(&blp->blp_waitq);
1383 spin_unlock(&blp->blp_lock);
/* ptlrpc service handler for the LDLM callback portal.
 *
 * Dispatches incoming ASTs (blocking / completion / glimpse) plus a set of
 * llog/quota opcodes that share this portal.  For the AST opcodes it
 * unpacks the ldlm_request, resolves the lock handle, copies AST hint
 * flags onto the lock, and routes to the matching ldlm_handle_*_callback
 * helper.  Requests with no export (e.g. after a reboot with a lock held)
 * are answered with -ENOTCONN. */
1392 static int ldlm_callback_handler(struct ptlrpc_request *req)
1393 struct ldlm_namespace *ns;
1394 struct ldlm_request *dlm_req;
1395 struct ldlm_lock *lock;
1399 /* Requests arrive in sender's byte order. The ptlrpc service
1400 * handler has already checked and, if necessary, byte-swapped the
1401 * incoming request message body, but I am responsible for the
1402 * message buffers. */
1404 if (req->rq_export == NULL) {
1405 struct ldlm_request *dlm_req;
1407 CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
1408 "export cookie "LPX64"; this is "
1409 "normal if this node rebooted with a lock held\n",
1410 lustre_msg_get_opc(req->rq_reqmsg),
1411 libcfs_id2str(req->rq_peer),
1412 lustre_msg_get_handle(req->rq_reqmsg)->cookie);
/* best effort: try to log which lock the dead client meant */
1414 dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
1416 lustre_swab_ldlm_request);
1417 if (dlm_req != NULL)
1418 CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
1419 dlm_req->lock_handle[0].cookie);
1421 ldlm_callback_reply(req, -ENOTCONN);
1425 LASSERT(req->rq_export != NULL);
1426 LASSERT(req->rq_export->exp_obd != NULL);
/* first switch: fault injection for AST opcodes, and full handling of
 * the non-AST opcodes that share this portal */
1428 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1429 case LDLM_BL_CALLBACK:
1430 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1432 case LDLM_CP_CALLBACK:
1433 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
1435 case LDLM_GL_CALLBACK:
1436 OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
1438 case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
1439 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1440 rc = llog_origin_handle_cancel(req);
1441 ldlm_callback_reply(req, rc);
1443 case OBD_QC_CALLBACK:
1444 OBD_FAIL_RETURN(OBD_FAIL_OBD_QC_CALLBACK_NET, 0);
1445 rc = target_handle_qc_callback(req);
1446 ldlm_callback_reply(req, rc);
1450 /* reply in handler */
1451 rc = target_handle_dqacq_callback(req);
1453 case LLOG_ORIGIN_HANDLE_CREATE:
1454 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1455 rc = llog_origin_handle_create(req);
1456 ldlm_callback_reply(req, rc);
1458 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1459 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1460 rc = llog_origin_handle_next_block(req);
1461 ldlm_callback_reply(req, rc);
1463 case LLOG_ORIGIN_HANDLE_READ_HEADER:
1464 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1465 rc = llog_origin_handle_read_header(req);
1466 ldlm_callback_reply(req, rc);
1468 case LLOG_ORIGIN_HANDLE_CLOSE:
1469 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1470 rc = llog_origin_handle_close(req);
1471 ldlm_callback_reply(req, rc);
1474 CERROR("unknown opcode %u\n",
1475 lustre_msg_get_opc(req->rq_reqmsg));
1476 ldlm_callback_reply(req, -EPROTO);
/* from here on, only the three AST opcodes remain */
1480 ns = req->rq_export->exp_obd->obd_namespace;
1481 LASSERT(ns != NULL);
1483 dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
1484 lustre_swab_ldlm_request);
1485 if (dlm_req == NULL) {
1486 CERROR ("can't unpack dlm_req\n");
1487 ldlm_callback_reply(req, -EPROTO);
1491 lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
1493 CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
1494 "disappeared\n", dlm_req->lock_handle[0].cookie);
1495 ldlm_callback_reply(req, -EINVAL);
1499 /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
1500 lock_res_and_lock(lock);
1501 lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
1502 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
1503 /* If somebody cancels locks and cache is already dropped,
1504 * we can tell the server we have no lock. Otherwise, we
1505 * should send cancel after dropping the cache. */
1506 if ((lock->l_flags & LDLM_FL_CANCELING) &&
1507 (lock->l_flags & LDLM_FL_BL_DONE)) {
1508 LDLM_DEBUG(lock, "callback on lock "
1509 LPX64" - lock disappeared\n",
1510 dlm_req->lock_handle[0].cookie);
1511 unlock_res_and_lock(lock);
1512 LDLM_LOCK_PUT(lock);
1513 ldlm_callback_reply(req, -EINVAL);
1516 lock->l_flags |= LDLM_FL_BL_AST;
1518 unlock_res_and_lock(lock);
1520 /* We want the ost thread to get this reply so that it can respond
1521 * to ost requests (write cache writeback) that might be triggered
1524 * But we'd also like to be able to indicate in the reply that we're
1525 * cancelling right now, because it's unused, or have an intent result
1526 * in the reply, so we might have to push the responsibility for sending
1527 * the reply down into the AST handlers, alas. */
/* second switch: actually run the AST handler (reply is sent early for
 * BL/CP so the server thread is not blocked on client-side work) */
1529 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1530 case LDLM_BL_CALLBACK:
1531 CDEBUG(D_INODE, "blocking ast\n");
1532 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
1533 ldlm_callback_reply(req, 0);
1534 if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0))
1535 ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
1537 case LDLM_CP_CALLBACK:
1538 CDEBUG(D_INODE, "completion ast\n");
1539 ldlm_callback_reply(req, 0);
1540 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
1542 case LDLM_GL_CALLBACK:
1543 CDEBUG(D_INODE, "glimpse ast\n");
1544 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
1547 LBUG(); /* checked above */
/* ptlrpc service handler for the LDLM cancel portal: dispatches
 * LDLM_CANCEL (and legacy OBD_LOG_CANCEL) requests.  A request with no
 * export is a protocol error here (unlike the callback portal) and is
 * logged loudly before being answered with -ENOTCONN. */
1553 static int ldlm_cancel_handler(struct ptlrpc_request *req)
1558 /* Requests arrive in sender's byte order. The ptlrpc service
1559 * handler has already checked and, if necessary, byte-swapped the
1560 * incoming request message body, but I am responsible for the
1561 * message buffers. */
1563 if (req->rq_export == NULL) {
1564 struct ldlm_request *dlm_req;
1566 CERROR("operation %d from %s with bad export cookie "LPU64"\n",
1567 lustre_msg_get_opc(req->rq_reqmsg),
1568 libcfs_id2str(req->rq_peer),
1569 lustre_msg_get_handle(req->rq_reqmsg)->cookie);
/* best effort: dump the handle the client was cancelling */
1571 dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
1573 lustre_swab_ldlm_request);
1574 if (dlm_req != NULL)
1575 ldlm_lock_dump_handle(D_ERROR,
1576 &dlm_req->lock_handle[0]);
1578 ldlm_callback_reply(req, -ENOTCONN);
1582 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1584 /* XXX FIXME move this back to mds/handler.c, bug 249 */
1586 CDEBUG(D_INODE, "cancel\n");
1587 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1588 rc = ldlm_handle_cancel(req);
1592 case OBD_LOG_CANCEL:
1593 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1594 rc = llog_origin_handle_cancel(req);
1595 ldlm_callback_reply(req, rc);
1598 CERROR("invalid opcode %d\n",
1599 lustre_msg_get_opc(req->rq_reqmsg));
1600 ldlm_callback_reply(req, -EINVAL);
/* Pop and return the oldest work item from the blocking-thread pool's
 * queue, or NULL when the queue is empty.  Serialized by blp_lock. */
1607 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
1609 struct ldlm_bl_work_item *blwi = NULL;
1611 spin_lock(&blp->blp_lock);
1612 if (!list_empty(&blp->blp_list)) {
1613 blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
1615 list_del(&blwi->blwi_entry);
1617 spin_unlock(&blp->blp_lock);
/* Per-thread startup argument for ldlm_bl_thread_main(). */
1622 struct ldlm_bl_thread_data {
1624 struct ldlm_bl_pool *bltd_blp; /* pool this thread serves */
/* Main loop of an ldlm_bl_NN blocking-AST worker thread.
 *
 * Daemonizes, registers with the pool, then waits exclusively on
 * blp_waitq for work items.  A work item with blwi_ns == NULL is the
 * shutdown signal from ldlm_cleanup().  LDLM_FL_CANCELING items are
 * locks already cancelled locally that only need the cancel RPC sent;
 * everything else goes through ldlm_handle_bl_callback(). */
1627 static int ldlm_bl_thread_main(void *arg)
1629 struct ldlm_bl_thread_data *bltd = arg;
1630 struct ldlm_bl_pool *blp = bltd->bltd_blp;
1634 char name[CFS_CURPROC_COMM_MAX];
1635 snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
1637 cfs_daemonize(name);
/* announce this thread to the pool; ldlm_setup() waits on blp_comp */
1640 atomic_inc(&blp->blp_num_threads);
1641 complete(&blp->blp_comp);
1644 struct l_wait_info lwi = { 0 };
1645 struct ldlm_bl_work_item *blwi = NULL;
1647 l_wait_event_exclusive(blp->blp_waitq,
1648 (blwi = ldlm_bl_get_work(blp)) != NULL,
/* NULL namespace == shutdown request from ldlm_cleanup() */
1651 if (blwi->blwi_ns == NULL)
1654 if (blwi->blwi_flags == LDLM_FL_CANCELING) {
1655 /* The special case when we cancel locks in lru
1656 * asynchronously, then we first remove the lock from
1657 * l_bl_ast explicitly in ldlm_cancel_lru before
1658 * sending it to this thread. Thus lock is marked
1659 * LDLM_FL_CANCELING, and already cancelled locally. */
1660 CFS_LIST_HEAD(head);
1661 LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast));
1662 list_add(&blwi->blwi_lock->l_bl_ast, &head);
1663 ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export,
1665 LDLM_LOCK_PUT(blwi->blwi_lock);
1667 ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
1670 OBD_FREE(blwi, sizeof(*blwi));
/* deregister and signal ldlm_cleanup() that this thread is gone */
1673 atomic_dec(&blp->blp_num_threads);
1674 complete(&blp->blp_comp);
1680 static int ldlm_setup(void);
1681 static int ldlm_cleanup(int force);
/* Take a reference on the shared LDLM state, performing one-time
 * ldlm_setup() on the 0 -> 1 transition.  Serialized by ldlm_ref_sem. */
1683 int ldlm_get_ref(void)
1687 mutex_down(&ldlm_ref_sem);
1688 if (++ldlm_refcount == 1) {
1693 mutex_up(&ldlm_ref_sem);
/* Drop a reference on the shared LDLM state; the last reference (or
 * @force) triggers ldlm_cleanup().  Serialized by ldlm_ref_sem. */
1698 void ldlm_put_ref(int force)
1701 mutex_down(&ldlm_ref_sem);
1702 if (ldlm_refcount == 1) {
1703 int rc = ldlm_cleanup(force);
1705 CERROR("ldlm_cleanup failed: %d\n", rc);
1711 mutex_up(&ldlm_ref_sem);
/* One-time initialization of the global LDLM state (called from
 * ldlm_get_ref() on first use):
 *  - allocate ldlm_state and register /proc entries;
 *  - create the callback and cancel ptlrpc services;
 *  - create the blocking-AST thread pool and start LDLM_BL_THREADS
 *    workers, waiting for each to announce itself;
 *  - start the ptlrpc service threads, the waiting-locks machinery and
 *    the expired-lock watchdog thread, then init the lock pools.
 * Error paths unwind through out_thread/out_proc labels.
 * NOTE(review): some error-path lines are missing from this extract. */
1716 static int ldlm_setup(void)
1718 struct ldlm_bl_pool *blp;
/* idempotent: already initialized */
1725 if (ldlm_state != NULL)
1728 OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
1729 if (ldlm_state == NULL)
1733 rc = ldlm_proc_setup();
1738 ldlm_state->ldlm_cb_service =
1739 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1740 LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
1741 LDLM_CB_REPLY_PORTAL, 1800,
1742 ldlm_callback_handler, "ldlm_cbd",
1743 ldlm_svc_proc_dir, NULL,
1744 LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
1747 if (!ldlm_state->ldlm_cb_service) {
1748 CERROR("failed to start service\n");
1749 GOTO(out_proc, rc = -ENOMEM);
1752 ldlm_state->ldlm_cancel_service =
1753 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1754 LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
1755 LDLM_CANCEL_REPLY_PORTAL, 6000,
1756 ldlm_cancel_handler, "ldlm_canceld",
1757 ldlm_svc_proc_dir, NULL,
1758 LDLM_THREADS_AUTO_MIN, LDLM_THREADS_AUTO_MAX,
1761 if (!ldlm_state->ldlm_cancel_service) {
1762 CERROR("failed to start service\n");
1763 GOTO(out_proc, rc = -ENOMEM);
1766 OBD_ALLOC(blp, sizeof(*blp));
1768 GOTO(out_proc, rc = -ENOMEM);
1769 ldlm_state->ldlm_bl_pool = blp;
1771 atomic_set(&blp->blp_num_threads, 0);
1772 cfs_waitq_init(&blp->blp_waitq);
1773 spin_lock_init(&blp->blp_lock);
1775 CFS_INIT_LIST_HEAD(&blp->blp_list);
/* bltd lives on this stack frame; each thread must copy what it needs
 * before we signal blp_comp, hence the wait_for_completion per thread */
1778 for (i = 0; i < LDLM_BL_THREADS; i++) {
1779 struct ldlm_bl_thread_data bltd = {
1783 init_completion(&blp->blp_comp);
1784 rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
1786 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
1787 GOTO(out_thread, rc);
1789 wait_for_completion(&blp->blp_comp);
1792 rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
1794 GOTO(out_thread, rc);
1796 rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service);
1798 GOTO(out_thread, rc);
/* waiting-locks timer/list and the expired-lock watchdog thread */
1800 CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
1801 expired_lock_thread.elt_state = ELT_STOPPED;
1802 cfs_waitq_init(&expired_lock_thread.elt_waitq);
1804 CFS_INIT_LIST_HEAD(&waiting_locks_list);
1805 spin_lock_init(&waiting_locks_spinlock);
1806 cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
1808 rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
1810 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
1811 GOTO(out_thread, rc);
1814 wait_event(expired_lock_thread.elt_waitq,
1815 expired_lock_thread.elt_state == ELT_READY);
1819 rc = ldlm_pools_init();
1821 GOTO(out_thread, rc);
/* error unwind: stop services, tear down /proc, free state */
1828 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1829 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1834 ldlm_proc_cleanup();
1837 OBD_FREE(ldlm_state, sizeof(*ldlm_state));
/* Tear down the global LDLM state (last ldlm_put_ref(), or forced):
 * refuse while namespaces still exist, stop every blocking-AST thread by
 * queueing NULL-namespace work items, stop the ptlrpc services and the
 * expired-lock thread, then free ldlm_state.
 * NOTE(review): the duplicated unregister calls below sit in different
 * (force vs. graceful / #ifdef) branches whose surrounding lines are
 * missing from this extract. */
1842 static int ldlm_cleanup(int force)
1845 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1849 if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1850 !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1851 CERROR("ldlm still has namespaces; clean these up first.\n");
1852 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1853 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
/* one shutdown item per live thread; blwi_ns == NULL is the signal
 * ldlm_bl_thread_main() exits on, and blp_comp confirms the exit */
1862 while (atomic_read(&blp->blp_num_threads) > 0) {
1863 struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1865 init_completion(&blp->blp_comp);
1867 spin_lock(&blp->blp_lock);
1868 list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1869 cfs_waitq_signal(&blp->blp_waitq);
1870 spin_unlock(&blp->blp_lock);
1872 wait_for_completion(&blp->blp_comp);
1874 OBD_FREE(blp, sizeof(*blp));
1876 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1877 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1878 ldlm_proc_cleanup();
/* ask the expired-lock watchdog to terminate and wait until it has */
1880 expired_lock_thread.elt_state = ELT_TERMINATE;
1881 cfs_waitq_signal(&expired_lock_thread.elt_waitq);
1882 wait_event(expired_lock_thread.elt_waitq,
1883 expired_lock_thread.elt_state == ELT_STOPPED);
1885 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1886 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1889 OBD_FREE(ldlm_state, sizeof(*ldlm_state));
/* Module init: set up the reference semaphore and namespace-list locks,
 * and create the slab caches for resources and locks.  On failure of the
 * lock slab, the already-created resource slab is destroyed. */
1895 int __init ldlm_init(void)
1897 init_mutex(&ldlm_ref_sem);
1898 init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1899 init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1900 ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
1901 sizeof(struct ldlm_resource), 0,
1902 SLAB_HWCACHE_ALIGN);
1903 if (ldlm_resource_slab == NULL)
1906 ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
1907 sizeof(struct ldlm_lock), 0,
1908 SLAB_HWCACHE_ALIGN);
1909 if (ldlm_lock_slab == NULL) {
/* undo partial init: drop the resource slab */
1910 cfs_mem_cache_destroy(ldlm_resource_slab);
/* Module exit: warn if references are still held, then destroy both slab
 * caches (asserting they were fully drained). */
1917 void __exit ldlm_exit(void)
1921 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1922 rc = cfs_mem_cache_destroy(ldlm_resource_slab);
1923 LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
1924 rc = cfs_mem_cache_destroy(ldlm_lock_slab);
1925 LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
/* Public symbol exports, grouped by the source file that defines them
 * (ldlm_lock.c, ldlm_request.c, ldlm_resource.c, target/client helpers). */
1929 EXPORT_SYMBOL(ldlm_extent_shift_kms);
1932 EXPORT_SYMBOL(ldlm_get_processing_policy);
1933 EXPORT_SYMBOL(ldlm_lock2desc);
1934 EXPORT_SYMBOL(ldlm_register_intent);
1935 EXPORT_SYMBOL(ldlm_lockname);
1936 EXPORT_SYMBOL(ldlm_typename);
1937 EXPORT_SYMBOL(ldlm_lock2handle);
1938 EXPORT_SYMBOL(__ldlm_handle2lock);
1939 EXPORT_SYMBOL(ldlm_lock_get);
1940 EXPORT_SYMBOL(ldlm_lock_put);
1941 EXPORT_SYMBOL(ldlm_lock_match);
1942 EXPORT_SYMBOL(ldlm_lock_cancel);
1943 EXPORT_SYMBOL(ldlm_lock_addref);
1944 EXPORT_SYMBOL(ldlm_lock_decref);
1945 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
1946 EXPORT_SYMBOL(ldlm_lock_change_resource);
1947 EXPORT_SYMBOL(ldlm_lock_set_data);
1948 EXPORT_SYMBOL(ldlm_it2str);
1949 EXPORT_SYMBOL(ldlm_lock_dump);
1950 EXPORT_SYMBOL(ldlm_lock_dump_handle);
1951 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
1952 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1953 EXPORT_SYMBOL(ldlm_lock_allow_match);
1955 /* ldlm_request.c */
1956 EXPORT_SYMBOL(ldlm_completion_ast);
1957 EXPORT_SYMBOL(ldlm_blocking_ast);
1958 EXPORT_SYMBOL(ldlm_glimpse_ast);
1959 EXPORT_SYMBOL(ldlm_expired_completion_wait);
1960 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
1961 EXPORT_SYMBOL(ldlm_cli_convert);
1962 EXPORT_SYMBOL(ldlm_cli_enqueue);
1963 EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
1964 EXPORT_SYMBOL(ldlm_cli_enqueue_local);
1965 EXPORT_SYMBOL(ldlm_cli_cancel);
1966 EXPORT_SYMBOL(ldlm_cli_cancel_unused);
1967 EXPORT_SYMBOL(ldlm_cli_cancel_req);
1968 EXPORT_SYMBOL(ldlm_cli_join_lru);
1969 EXPORT_SYMBOL(ldlm_replay_locks);
1970 EXPORT_SYMBOL(ldlm_resource_foreach);
1971 EXPORT_SYMBOL(ldlm_namespace_foreach);
1972 EXPORT_SYMBOL(ldlm_namespace_foreach_res);
1973 EXPORT_SYMBOL(ldlm_resource_iterate);
1974 EXPORT_SYMBOL(ldlm_cancel_resource_local);
1975 EXPORT_SYMBOL(ldlm_cli_cancel_list);
1978 EXPORT_SYMBOL(ldlm_server_blocking_ast);
1979 EXPORT_SYMBOL(ldlm_server_completion_ast);
1980 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
1981 EXPORT_SYMBOL(ldlm_handle_enqueue);
1982 EXPORT_SYMBOL(ldlm_handle_cancel);
1983 EXPORT_SYMBOL(ldlm_request_cancel);
1984 EXPORT_SYMBOL(ldlm_handle_convert);
1985 EXPORT_SYMBOL(ldlm_del_waiting_lock);
1986 EXPORT_SYMBOL(ldlm_get_ref);
1987 EXPORT_SYMBOL(ldlm_put_ref);
1988 EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
1990 /* ldlm_resource.c */
1991 EXPORT_SYMBOL(ldlm_namespace_new);
1992 EXPORT_SYMBOL(ldlm_namespace_cleanup);
1993 EXPORT_SYMBOL(ldlm_namespace_free);
1994 EXPORT_SYMBOL(ldlm_namespace_dump);
1995 EXPORT_SYMBOL(ldlm_dump_all_namespaces);
1996 EXPORT_SYMBOL(ldlm_resource_get);
1997 EXPORT_SYMBOL(ldlm_resource_putref);
1998 EXPORT_SYMBOL(ldlm_resource_unlink_lock);
2001 EXPORT_SYMBOL(client_import_add_conn);
2002 EXPORT_SYMBOL(client_import_del_conn);
2003 EXPORT_SYMBOL(client_obd_setup);
2004 EXPORT_SYMBOL(client_obd_cleanup);
2005 EXPORT_SYMBOL(client_connect_import);
2006 EXPORT_SYMBOL(client_disconnect_export);
2007 EXPORT_SYMBOL(target_abort_recovery);
2008 EXPORT_SYMBOL(target_cleanup_recovery);
2009 EXPORT_SYMBOL(target_handle_connect);
2010 EXPORT_SYMBOL(target_destroy_export);
2011 EXPORT_SYMBOL(target_cancel_recovery_timer);
2012 EXPORT_SYMBOL(target_send_reply);
2013 EXPORT_SYMBOL(target_queue_recovery_request);
2014 EXPORT_SYMBOL(target_handle_ping);
2015 EXPORT_SYMBOL(target_pack_pool_reply);
2016 EXPORT_SYMBOL(target_handle_disconnect);
2017 EXPORT_SYMBOL(target_queue_last_replay_reply);
2020 EXPORT_SYMBOL(lock_res_and_lock);
2021 EXPORT_SYMBOL(unlock_res_and_lock);