1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 # define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_LDLM
30 # include <linux/module.h>
31 # include <linux/slab.h>
32 # include <linux/init.h>
33 # include <linux/wait.h>
35 # include <liblustre.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/obd_class.h>
40 #include <portals/list.h>
41 #include "ldlm_internal.h"
43 extern kmem_cache_t *ldlm_resource_slab;
44 extern kmem_cache_t *ldlm_lock_slab;
45 extern struct lustre_lock ldlm_handle_lock;
46 extern struct list_head ldlm_namespace_list;
48 static DECLARE_MUTEX(ldlm_ref_sem);
49 static int ldlm_refcount;
53 static struct ldlm_state *ldlm_state;
55 inline unsigned long round_timeout(unsigned long timeout)
57 return ((timeout / HZ) + 1) * HZ;
/*
 * File-scope state for the lock-callback timeout handling: the list of locks
 * waiting for a client callback, the spinlock taken around accesses to that
 * list, and the timer whose handler is waiting_locks_callback().
 */
61 /* XXX should this be per-ldlm? */
62 static struct list_head waiting_locks_list;
63 static spinlock_t waiting_locks_spinlock;
64 static struct timer_list waiting_locks_timer;
/*
 * Singleton state shared with the "ldlm_elt" thread (expired_lock_main()).
 *
 * NOTE(review): other code in this file also uses .elt_lock and .elt_state and
 * the ELT_READY / ELT_STOPPED values, but their declarations are not visible
 * here (embedded line numbers 68, 70, 74 and 75 are skipped) -- confirm
 * against the canonical file.
 */
66 static struct expired_lock_thread {
67 wait_queue_head_t elt_waitq; /* thread sleeps here; state changes are announced here */
69 struct list_head elt_expired_locks; /* locks queued by waiting_locks_callback() */
71 } expired_lock_thread;
/* elt_state value that makes expired_lock_main() leave its wait. */
76 #define ELT_TERMINATE 2
/*
 * NOTE(review): these read as members of struct ldlm_bl_pool (the file reaches
 * them through a struct ldlm_bl_pool pointer), but the line opening the struct
 * and the blp_lock member used elsewhere are not visible in this listing --
 * confirm against the canonical file.
 */
80 struct list_head blp_list; /* queued struct ldlm_bl_work_item entries */
81 wait_queue_head_t blp_waitq; /* woken when a work item is queued */
82 atomic_t blp_num_threads; /* incremented/decremented by ldlm_bl_thread_main() */
83 struct completion blp_comp; /* completed by ldlm_bl_thread_main() on start and exit */
/*
 * One unit of work handed to the blocking-callback thread pool.
 * NOTE(review): the closing line of the struct is not visible in this listing.
 */
86 struct ldlm_bl_work_item {
87 struct list_head blwi_entry; /* linkage on the pool's blp_list */
88 struct ldlm_namespace *blwi_ns; /* NULL makes the worker thread leave its loop's normal path */
89 struct ldlm_lock_desc blwi_ld; /* passed by address to ldlm_handle_bl_callback() */
90 struct ldlm_lock *blwi_lock;
95 static inline int have_expired_locks(void)
99 spin_lock_bh(&expired_lock_thread.elt_lock);
100 need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
101 spin_unlock_bh(&expired_lock_thread.elt_lock);
106 static int expired_lock_main(void *arg)
108 struct list_head *expired = &expired_lock_thread.elt_expired_locks;
109 struct l_wait_info lwi = { 0 };
114 kportal_daemonize("ldlm_elt");
116 SIGNAL_MASK_LOCK(current, flags);
117 sigfillset(¤t->blocked);
119 SIGNAL_MASK_UNLOCK(current, flags);
123 expired_lock_thread.elt_state = ELT_READY;
124 wake_up(&expired_lock_thread.elt_waitq);
127 l_wait_event(expired_lock_thread.elt_waitq,
128 have_expired_locks() ||
129 expired_lock_thread.elt_state == ELT_TERMINATE,
132 spin_lock_bh(&expired_lock_thread.elt_lock);
133 while (!list_empty(expired)) {
134 struct obd_export *export;
135 struct ldlm_lock *lock;
137 lock = list_entry(expired->next, struct ldlm_lock,
139 if ((void *)lock < LP_POISON + PAGE_SIZE &&
140 (void *)lock >= LP_POISON) {
141 CERROR("free lock on elt list %p\n", lock);
144 list_del_init(&lock->l_pending_chain);
145 if ((void *)lock->l_export < LP_POISON + PAGE_SIZE &&
146 (void *)lock->l_export >= LP_POISON + PAGE_SIZE) {
147 CERROR("lock with free export on elt list %p\n",
149 lock->l_export = NULL;
150 LDLM_ERROR(lock, "free export\n");
153 export = class_export_get(lock->l_export);
154 spin_unlock_bh(&expired_lock_thread.elt_lock);
156 ptlrpc_fail_export(export);
157 class_export_put(export);
158 spin_lock_bh(&expired_lock_thread.elt_lock);
160 spin_unlock_bh(&expired_lock_thread.elt_lock);
162 if (expired_lock_thread.elt_state == ELT_TERMINATE)
166 expired_lock_thread.elt_state = ELT_STOPPED;
167 wake_up(&expired_lock_thread.elt_waitq);
171 static void waiting_locks_callback(unsigned long unused)
173 struct ldlm_lock *lock;
174 char str[PTL_NALFMT_SIZE];
176 spin_lock_bh(&waiting_locks_spinlock);
177 while (!list_empty(&waiting_locks_list)) {
178 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
181 if (lock->l_callback_timeout > jiffies)
184 LDLM_ERROR(lock, "lock callback timer expired: evicting client "
185 "%s@%s nid "LPX64" (%s) ",
186 lock->l_export->exp_client_uuid.uuid,
187 lock->l_export->exp_connection->c_remote_uuid.uuid,
188 lock->l_export->exp_connection->c_peer.peer_nid,
189 portals_nid2str(lock->l_export->exp_connection->c_peer.peer_ni->pni_number,
190 lock->l_export->exp_connection->c_peer.peer_nid,
193 spin_lock_bh(&expired_lock_thread.elt_lock);
194 list_del(&lock->l_pending_chain);
195 list_add(&lock->l_pending_chain,
196 &expired_lock_thread.elt_expired_locks);
197 spin_unlock_bh(&expired_lock_thread.elt_lock);
198 wake_up(&expired_lock_thread.elt_waitq);
202 * Make sure the timer will fire again if we have any locks
205 if (!list_empty(&waiting_locks_list)) {
206 unsigned long timeout_rounded;
207 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
209 timeout_rounded = round_timeout(lock->l_callback_timeout);
210 mod_timer(&waiting_locks_timer, timeout_rounded);
212 spin_unlock_bh(&waiting_locks_spinlock);
216 * Indicate that we're waiting for a client to call us back cancelling a given
217 * lock. We add it to the pending-callback chain, and schedule the lock-timeout
218 * timer to fire appropriately. (We round up to the next second, to avoid
219 * floods of timer firings during periods of high lock contention and traffic).
221 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
223 unsigned long timeout_rounded;
225 spin_lock_bh(&waiting_locks_spinlock);
226 if (!list_empty(&lock->l_pending_chain)) {
227 LDLM_DEBUG(lock, "not re-adding to wait list");
228 spin_unlock_bh(&waiting_locks_spinlock);
231 LDLM_DEBUG(lock, "adding to wait list");
233 lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
235 timeout_rounded = round_timeout(lock->l_callback_timeout);
237 if (timeout_rounded < waiting_locks_timer.expires ||
238 !timer_pending(&waiting_locks_timer)) {
239 mod_timer(&waiting_locks_timer, timeout_rounded);
241 list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
242 spin_unlock_bh(&waiting_locks_spinlock);
247 * Remove a lock from the pending list, likely because it had its cancellation
248 * callback arrive without incident. This adjusts the lock-timeout timer if
249 * needed. Returns 0 if the lock wasn't pending after all, 1 if it was.
251 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
253 struct list_head *list_next;
255 if (lock->l_export == NULL) {
256 /* We don't have a "waiting locks list" on clients. */
257 LDLM_DEBUG(lock, "client lock: no-op");
261 spin_lock_bh(&waiting_locks_spinlock);
263 if (list_empty(&lock->l_pending_chain)) {
264 spin_unlock_bh(&waiting_locks_spinlock);
265 LDLM_DEBUG(lock, "wasn't waiting");
269 list_next = lock->l_pending_chain.next;
270 if (lock->l_pending_chain.prev == &waiting_locks_list) {
271 /* Removing the head of the list, adjust timer. */
272 if (list_next == &waiting_locks_list) {
273 /* No more, just cancel. */
274 del_timer(&waiting_locks_timer);
276 struct ldlm_lock *next;
277 next = list_entry(list_next, struct ldlm_lock,
279 mod_timer(&waiting_locks_timer,
280 round_timeout(next->l_callback_timeout));
284 spin_lock_bh(&expired_lock_thread.elt_lock);
285 list_del_init(&lock->l_pending_chain);
286 spin_unlock_bh(&expired_lock_thread.elt_lock);
288 spin_unlock_bh(&waiting_locks_spinlock);
289 LDLM_DEBUG(lock, "removed");
/*
 * Alternate definitions of the two waiting-lock helpers for builds where
 * __KERNEL__ is not defined.
 * NOTE(review): only the signatures are visible in this listing; the bodies
 * (and therefore the values returned) cannot be seen here.
 */
293 #else /* !__KERNEL__ */
295 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
300 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
305 #endif /* __KERNEL__ */
307 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,const char *ast_type)
309 const struct ptlrpc_connection *conn = lock->l_export->exp_connection;
310 char str[PTL_NALFMT_SIZE];
312 LDLM_ERROR(lock, "%s AST failed (%d): evicting client %s@%s NID "LPX64
313 " (%s)", ast_type, rc, lock->l_export->exp_client_uuid.uuid,
314 conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
315 portals_nid2str(conn->c_peer.peer_ni->pni_number,
316 conn->c_peer.peer_nid, str));
317 ptlrpc_fail_export(lock->l_export);
/*
 * React to a failed server-to-client AST RPC.
 *
 * For -ETIMEDOUT, -EINTR and -ENOTCONN the visible code asserts that the lock
 * has an export, then either cancels the lock (export flagged exp_libclient)
 * or takes it off the waiting list and calls ldlm_failed_ast().  The remaining
 * visible statements log the status carried in the reply and cancel the lock.
 *
 * NOTE(review): the embedded line numbers skip values throughout this block,
 * so the conditions selecting the later branches, the closing braces and the
 * return statements are not visible -- confirm against the canonical file.
 * NOTE(review): the "normal race" message reads req->rq_repmsg->status without
 * the NULL test that the following LDLM_ERROR applies to req->rq_repmsg.
 */
320 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
321 struct ptlrpc_request *req, int rc,
322 const char *ast_type)
324 if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
325 LASSERT(lock->l_export);
326 if (lock->l_export->exp_libclient) {
327 LDLM_DEBUG(lock, "%s AST to liblustre client (nid "
328 LPU64") timeout, just cancelling lock",
329 ast_type, req->rq_peer.peer_nid);
330 ldlm_lock_cancel(lock);
333 ldlm_del_waiting_lock(lock);
334 ldlm_failed_ast(lock, rc, ast_type);
338 LDLM_DEBUG(lock, "client (nid "LPU64") returned %d"
339 " from %s AST - normal race",
340 req->rq_peer.peer_nid,
341 req->rq_repmsg->status, ast_type);
343 LDLM_ERROR(lock, "client (nid "LPU64") returned %d "
344 "from %s AST", req->rq_peer.peer_nid,
345 (req->rq_repmsg != NULL) ?
346 req->rq_repmsg->status : 0, ast_type);
347 ldlm_lock_cancel(lock);
348 /* Server-side AST functions are called from ldlm_reprocess_all,
349 * which needs to be told to please restart its reprocessing. */
/*
 * Server-side blocking AST: ask the client holding @lock to give it up.
 *
 * Visible flow: nothing is sent for LDLM_CB_CANCELING, for a lock whose
 * granted mode differs from its requested mode, or for a destroyed lock.
 * Otherwise an LDLM_BL_CALLBACK request is prepared on the export's reverse
 * import, filled with the remote handle, @desc and the lock's LDLM_AST_FLAGS
 * bits, the lock is put on the waiting list, and the request is sent with
 * ptlrpc_queue_wait().  The result is handed to ldlm_handle_ast_error().
 *
 * NOTE(review): the embedded line numbers skip values throughout this block;
 * return statements, closing braces, the conditions guarding some visible
 * statements and any preprocessor directives on the missing lines cannot be
 * seen here -- confirm against the canonical file.
 */
356 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
357 struct ldlm_lock_desc *desc,
358 void *data, int flag)
360 struct ldlm_request *body;
361 struct ptlrpc_request *req;
362 int rc = 0, size = sizeof(*body);
365 if (flag == LDLM_CB_CANCELING) {
366 /* Don't need to do anything here. */
372 l_lock(&lock->l_resource->lr_namespace->ns_lock);
373 if (lock->l_granted_mode != lock->l_req_mode) {
374 /* this blocking AST will be communicated as part of the
375 * completion AST instead */
376 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
377 LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
381 if (lock->l_destroyed) {
382 /* What's the point? */
383 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
388 if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
389 ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
390 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
395 req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
396 LDLM_BL_CALLBACK, 1, &size, NULL);
398 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
402 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
403 memcpy(&body->lock_handle1, &lock->l_remote_handle,
404 sizeof(body->lock_handle1));
405 memcpy(&body->lock_desc, desc, sizeof(*desc));
406 body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
408 LDLM_DEBUG(lock, "server preparing blocking AST");
409 req->rq_replen = lustre_msg_size(0, NULL);
411 if (lock->l_granted_mode == lock->l_req_mode)
412 ldlm_add_waiting_lock(lock);
413 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
415 req->rq_send_state = LUSTRE_IMP_FULL;
416 req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
417 rc = ptlrpc_queue_wait(req);
419 rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
421 ptlrpc_req_finished(req);
/* XXX copied from ptlrpc/service.c */
/*
 * Return the difference (*large - *small) in microseconds.
 *
 * The result is negative when @small is the later time.  The arithmetic is
 * done in the width of tv_sec/long, so very large differences can overflow
 * where long is 32 bits.
 *
 * NOTE(review): only the brace lines were missing from the listing; they have
 * been restored and the expression is unchanged.
 */
static long timeval_sub(struct timeval *large, struct timeval *small)
{
        return (large->tv_sec - small->tv_sec) * 1000000 +
                (large->tv_usec - small->tv_usec);
}
/*
 * Server-side completion AST: tell the client that @lock has been granted.
 *
 * Visible flow: the time since lock->l_enqueued_time is measured and logged
 * as an error when it exceeds obd_timeout seconds.  An LDLM_CP_CALLBACK
 * request is prepared on the export's reverse import, carrying the remote
 * handle, @flags and the lock descriptor; the resource's LVB data is copied
 * into a second buffer under lr_lvb_sem.  If the lock already has
 * LDLM_FL_AST_SENT set, that flag is added to the message and the lock is put
 * on the waiting list.  The request is sent with ptlrpc_queue_wait() and the
 * result handed to ldlm_handle_ast_error().
 *
 * NOTE(review): the embedded line numbers skip values throughout this block;
 * the conditions around the LVB copy and the error handling, the declaration
 * of lvb and the return statements are not visible -- confirm against the
 * canonical file.
 * NOTE(review): total_enqueue_wait is a signed long but the "enqueue wait
 * took" message formats it with %lu, while the later debug message uses %ld.
 */
433 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
435 struct ldlm_request *body;
436 struct ptlrpc_request *req;
437 struct timeval granted_time;
438 long total_enqueue_wait;
439 int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
442 LASSERT(lock != NULL);
444 do_gettimeofday(&granted_time);
445 total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
447 if (total_enqueue_wait / 1000000 > obd_timeout)
448 LDLM_ERROR(lock, "enqueue wait took %luus", total_enqueue_wait);
450 down(&lock->l_resource->lr_lvb_sem);
451 if (lock->l_resource->lr_lvb_len) {
453 size[1] = lock->l_resource->lr_lvb_len;
455 up(&lock->l_resource->lr_lvb_sem);
457 req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
458 LDLM_CP_CALLBACK, buffers, size, NULL);
462 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
463 memcpy(&body->lock_handle1, &lock->l_remote_handle,
464 sizeof(body->lock_handle1));
465 body->lock_flags = flags;
466 ldlm_lock2desc(lock, &body->lock_desc);
471 down(&lock->l_resource->lr_lvb_sem);
472 lvb = lustre_msg_buf(req->rq_reqmsg, 1,
473 lock->l_resource->lr_lvb_len);
474 memcpy(lvb, lock->l_resource->lr_lvb_data,
475 lock->l_resource->lr_lvb_len);
476 up(&lock->l_resource->lr_lvb_sem);
479 LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
481 req->rq_replen = lustre_msg_size(0, NULL);
483 req->rq_send_state = LUSTRE_IMP_FULL;
484 req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
486 /* We only send real blocking ASTs after the lock is granted */
487 l_lock(&lock->l_resource->lr_namespace->ns_lock);
488 if (lock->l_flags & LDLM_FL_AST_SENT) {
489 body->lock_flags |= LDLM_FL_AST_SENT;
490 ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
492 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
494 rc = ptlrpc_queue_wait(req);
496 rc = ldlm_handle_ast_error(lock, req, rc, "completion");
498 ptlrpc_req_finished(req);
/*
 * Server-side glimpse AST: send an LDLM_GL_CALLBACK for @lock to its client.
 *
 * Visible flow: the request carries the remote handle and lock descriptor;
 * the expected reply size is derived from the resource's lr_lvb_len (read
 * under lr_lvb_sem).  After ptlrpc_queue_wait() the visible statements log
 * -ELDLM_NO_LOCK_DATA as a lost race, pass a result to
 * ldlm_handle_ast_error(), and call the namespace's lvbo_update hook with the
 * reply message.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * conditions linking the three post-send statements and the return statement
 * are not visible -- confirm against the canonical file.
 * NOTE(review): res is initialised from lock->l_resource before
 * LASSERT(lock != NULL) is evaluated.
 */
503 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
505 struct ldlm_resource *res = lock->l_resource;
506 struct ldlm_request *body;
507 struct ptlrpc_request *req;
508 int rc = 0, size = sizeof(*body);
511 LASSERT(lock != NULL);
513 req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
514 LDLM_GL_CALLBACK, 1, &size, NULL);
518 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
519 memcpy(&body->lock_handle1, &lock->l_remote_handle,
520 sizeof(body->lock_handle1));
521 ldlm_lock2desc(lock, &body->lock_desc);
523 down(&lock->l_resource->lr_lvb_sem);
524 size = lock->l_resource->lr_lvb_len;
525 up(&lock->l_resource->lr_lvb_sem);
526 req->rq_replen = lustre_msg_size(1, &size);
528 req->rq_send_state = LUSTRE_IMP_FULL;
529 req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
531 rc = ptlrpc_queue_wait(req);
532 if (rc == -ELDLM_NO_LOCK_DATA)
533 LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
535 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
537 rc = res->lr_namespace->ns_lvbo->lvbo_update
538 (res, req->rq_repmsg, 0, 1);
539 ptlrpc_req_finished(req);
/*
 * Server-side handler for a lock enqueue request.
 *
 * Visible flow: the ldlm_request is unpacked (byte-swapped if needed) from
 * buffer 0; a new lock is created in the export's obd namespace with the three
 * callbacks supplied by the caller; the lock is attached to the request's
 * export (a failed export yields -ENOTCONN) and linked on the export's
 * held-locks list under ns_lock.  A reply is packed, policy data and -- for
 * extent locks -- the requested extent are copied from the request, and
 * ldlm_lock_enqueue() is called.  The reply then receives the resulting flags,
 * descriptor and handle, plus LDLM_FL_AST_SENT when the lock already has it
 * set.  Later visible statements set req->rq_status, copy the resource LVB
 * into the reply, call ldlm_lock_destroy() and ldlm_reprocess_all().
 *
 * NOTE(review): the embedded line numbers skip values throughout this block;
 * the labels, branch conditions (for example which paths reach
 * ldlm_lock_destroy()), several declarations (flags, cookie, buffers) and the
 * return statement are not visible -- confirm against the canonical file.
 */
543 int ldlm_handle_enqueue(struct ptlrpc_request *req,
544 ldlm_completion_callback completion_callback,
545 ldlm_blocking_callback blocking_callback,
546 ldlm_glimpse_callback glimpse_callback)
548 struct obd_device *obddev = req->rq_export->exp_obd;
549 struct ldlm_reply *dlm_rep;
550 struct ldlm_request *dlm_req;
551 int rc = 0, size[2] = {sizeof(*dlm_rep)};
553 ldlm_error_t err = ELDLM_OK;
554 struct ldlm_lock *lock = NULL;
558 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
560 dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
561 lustre_swab_ldlm_request);
562 if (dlm_req == NULL) {
563 CERROR ("Can't unpack dlm_req\n");
564 GOTO(out, rc = -EFAULT);
567 flags = dlm_req->lock_flags;
569 /* The lock's callback data might be set in the policy function */
570 lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
571 dlm_req->lock_desc.l_resource.lr_name,
572 dlm_req->lock_desc.l_resource.lr_type,
573 dlm_req->lock_desc.l_req_mode,
574 blocking_callback, completion_callback,
575 glimpse_callback, NULL, 0);
577 GOTO(out, rc = -ENOMEM);
579 do_gettimeofday(&lock->l_enqueued_time);
580 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
581 sizeof(lock->l_remote_handle));
582 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
584 LASSERT(req->rq_export);
586 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
587 l_lock(&lock->l_resource->lr_namespace->ns_lock);
588 if (req->rq_export->exp_failed) {
589 LDLM_ERROR(lock,"lock on destroyed export %p\n",req->rq_export);
590 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
591 GOTO(out, rc = -ENOTCONN);
593 lock->l_export = class_export_get(req->rq_export);
594 list_add(&lock->l_export_chain,
595 &lock->l_export->exp_ldlm_data.led_held_locks);
596 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
598 if (flags & LDLM_FL_HAS_INTENT) {
599 /* In this case, the reply buffer is allocated deep in
600 * local_lock_enqueue by the policy function. */
605 down(&lock->l_resource->lr_lvb_sem);
606 if (lock->l_resource->lr_lvb_len) {
607 size[1] = lock->l_resource->lr_lvb_len;
610 up(&lock->l_resource->lr_lvb_sem);
612 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
613 GOTO(out, rc = -ENOMEM);
615 rc = lustre_pack_reply(req, buffers, size, NULL);
620 if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
621 memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
622 sizeof(ldlm_policy_data_t));
623 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
624 memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
625 sizeof(lock->l_req_extent));
627 err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
631 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
632 dlm_rep->lock_flags = flags;
634 ldlm_lock2desc(lock, &dlm_rep->lock_desc);
635 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
637 /* We never send a blocking AST until the lock is granted, but
638 * we can tell it right now */
639 l_lock(&lock->l_resource->lr_namespace->ns_lock);
640 if (lock->l_flags & LDLM_FL_AST_SENT) {
641 dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
642 if (lock->l_granted_mode == lock->l_req_mode)
643 ldlm_add_waiting_lock(lock);
645 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
649 req->rq_status = err;
650 if (req->rq_reply_state == NULL) {
651 err = lustre_pack_reply(req, 0, NULL, NULL);
657 /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
658 * ldlm_reprocess_all. If this moves, revisit that code. -phil */
660 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
661 "(err=%d, rc=%d)", err, rc);
664 down(&lock->l_resource->lr_lvb_sem);
665 size[1] = lock->l_resource->lr_lvb_len;
667 void *lvb = lustre_msg_buf(req->rq_repmsg,
669 LASSERTF(lvb != NULL, "req %p, lock %p\n",
672 memcpy(lvb, lock->l_resource->lr_lvb_data,
675 up(&lock->l_resource->lr_lvb_sem);
677 ldlm_lock_destroy(lock);
680 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
681 ldlm_reprocess_all(lock->l_resource);
684 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
/*
 * Server-side handler for a lock convert request.
 *
 * Visible flow: unpack the ldlm_request, pack a one-buffer reply seeded with
 * the request's lock_flags, and look the lock up by lock_handle1.  The visible
 * statements then set req->rq_status to EINVAL, or convert the lock to the
 * requested mode, take it off the waiting list and reprocess its resource.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * branch conditions (including the one guarding the EINVAL assignment), the
 * reply/put calls and the return statements are not visible -- confirm
 * against the canonical file.
 */
690 int ldlm_handle_convert(struct ptlrpc_request *req)
692 struct ldlm_request *dlm_req;
693 struct ldlm_reply *dlm_rep;
694 struct ldlm_lock *lock;
695 int rc, size = sizeof(*dlm_rep);
698 dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
699 lustre_swab_ldlm_request);
700 if (dlm_req == NULL) {
701 CERROR ("Can't unpack dlm_req\n");
705 rc = lustre_pack_reply(req, 1, &size, NULL);
707 CERROR("out of memory\n");
710 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
711 dlm_rep->lock_flags = dlm_req->lock_flags;
713 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
715 req->rq_status = EINVAL;
717 LDLM_DEBUG(lock, "server-side convert handler START");
718 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
719 &dlm_rep->lock_flags);
720 if (ldlm_del_waiting_lock(lock))
721 CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
726 ldlm_reprocess_all(lock->l_resource);
727 LDLM_DEBUG(lock, "server-side convert handler END");
730 LDLM_DEBUG_NOLOCK("server-side convert handler END");
/*
 * Server-side handler for a lock cancel request.
 *
 * Visible flow: unpack the ldlm_request, pack an empty reply, and look the
 * lock up by lock_handle1.  The visible statements then report an unknown
 * cookie and set req->rq_status to ESTALE, or invoke the namespace's
 * lvbo_update hook (when one is set), cancel the lock and take it off the
 * waiting list.  ptlrpc_reply() is called, and the lock's resource is
 * reprocessed.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * branch conditions, the arguments passed to lvbo_update, the declaration of
 * rc and the return statements are not visible -- confirm against the
 * canonical file.
 */
735 int ldlm_handle_cancel(struct ptlrpc_request *req)
737 struct ldlm_request *dlm_req;
738 struct ldlm_lock *lock;
739 struct ldlm_resource *res;
740 char str[PTL_NALFMT_SIZE];
744 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
745 lustre_swab_ldlm_request);
746 if (dlm_req == NULL) {
747 CERROR("bad request buffer for cancel\n");
751 rc = lustre_pack_reply(req, 0, NULL, NULL);
753 CERROR("out of memory\n");
757 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
759 CERROR("received cancel for unknown lock cookie "LPX64
760 " from client %s nid "LPX64" (%s)\n",
761 dlm_req->lock_handle1.cookie,
762 req->rq_export->exp_client_uuid.uuid,
763 req->rq_peer.peer_nid,
764 portals_nid2str(req->rq_peer.peer_ni->pni_number,
765 req->rq_peer.peer_nid, str));
766 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
768 dlm_req->lock_handle1.cookie);
769 req->rq_status = ESTALE;
771 LDLM_DEBUG(lock, "server-side cancel handler START");
772 res = lock->l_resource;
773 if (res && res->lr_namespace->ns_lvbo &&
774 res->lr_namespace->ns_lvbo->lvbo_update) {
775 (void)res->lr_namespace->ns_lvbo->lvbo_update
777 //(res, req->rq_reqmsg, 1);
780 ldlm_lock_cancel(lock);
781 if (ldlm_del_waiting_lock(lock))
782 CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
786 if (ptlrpc_reply(req) != 0)
790 ldlm_reprocess_all(lock->l_resource);
791 LDLM_DEBUG(lock, "server-side cancel handler END");
/*
 * Client-side processing of a blocking AST for @lock.
 *
 * Under ns_lock the lock is flagged LDLM_FL_CBPENDING and do_ast is computed
 * as "no readers and no writers".  The visible code then calls the lock's
 * l_blocking_ast hook, if set, with ns_lock dropped around the call.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * declaration of do_ast, the test that selects between the "already unused"
 * and "still has references" paths, the last argument of the hook call and
 * any reference drop are not visible -- confirm against the canonical file.
 */
798 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
799 struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
804 l_lock(&ns->ns_lock);
805 LDLM_DEBUG(lock, "client blocking AST callback handler START");
807 lock->l_flags |= LDLM_FL_CBPENDING;
808 do_ast = (!lock->l_readers && !lock->l_writers);
811 LDLM_DEBUG(lock, "already unused, calling "
812 "callback (%p)", lock->l_blocking_ast);
813 if (lock->l_blocking_ast != NULL) {
814 l_unlock(&ns->ns_lock);
815 l_check_no_ns_lock(ns);
816 lock->l_blocking_ast(lock, ld, lock->l_ast_data,
818 l_lock(&ns->ns_lock);
821 LDLM_DEBUG(lock, "Lock still has references, will be"
825 LDLM_DEBUG(lock, "client blocking callback handler END");
826 l_unlock(&ns->ns_lock);
/*
 * Client-side processing of a completion AST for @lock.
 *
 * Under ns_lock the lock is brought in line with what the server granted:
 * requested mode, policy data (non-PLAIN resources) and resource name are
 * updated from @dlm_req when they differ; LDLM_FL_CBPENDING is set when the
 * AST carries LDLM_FL_AST_SENT; and, when the lock has an LVB buffer, the LVB
 * is unpacked from request buffer 1 and copied in.  The lock is then granted
 * with the resource's lr_tmp pointing at a local AST list, and the collected
 * AST work is run after ns_lock is released.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * declarations of ast_list and lvb, the handling of a missing LVB and any
 * reference drop are not visible -- confirm against the canonical file.
 */
831 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
832 struct ldlm_namespace *ns,
833 struct ldlm_request *dlm_req,
834 struct ldlm_lock *lock)
839 l_lock(&ns->ns_lock);
840 LDLM_DEBUG(lock, "client completion callback handler START");
842 /* If we receive the completion AST before the actual enqueue returned,
843 * then we might need to switch lock modes, resources, or extents. */
844 if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
845 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
846 LDLM_DEBUG(lock, "completion AST, new lock mode");
849 if (lock->l_resource->lr_type != LDLM_PLAIN) {
850 memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
851 sizeof(lock->l_policy_data));
852 LDLM_DEBUG(lock, "completion AST, new policy data");
855 ldlm_resource_unlink_lock(lock);
856 if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
857 &lock->l_resource->lr_name,
858 sizeof(lock->l_resource->lr_name)) != 0) {
859 ldlm_lock_change_resource(ns, lock,
860 dlm_req->lock_desc.l_resource.lr_name);
861 LDLM_DEBUG(lock, "completion AST, new resource");
864 if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
865 lock->l_flags |= LDLM_FL_CBPENDING;
866 LDLM_DEBUG(lock, "completion AST includes blocking AST");
869 if (lock->l_lvb_len) {
871 lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
872 lock->l_lvb_swabber);
874 LDLM_ERROR(lock, "completion AST did not contain "
877 memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
881 lock->l_resource->lr_tmp = &ast_list;
882 ldlm_grant_lock(lock, req, sizeof(*req), 1);
883 lock->l_resource->lr_tmp = NULL;
884 LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
885 l_unlock(&ns->ns_lock);
888 ldlm_run_ast_work(ns, &ast_list);
890 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
/*
 * Client-side processing of a glimpse AST for @lock.
 *
 * The lock's l_glimpse_ast hook, when set, is called with ns_lock dropped.
 * After the reply handling, a PW lock with no readers or writers that has not
 * been used for more than 10 seconds is handed to the blocking-callback
 * thread pool, falling back to a direct ldlm_handle_bl_callback() call when
 * queuing returns non-zero.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * declaration and initial value of rc, the body of the rq_repmsg test (where
 * the reply is presumably sent) and any reference drop are not visible --
 * confirm against the canonical file.
 */
895 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
896 struct ldlm_namespace *ns,
897 struct ldlm_request *dlm_req,
898 struct ldlm_lock *lock)
903 l_lock(&ns->ns_lock);
904 LDLM_DEBUG(lock, "client glimpse AST callback handler");
906 if (lock->l_glimpse_ast != NULL) {
907 l_unlock(&ns->ns_lock);
908 l_check_no_ns_lock(ns);
909 rc = lock->l_glimpse_ast(lock, req);
910 l_lock(&ns->ns_lock);
913 if (req->rq_repmsg != NULL) {
920 l_unlock(&ns->ns_lock);
921 if (lock->l_granted_mode == LCK_PW &&
922 !lock->l_readers && !lock->l_writers &&
923 time_after(jiffies, lock->l_last_used + 10 * HZ)) {
924 if (ldlm_bl_to_thread(ns, NULL, lock))
925 ldlm_handle_bl_callback(ns, NULL, lock);
/*
 * Send the reply for a callback request, packing an empty reply first when
 * the request has no reply state yet.  Returns the result of ptlrpc_reply()
 * on the visible path.
 *
 * NOTE(review): the embedded line numbers skip values in this block; how @rc
 * is stored in the request and what happens when lustre_pack_reply() fails
 * are not visible -- confirm against the canonical file.
 */
934 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
937 if (req->rq_reply_state == NULL) {
938 rc = lustre_pack_reply(req, 0, NULL, NULL);
942 return ptlrpc_reply(req);
/*
 * Queue a blocking-callback work item for the thread pool.
 *
 * A struct ldlm_bl_work_item is allocated, its blwi_lock is set to @lock, and
 * it is appended to the pool's blp_list under blp_lock; blp_waitq is woken
 * while the lock is still held.  Callers in this file treat a non-zero return
 * as "not queued" and process the callback inline.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * allocation-failure handling, the copying of @ns and @ld into the work item
 * and the return statements are not visible -- confirm against the canonical
 * file.
 */
945 int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
946 struct ldlm_lock *lock)
949 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
950 struct ldlm_bl_work_item *blwi;
953 OBD_ALLOC(blwi, sizeof(*blwi));
960 blwi->blwi_lock = lock;
962 spin_lock(&blp->blp_lock);
963 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
964 wake_up(&blp->blp_waitq);
965 spin_unlock(&blp->blp_lock);
/*
 * Request handler registered for the "ldlm_cbd" service (see ldlm_setup()).
 *
 * Visible flow:
 *  - A request without an export is logged (including the lock cookie when
 *    the body can be unpacked) and answered with -ENOTCONN.
 *  - A first switch on the opcode applies the OBD_FAIL_RETURN fault-injection
 *    hooks; the llog opcodes are served directly by the matching
 *    llog_origin_handle_*() function and replied to; unknown opcodes are
 *    answered with -EPROTO.
 *  - For the three LDLM callback opcodes the ldlm_request is unpacked
 *    (-EPROTO on failure), the lock is looked up in the export's namespace
 *    (-EINVAL when it has disappeared), the LDLM_AST_FLAGS bits from the
 *    request are merged into the lock, and a second switch dispatches to the
 *    blocking, completion or glimpse handler.  Blocking and completion ASTs
 *    are replied to before being processed.
 *
 * NOTE(review): the embedded line numbers skip values throughout this block;
 * break/return statements, the default labels, the declaration of rc and the
 * NULL test on the looked-up lock are not visible -- confirm against the
 * canonical file.
 * NOTE(review): the "no export" branch declares its own dlm_req, shadowing the
 * function-level variable of the same name.
 */
973 static int ldlm_callback_handler(struct ptlrpc_request *req)
975 struct ldlm_namespace *ns;
976 struct ldlm_request *dlm_req;
977 struct ldlm_lock *lock;
978 char str[PTL_NALFMT_SIZE];
982 /* Requests arrive in sender's byte order. The ptlrpc service
983 * handler has already checked and, if necessary, byte-swapped the
984 * incoming request message body, but I am responsible for the
985 * message buffers. */
987 if (req->rq_export == NULL) {
988 struct ldlm_request *dlm_req;
990 CDEBUG(D_RPCTRACE, "operation %d from nid "LPX64" (%s) with bad "
991 "export cookie "LPX64" (ptl req %d/rep %d); this is "
992 "normal if this node rebooted with a lock held\n",
993 req->rq_reqmsg->opc, req->rq_peer.peer_nid,
994 portals_nid2str(req->rq_peer.peer_ni->pni_number,
995 req->rq_peer.peer_nid, str),
996 req->rq_reqmsg->handle.cookie,
997 req->rq_request_portal, req->rq_reply_portal);
999 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
1000 lustre_swab_ldlm_request);
1001 if (dlm_req != NULL)
1002 CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
1003 dlm_req->lock_handle1.cookie);
1005 ldlm_callback_reply(req, -ENOTCONN);
1009 LASSERT(req->rq_export != NULL);
1010 LASSERT(req->rq_export->exp_obd != NULL);
1012 switch(req->rq_reqmsg->opc) {
1013 case LDLM_BL_CALLBACK:
1014 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1016 case LDLM_CP_CALLBACK:
1017 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
1019 case LDLM_GL_CALLBACK:
1020 OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
1022 case OBD_LOG_CANCEL:
1023 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1024 rc = llog_origin_handle_cancel(req);
1025 ldlm_callback_reply(req, rc);
1027 case LLOG_ORIGIN_HANDLE_CREATE:
1028 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1029 rc = llog_origin_handle_create(req);
1030 ldlm_callback_reply(req, rc);
1032 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1033 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1034 rc = llog_origin_handle_next_block(req);
1035 ldlm_callback_reply(req, rc);
1037 case LLOG_ORIGIN_HANDLE_READ_HEADER:
1038 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1039 rc = llog_origin_handle_read_header(req);
1040 ldlm_callback_reply(req, rc);
1042 case LLOG_ORIGIN_HANDLE_CLOSE:
1043 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1044 rc = llog_origin_handle_close(req);
1045 ldlm_callback_reply(req, rc);
1048 CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
1049 ldlm_callback_reply(req, -EPROTO);
1053 ns = req->rq_export->exp_obd->obd_namespace;
1054 LASSERT(ns != NULL);
1056 dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
1057 lustre_swab_ldlm_request);
1058 if (dlm_req == NULL) {
1059 CERROR ("can't unpack dlm_req\n");
1060 ldlm_callback_reply (req, -EPROTO);
1064 lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
1066 CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
1067 dlm_req->lock_handle1.cookie);
1068 ldlm_callback_reply(req, -EINVAL);
1072 /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
1073 lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
1075 /* We want the ost thread to get this reply so that it can respond
1076 * to ost requests (write cache writeback) that might be triggered
1079 * But we'd also like to be able to indicate in the reply that we're
1080 * cancelling right now, because it's unused, or have an intent result
1081 * in the reply, so we might have to push the responsibility for sending
1082 * the reply down into the AST handlers, alas. */
1084 switch (req->rq_reqmsg->opc) {
1085 case LDLM_BL_CALLBACK:
1086 CDEBUG(D_INODE, "blocking ast\n");
1087 ldlm_callback_reply(req, 0);
1088 if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
1089 ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
1091 case LDLM_CP_CALLBACK:
1092 CDEBUG(D_INODE, "completion ast\n");
1093 ldlm_callback_reply(req, 0);
1094 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
1096 case LDLM_GL_CALLBACK:
1097 CDEBUG(D_INODE, "glimpse ast\n");
1098 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
1101 LBUG(); /* checked above */
/*
 * Request handler registered for the "ldlm_canceld" service (see
 * ldlm_setup()).
 *
 * A request without an export is logged (export cookie, and the lock handle
 * when the body can be unpacked) and answered with -ENOTCONN.  Otherwise the
 * opcode is dispatched: the visible case calls ldlm_handle_cancel(), and an
 * unrecognised opcode is answered with -EINVAL.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the case
 * labels, the declaration of rc, the error handling after
 * ldlm_handle_cancel() and the return statements are not visible -- confirm
 * against the canonical file.
 */
1107 static int ldlm_cancel_handler(struct ptlrpc_request *req)
1112 /* Requests arrive in sender's byte order. The ptlrpc service
1113 * handler has already checked and, if necessary, byte-swapped the
1114 * incoming request message body, but I am responsible for the
1115 * message buffers. */
1117 if (req->rq_export == NULL) {
1118 struct ldlm_request *dlm_req;
1119 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
1120 req->rq_reqmsg->opc, req->rq_request_portal,
1121 req->rq_reply_portal);
1122 CERROR("--> export cookie: "LPX64"\n",
1123 req->rq_reqmsg->handle.cookie);
1124 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
1125 lustre_swab_ldlm_request);
1126 if (dlm_req != NULL)
1127 ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
1128 ldlm_callback_reply(req, -ENOTCONN);
1132 switch (req->rq_reqmsg->opc) {
1134 /* XXX FIXME move this back to mds/handler.c, bug 249 */
1136 CDEBUG(D_INODE, "cancel\n");
1137 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1138 rc = ldlm_handle_cancel(req);
1144 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
1145 ldlm_callback_reply(req, -EINVAL);
1152 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
1154 struct ldlm_bl_work_item *blwi = NULL;
1156 spin_lock(&blp->blp_lock);
1157 if (!list_empty(&blp->blp_list)) {
1158 blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
1160 list_del(&blwi->blwi_entry);
1162 spin_unlock(&blp->blp_lock);
/*
 * Start-up argument passed to ldlm_bl_thread_main().
 * NOTE(review): ldlm_bl_thread_main() also reads a bltd_num member; that
 * member and the closing line of the struct are not visible in this listing.
 */
1167 struct ldlm_bl_thread_data {
1169 struct ldlm_bl_pool *bltd_blp; /* pool the new thread serves */
/*
 * Body of one blocking-callback worker thread.
 *
 * @arg is a struct ldlm_bl_thread_data naming the pool to serve.  The thread
 * daemonizes under the name "ldlm_bl_NN", blocks all signals, bumps
 * blp_num_threads and completes blp_comp to signal start-up.  It then waits
 * (exclusively) on blp_waitq for work items, runs
 * ldlm_handle_bl_callback() for each and frees the item.  On the way out it
 * decrements blp_num_threads and completes blp_comp again.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the loop
 * header, the statement taken when blwi_ns is NULL, the remaining call
 * arguments and the return statement are not visible -- confirm against the
 * canonical file.
 * NOTE(review): "¤t" in the sigfillset() line is a mis-encoded "&current"
 * (the "&curren" prefix was turned into a currency sign); the line does not
 * compile as shown.
 */
1172 static int ldlm_bl_thread_main(void *arg)
1174 struct ldlm_bl_thread_data *bltd = arg;
1175 struct ldlm_bl_pool *blp = bltd->bltd_blp;
1176 unsigned long flags;
1179 /* XXX boiler-plate */
1181 char name[sizeof(current->comm)];
1182 snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
1184 kportal_daemonize(name);
1186 SIGNAL_MASK_LOCK(current, flags);
1187 sigfillset(¤t->blocked);
1189 SIGNAL_MASK_UNLOCK(current, flags);
1191 atomic_inc(&blp->blp_num_threads);
1192 complete(&blp->blp_comp);
1195 struct l_wait_info lwi = { 0 };
1196 struct ldlm_bl_work_item *blwi = NULL;
1198 l_wait_event_exclusive(blp->blp_waitq,
1199 (blwi = ldlm_bl_get_work(blp)) != NULL,
1202 if (blwi->blwi_ns == NULL)
1205 ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
1207 OBD_FREE(blwi, sizeof(*blwi));
1210 atomic_dec(&blp->blp_num_threads);
1211 complete(&blp->blp_comp);
/* Forward declarations: both are called by the reference-counting helpers below. */
1217 static int ldlm_setup(void);
1218 static int ldlm_cleanup(int force);
/*
 * Take a reference on the LDLM subsystem.  ldlm_refcount is incremented with
 * ldlm_ref_sem held; the 0 -> 1 transition is singled out for extra work.
 *
 * NOTE(review): the embedded line numbers skip values in this block; what is
 * done on the first reference (presumably ldlm_setup()), the error handling,
 * the release of ldlm_ref_sem and the return value are not visible -- confirm
 * against the canonical file.
 */
1220 int ldlm_get_ref(void)
1223 down(&ldlm_ref_sem);
1224 if (++ldlm_refcount == 1) {
/*
 * Drop a reference on the LDLM subsystem.  With ldlm_ref_sem held, the last
 * reference triggers ldlm_cleanup(@force); a cleanup failure is logged.
 *
 * NOTE(review): the embedded line numbers skip values in this block; the
 * test guarding the error message, the refcount decrement and the release of
 * ldlm_ref_sem are not visible -- confirm against the canonical file.
 */
1234 void ldlm_put_ref(int force)
1236 down(&ldlm_ref_sem);
1237 if (ldlm_refcount == 1) {
1238 int rc = ldlm_cleanup(force);
1240 CERROR("ldlm_cleanup failed: %d\n", rc);
/*
 * Bring up the global LDLM state.
 *
 * Allocates ldlm_state, calls ldlm_proc_setup(), creates the callback and
 * cancel ptlrpc services, allocates and initializes the blocking-callback
 * pool and starts LDLM_NUM_THREADS worker threads on it, starts the service
 * threads for both services, starts the expired-lock thread, and initializes
 * the waiting-locks list, spinlock and timer.
 *
 * Visible failure paths leave through GOTO() to out_proc or out_thread with rc
 * set.  The tail of the function unregisters both services, calls
 * ldlm_proc_cleanup() and frees ldlm_state.
 *
 * NOTE(review): this listing omits a number of lines (local declarations,
 * several error tests, the cleanup labels and the returns), so which cleanup
 * statements belong to which label cannot be confirmed from here.
 */
1251 static int ldlm_setup(void)
1253 struct ldlm_bl_pool *blp;
/* guard against ldlm_state having been allocated already */
1260 if (ldlm_state != NULL)
1263 OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
1264 if (ldlm_state == NULL)
1268 rc = ldlm_proc_setup();
/* service handling callback requests, dispatched to ldlm_callback_handler */
1273 ldlm_state->ldlm_cb_service =
1274 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1275 LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1276 ldlm_callback_handler, "ldlm_cbd",
1279 if (!ldlm_state->ldlm_cb_service) {
1280 CERROR("failed to start service\n");
1281 GOTO(out_proc, rc = -ENOMEM);
/* service handling cancel requests, dispatched to ldlm_cancel_handler */
1284 ldlm_state->ldlm_cancel_service =
1285 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1286 LDLM_CANCEL_REQUEST_PORTAL,
1287 LDLM_CANCEL_REPLY_PORTAL,
1288 ldlm_cancel_handler, "ldlm_canceld",
/*
 * NOTE(review): ldlm_cb_service already exists when this path is taken;
 * confirm that out_proc releases it.
 */
1291 if (!ldlm_state->ldlm_cancel_service) {
1292 CERROR("failed to start service\n");
1293 GOTO(out_proc, rc = -ENOMEM);
/*
 * NOTE(review): both services exist when the allocation below fails; confirm
 * that out_proc unregisters them.
 */
1296 OBD_ALLOC(blp, sizeof(*blp));
1298 GOTO(out_proc, rc = -ENOMEM);
1299 ldlm_state->ldlm_bl_pool = blp;
/* pool starts with no threads and an empty work list */
1301 atomic_set(&blp->blp_num_threads, 0);
1302 init_waitqueue_head(&blp->blp_waitq);
1303 spin_lock_init(&blp->blp_lock);
1305 INIT_LIST_HEAD(&blp->blp_list);
/*
 * Start the pool threads one at a time.  bltd lives in this stack frame; the
 * wait_for_completion() at the end of each iteration keeps it valid until
 * the new thread has completed blp_comp, which it does after reading bltd.
 */
1308 for (i = 0; i < LDLM_NUM_THREADS; i++) {
1309 struct ldlm_bl_thread_data bltd = {
1313 init_completion(&blp->blp_comp);
1314 rc = kernel_thread(ldlm_bl_thread_main, &bltd, 0);
1316 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
1317 GOTO(out_thread, rc);
1319 wait_for_completion(&blp->blp_comp);
/* request-processing threads for the cancel and callback services */
1322 rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
1323 LDLM_NUM_THREADS, "ldlm_cn");
1325 GOTO(out_thread, rc);
1327 rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
1328 LDLM_NUM_THREADS, "ldlm_cb");
1330 GOTO(out_thread, rc);
/* expired-lock thread: set up its shared state before starting it */
1332 INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
1333 spin_lock_init(&expired_lock_thread.elt_lock);
1334 expired_lock_thread.elt_state = ELT_STOPPED;
1335 init_waitqueue_head(&expired_lock_thread.elt_waitq);
1337 rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
1339 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
1340 GOTO(out_thread, rc);
/* block until elt_state has become ELT_READY */
1343 wait_event(expired_lock_thread.elt_waitq,
1344 expired_lock_thread.elt_state == ELT_READY);
/*
 * Waiting-locks list and its timer.
 * NOTE(review): function and data are assigned before init_timer(); confirm
 * that init_timer() leaves those fields untouched.
 */
1346 INIT_LIST_HEAD(&waiting_locks_list);
1347 spin_lock_init(&waiting_locks_spinlock);
1348 waiting_locks_timer.function = waiting_locks_callback;
1349 waiting_locks_timer.data = 0;
1350 init_timer(&waiting_locks_timer);
/* cleanup tail; the labels separating these steps are not visible in this listing */
1357 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1358 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1363 ldlm_proc_cleanup();
1366 OBD_FREE(ldlm_state, sizeof(*ldlm_state));
/*
 * Tear down the global LDLM state built by ldlm_setup().
 *
 * If any namespace is still registered, an error is logged and all namespaces
 * are dumped.  Otherwise the blocking-callback threads are stopped one by one,
 * the pool is freed, both ptlrpc services have their threads stopped and are
 * unregistered, ldlm_proc_cleanup() is called, the expired-lock thread is told
 * to terminate and waited for, and ldlm_state is freed.
 *
 * NOTE(review): the return statements and any use of the force argument are
 * not visible in this listing.
 */
1371 static int ldlm_cleanup(int force)
1374 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1378 if (!list_empty(&ldlm_namespace_list)) {
1379 CERROR("ldlm still has namespaces; clean these up first.\n");
1380 ldlm_dump_all_namespaces();
/*
 * Stop one pool thread per iteration.  The stop request is a work item with
 * blwi_ns == NULL that lives on this stack frame; it is queued under blp_lock
 * and the loop then waits for blp_comp, which ldlm_bl_thread_main() completes
 * after decrementing blp_num_threads.
 */
1385 while (atomic_read(&blp->blp_num_threads) > 0) {
1386 struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1388 init_completion(&blp->blp_comp);
1390 spin_lock(&blp->blp_lock);
1391 list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1392 wake_up(&blp->blp_waitq);
1393 spin_unlock(&blp->blp_lock);
1395 wait_for_completion(&blp->blp_comp);
/* no pool threads remain, so the pool itself can go */
1397 OBD_FREE(blp, sizeof(*blp));
/* for each service: stop its threads first, then unregister it */
1399 ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
1400 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1401 ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
1402 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1403 ldlm_proc_cleanup();
/* ask the expired-lock thread to terminate and wait until it reports ELT_STOPPED */
1405 expired_lock_thread.elt_state = ELT_TERMINATE;
1406 wake_up(&expired_lock_thread.elt_waitq);
1407 wait_event(expired_lock_thread.elt_waitq,
1408 expired_lock_thread.elt_state == ELT_STOPPED);
1412 OBD_FREE(ldlm_state, sizeof(*ldlm_state));
/*
 * Module init hook: create the slab caches for struct ldlm_resource and
 * struct ldlm_lock, then initialize ldlm_handle_lock.
 *
 * If the lock cache cannot be created, the resource cache created just before
 * it is destroyed again.
 * NOTE(review): the return statements are not visible in this listing, so the
 * exact error codes are not documented here.
 */
1418 int __init ldlm_init(void)
1420 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1421 sizeof(struct ldlm_resource), 0,
1422 SLAB_HWCACHE_ALIGN, NULL, NULL);
1423 if (ldlm_resource_slab == NULL)
1426 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1427 sizeof(struct ldlm_lock), 0,
1428 SLAB_HWCACHE_ALIGN, NULL, NULL);
/* undo the first cache when the second one fails */
1429 if (ldlm_lock_slab == NULL) {
1430 kmem_cache_destroy(ldlm_resource_slab);
1434 l_lock_init(&ldlm_handle_lock);
1439 void __exit ldlm_exit(void)
1441 if ( ldlm_refcount )
1442 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1443 LASSERTF(kmem_cache_destroy(ldlm_resource_slab) == 0,
1444 "couldn't free ldlm resource slab\n");
1445 LASSERTF(kmem_cache_destroy(ldlm_lock_slab) == 0,
1446 "couldn't free ldlm lock slab\n");
/*
 * Symbols this module exports to other kernel modules.  Apart from
 * ldlm_get_ref and ldlm_put_ref, which are defined earlier in this file, the
 * definitions are outside the part of the file shown here.
 */
1450 EXPORT_SYMBOL(ldlm_flock_completion_ast);
1453 EXPORT_SYMBOL(ldlm_extent_shift_kms);
1456 EXPORT_SYMBOL(ldlm_get_processing_policy);
1457 EXPORT_SYMBOL(ldlm_lock2desc);
1458 EXPORT_SYMBOL(ldlm_register_intent);
1459 EXPORT_SYMBOL(ldlm_lockname);
1460 EXPORT_SYMBOL(ldlm_typename);
1461 EXPORT_SYMBOL(ldlm_lock2handle);
1462 EXPORT_SYMBOL(__ldlm_handle2lock);
1463 EXPORT_SYMBOL(ldlm_lock_get);
1464 EXPORT_SYMBOL(ldlm_lock_put);
1465 EXPORT_SYMBOL(ldlm_lock_match);
1466 EXPORT_SYMBOL(ldlm_lock_cancel);
1467 EXPORT_SYMBOL(ldlm_lock_addref);
1468 EXPORT_SYMBOL(ldlm_lock_decref);
1469 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
1470 EXPORT_SYMBOL(ldlm_lock_change_resource);
1471 EXPORT_SYMBOL(ldlm_lock_set_data);
1472 EXPORT_SYMBOL(ldlm_it2str);
1473 EXPORT_SYMBOL(ldlm_lock_dump);
1474 EXPORT_SYMBOL(ldlm_lock_dump_handle);
1475 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
1476 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1477 EXPORT_SYMBOL(ldlm_lock_allow_match);
1479 /* ldlm_request.c */
1480 EXPORT_SYMBOL(ldlm_completion_ast);
1481 EXPORT_SYMBOL(ldlm_expired_completion_wait);
1482 EXPORT_SYMBOL(ldlm_cli_convert);
1483 EXPORT_SYMBOL(ldlm_cli_enqueue);
1484 EXPORT_SYMBOL(ldlm_cli_cancel);
1485 EXPORT_SYMBOL(ldlm_cli_cancel_unused);
1486 EXPORT_SYMBOL(ldlm_replay_locks);
1487 EXPORT_SYMBOL(ldlm_resource_foreach);
1488 EXPORT_SYMBOL(ldlm_namespace_foreach);
1489 EXPORT_SYMBOL(ldlm_namespace_foreach_res);
1490 EXPORT_SYMBOL(ldlm_change_cbdata);
1493 EXPORT_SYMBOL(ldlm_server_blocking_ast);
1494 EXPORT_SYMBOL(ldlm_server_completion_ast);
1495 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
1496 EXPORT_SYMBOL(ldlm_handle_enqueue);
1497 EXPORT_SYMBOL(ldlm_handle_cancel);
1498 EXPORT_SYMBOL(ldlm_handle_convert);
1499 EXPORT_SYMBOL(ldlm_del_waiting_lock);
1500 EXPORT_SYMBOL(ldlm_get_ref);
1501 EXPORT_SYMBOL(ldlm_put_ref);
1505 EXPORT_SYMBOL(ldlm_test);
1506 EXPORT_SYMBOL(ldlm_regression_start);
1507 EXPORT_SYMBOL(ldlm_regression_stop);
1510 /* ldlm_resource.c */
1511 EXPORT_SYMBOL(ldlm_namespace_new);
1512 EXPORT_SYMBOL(ldlm_namespace_cleanup);
1513 EXPORT_SYMBOL(ldlm_namespace_free);
1514 EXPORT_SYMBOL(ldlm_namespace_dump);
1515 EXPORT_SYMBOL(ldlm_dump_all_namespaces);
1516 EXPORT_SYMBOL(ldlm_resource_get);
1517 EXPORT_SYMBOL(ldlm_resource_putref);
1520 EXPORT_SYMBOL(l_lock);
1521 EXPORT_SYMBOL(l_unlock);
1524 EXPORT_SYMBOL(client_obd_setup);
1525 EXPORT_SYMBOL(client_obd_cleanup);
1526 EXPORT_SYMBOL(client_connect_import);
1527 EXPORT_SYMBOL(client_disconnect_export);
1528 EXPORT_SYMBOL(target_abort_recovery);
1529 EXPORT_SYMBOL(target_cleanup_recovery);
1530 EXPORT_SYMBOL(target_handle_connect);
1531 EXPORT_SYMBOL(target_destroy_export);
1532 EXPORT_SYMBOL(target_cancel_recovery_timer);
1533 EXPORT_SYMBOL(target_send_reply);
1534 EXPORT_SYMBOL(target_queue_recovery_request);
1535 EXPORT_SYMBOL(target_handle_ping);
1536 EXPORT_SYMBOL(target_handle_disconnect);
1537 EXPORT_SYMBOL(target_queue_final_reply);