1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Phil Schwan <phil@clusterfs.com>
8 * This file is part of Lustre, http://www.lustre.org.
10 * Lustre is free software; you can redistribute it and/or
11 * modify it under the terms of version 2 of the GNU General Public
12 * License as published by the Free Software Foundation.
14 * Lustre is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with Lustre; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 # define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_LDLM
30 # include <linux/module.h>
31 # include <linux/slab.h>
32 # include <linux/init.h>
33 # include <linux/wait.h>
35 # include <liblustre.h>
38 #include <linux/lustre_dlm.h>
39 #include <linux/obd_class.h>
40 #include <portals/list.h>
41 #include "ldlm_internal.h"
43 extern kmem_cache_t *ldlm_resource_slab;
44 extern kmem_cache_t *ldlm_lock_slab;
45 extern struct lustre_lock ldlm_handle_lock;
46 extern struct list_head ldlm_namespace_list;
48 static DECLARE_MUTEX(ldlm_ref_sem);
49 static int ldlm_refcount;
53 static struct ldlm_state *ldlm_state;
55 inline unsigned long round_timeout(unsigned long timeout)
57 return ((timeout / HZ) + 1) * HZ;
61 /* XXX should this be per-ldlm? */
62 static struct list_head waiting_locks_list;
63 static spinlock_t waiting_locks_spinlock;
64 static struct timer_list waiting_locks_timer;
66 static struct expired_lock_thread {
67 wait_queue_head_t elt_waitq;
69 struct list_head elt_expired_locks;
71 } expired_lock_thread;
76 #define ELT_TERMINATE 2
80 struct list_head blp_list;
81 wait_queue_head_t blp_waitq;
82 atomic_t blp_num_threads;
83 struct completion blp_comp;
86 struct ldlm_bl_work_item {
87 struct list_head blwi_entry;
88 struct ldlm_namespace *blwi_ns;
89 struct ldlm_lock_desc blwi_ld;
90 struct ldlm_lock *blwi_lock;
95 static inline int have_expired_locks(void)
99 spin_lock_bh(&expired_lock_thread.elt_lock);
100 need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
101 spin_unlock_bh(&expired_lock_thread.elt_lock);
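/* Body of the expired-lock thread ("ldlm_elt"): it sleeps until
 * waiting_locks_callback() moves timed-out locks onto elt_expired_locks,
 * then evicts the export behind each such lock with ptlrpc_fail_export().
 * The LP_POISON checks below guard against locks or exports that were
 * already freed. */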
106 static int expired_lock_main(void *arg)
108 struct list_head *expired = &expired_lock_thread.elt_expired_locks;
109 struct l_wait_info lwi = { 0 };
114 kportal_daemonize("ldlm_elt");
116 SIGNAL_MASK_LOCK(current, flags);
117 sigfillset(&current->blocked);
119 SIGNAL_MASK_UNLOCK(current, flags);
123 expired_lock_thread.elt_state = ELT_READY;
124 wake_up(&expired_lock_thread.elt_waitq);
127 l_wait_event(expired_lock_thread.elt_waitq,
128 have_expired_locks() ||
129 expired_lock_thread.elt_state == ELT_TERMINATE,
132 spin_lock_bh(&expired_lock_thread.elt_lock);
133 while (!list_empty(expired)) {
134 struct obd_export *export;
135 struct ldlm_lock *lock;
137 lock = list_entry(expired->next, struct ldlm_lock,
139 if ((void *)lock < LP_POISON + PAGE_SIZE &&
140 (void *)lock >= LP_POISON) {
141 CERROR("free lock on elt list %p\n", lock);
144 list_del_init(&lock->l_pending_chain);
145 if ((void *)lock->l_export < LP_POISON + PAGE_SIZE &&
146 (void *)lock->l_export >= LP_POISON) {
147 CERROR("lock with free export on elt list %p\n",
149 lock->l_export = NULL;
150 LDLM_ERROR(lock, "free export\n");
153 export = class_export_get(lock->l_export);
154 spin_unlock_bh(&expired_lock_thread.elt_lock);
156 ptlrpc_fail_export(export);
157 class_export_put(export);
158 spin_lock_bh(&expired_lock_thread.elt_lock);
160 spin_unlock_bh(&expired_lock_thread.elt_lock);
162 if (expired_lock_thread.elt_state == ELT_TERMINATE)
166 expired_lock_thread.elt_state = ELT_STOPPED;
167 wake_up(&expired_lock_thread.elt_waitq);
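/* Timer handler for waiting_locks_timer: every lock whose callback timeout
 * has passed is moved from waiting_locks_list to the expired-lock thread's
 * list and that thread is woken to evict the client; the timer is then
 * re-armed for the next lock still waiting, if any. */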
171 static void waiting_locks_callback(unsigned long unused)
173 struct ldlm_lock *lock;
174 char str[PTL_NALFMT_SIZE];
176 spin_lock_bh(&waiting_locks_spinlock);
177 while (!list_empty(&waiting_locks_list)) {
178 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
181 if (time_after(lock->l_callback_timeout, jiffies))
184 LDLM_ERROR(lock, "lock callback timer expired: evicting client "
186 lock->l_export->exp_client_uuid.uuid,
187 lock->l_export->exp_connection->c_remote_uuid.uuid,
188 ptlrpc_peernid2str(&lock->l_export->exp_connection->c_peer, str));
190 spin_lock_bh(&expired_lock_thread.elt_lock);
191 list_del(&lock->l_pending_chain);
192 list_add(&lock->l_pending_chain,
193 &expired_lock_thread.elt_expired_locks);
194 spin_unlock_bh(&expired_lock_thread.elt_lock);
195 wake_up(&expired_lock_thread.elt_waitq);
199 * Make sure the timer will fire again if we have any locks
202 if (!list_empty(&waiting_locks_list)) {
203 unsigned long timeout_rounded;
204 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
206 timeout_rounded = round_timeout(lock->l_callback_timeout);
207 mod_timer(&waiting_locks_timer, timeout_rounded);
209 spin_unlock_bh(&waiting_locks_spinlock);
213 * Indicate that we're waiting for a client to call us back cancelling a given
214 * lock. We add it to the pending-callback chain, and schedule the lock-timeout
215 * timer to fire appropriately. (We round up to the next second, to avoid
216 * floods of timer firings during periods of high lock contention and traffic).
218 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
220 unsigned long timeout_rounded;
222 spin_lock_bh(&waiting_locks_spinlock);
223 if (!list_empty(&lock->l_pending_chain)) {
224 LDLM_DEBUG(lock, "not re-adding to wait list");
225 spin_unlock_bh(&waiting_locks_spinlock);
228 LDLM_DEBUG(lock, "adding to wait list");
230 lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);
232 timeout_rounded = round_timeout(lock->l_callback_timeout);
234 if (time_before(timeout_rounded, waiting_locks_timer.expires) ||
235 !timer_pending(&waiting_locks_timer)) {
236 mod_timer(&waiting_locks_timer, timeout_rounded);
238 list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
239 spin_unlock_bh(&waiting_locks_spinlock);
244 * Remove a lock from the pending list, likely because it had its cancellation
245 * callback arrive without incident. This adjusts the lock-timeout timer if
246 * needed. Returns 0 if the lock wasn't pending after all, 1 if it was.
248 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
250 struct list_head *list_next;
252 if (lock->l_export == NULL) {
253 /* We don't have a "waiting locks list" on clients. */
254 LDLM_DEBUG(lock, "client lock: no-op");
258 spin_lock_bh(&waiting_locks_spinlock);
260 if (list_empty(&lock->l_pending_chain)) {
261 spin_unlock_bh(&waiting_locks_spinlock);
262 LDLM_DEBUG(lock, "wasn't waiting");
266 list_next = lock->l_pending_chain.next;
267 if (lock->l_pending_chain.prev == &waiting_locks_list) {
268 /* Removing the head of the list, adjust timer. */
269 if (list_next == &waiting_locks_list) {
270 /* No more, just cancel. */
271 del_timer(&waiting_locks_timer);
273 struct ldlm_lock *next;
274 next = list_entry(list_next, struct ldlm_lock,
276 mod_timer(&waiting_locks_timer,
277 round_timeout(next->l_callback_timeout));
281 /* the lock could already be expired, get the elt_lock also */
282 spin_lock_bh(&expired_lock_thread.elt_lock);
283 list_del_init(&lock->l_pending_chain);
284 spin_unlock_bh(&expired_lock_thread.elt_lock);
286 spin_unlock_bh(&waiting_locks_spinlock);
287 LDLM_DEBUG(lock, "removed");
291 #else /* !__KERNEL__ */
293 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
298 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
303 #endif /* __KERNEL__ */
305 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,const char *ast_type)
307 struct ptlrpc_connection *conn = lock->l_export->exp_connection;
308 char str[PTL_NALFMT_SIZE];
310 LDLM_ERROR(lock, "%s AST failed (%d): evicting client %s@%s NID "LPX64
311 " (%s)", ast_type, rc, lock->l_export->exp_client_uuid.uuid,
312 conn->c_remote_uuid.uuid, conn->c_peer.peer_id.nid,
313 ptlrpc_peernid2str(&conn->c_peer, str));
315 ptlrpc_fail_export(lock->l_export);
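/* Decide how to react when an AST RPC fails.  If the RPC itself failed
 * (-ETIMEDOUT, -EINTR, -ENOTCONN) the client is evicted, except that
 * liblustre clients merely have the lock cancelled.  A non-zero status in
 * the reply usually means the client cancelled the lock in a race, so it
 * is logged and the lock is cancelled locally. */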
318 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
319 struct ptlrpc_request *req, int rc,
320 const char *ast_type)
322 struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer;
323 char str[PTL_NALFMT_SIZE];
325 if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
326 LASSERT(lock->l_export);
327 if (lock->l_export->exp_libclient) {
328 LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
329 " timeout, just cancelling lock", ast_type,
330 ptlrpc_peernid2str(peer, str));
331 ldlm_lock_cancel(lock);
334 ldlm_del_waiting_lock(lock);
335 ldlm_failed_ast(lock, rc, ast_type);
339 LDLM_DEBUG(lock, "client (nid %s) returned %d"
340 " from %s AST - normal race",
341 ptlrpc_peernid2str(peer, str),
342 req->rq_repmsg->status, ast_type);
344 LDLM_ERROR(lock, "client (nid %s) returned %d "
345 "from %s AST", ptlrpc_peernid2str(peer, str),
346 (req->rq_repmsg != NULL) ?
347 req->rq_repmsg->status : 0, ast_type);
348 ldlm_lock_cancel(lock);
349 /* Server-side AST functions are called from ldlm_reprocess_all,
350 * which needs to be told to please restart its reprocessing. */
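/* Send a blocking AST (LDLM_BL_CALLBACK) to the client holding this lock.
 * Nothing is sent while the lock is still ungranted - the blocking flag is
 * carried by the completion AST instead - and a granted lock is added to
 * the waiting-locks list so the client gets evicted if it never cancels. */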
357 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
358 struct ldlm_lock_desc *desc,
359 void *data, int flag)
361 struct ldlm_request *body;
362 struct ptlrpc_request *req;
363 int rc = 0, size = sizeof(*body);
366 if (flag == LDLM_CB_CANCELING) {
367 /* Don't need to do anything here. */
373 l_lock(&lock->l_resource->lr_namespace->ns_lock);
374 if (lock->l_granted_mode != lock->l_req_mode) {
375 /* this blocking AST will be communicated as part of the
376 * completion AST instead */
377 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
378 LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
382 if (lock->l_destroyed) {
383 /* What's the point? */
384 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
389 if (CURRENT_SECONDS - lock->l_export->exp_last_request_time > 30) {
390 ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
391 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
396 req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
397 LDLM_BL_CALLBACK, 1, &size, NULL);
399 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
403 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
404 memcpy(&body->lock_handle1, &lock->l_remote_handle,
405 sizeof(body->lock_handle1));
406 memcpy(&body->lock_desc, desc, sizeof(*desc));
407 body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
409 LDLM_DEBUG(lock, "server preparing blocking AST");
410 req->rq_replen = lustre_msg_size(0, NULL);
412 if (lock->l_granted_mode == lock->l_req_mode)
413 ldlm_add_waiting_lock(lock);
414 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
416 req->rq_send_state = LUSTRE_IMP_FULL;
417 req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
418 rc = ptlrpc_queue_wait(req);
420 rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
422 ptlrpc_req_finished(req);
427 /* XXX copied from ptlrpc/service.c */
428 static long timeval_sub(struct timeval *large, struct timeval *small)
430 return (large->tv_sec - small->tv_sec) * 1000000 +
431 (large->tv_usec - small->tv_usec);
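/* Send a completion AST (LDLM_CP_CALLBACK) telling the client its lock was
 * granted.  The request carries the resource's LVB, if there is one, and
 * the enqueue wait time is checked against obd_timeout.  If a blocking AST
 * was already flagged (LDLM_FL_AST_SENT), the lock-timeout clock starts. */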
434 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
436 struct ldlm_request *body;
437 struct ptlrpc_request *req;
438 struct timeval granted_time;
439 long total_enqueue_wait;
440 int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
443 LASSERT(lock != NULL);
445 do_gettimeofday(&granted_time);
446 total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);
448 if (total_enqueue_wait / 1000000 > obd_timeout)
449 LDLM_ERROR(lock, "enqueue wait took %luus", total_enqueue_wait);
451 down(&lock->l_resource->lr_lvb_sem);
452 if (lock->l_resource->lr_lvb_len) {
454 size[1] = lock->l_resource->lr_lvb_len;
456 up(&lock->l_resource->lr_lvb_sem);
458 req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
459 LDLM_CP_CALLBACK, buffers, size, NULL);
463 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
464 memcpy(&body->lock_handle1, &lock->l_remote_handle,
465 sizeof(body->lock_handle1));
466 body->lock_flags = flags;
467 ldlm_lock2desc(lock, &body->lock_desc);
472 down(&lock->l_resource->lr_lvb_sem);
473 lvb = lustre_msg_buf(req->rq_reqmsg, 1,
474 lock->l_resource->lr_lvb_len);
475 memcpy(lvb, lock->l_resource->lr_lvb_data,
476 lock->l_resource->lr_lvb_len);
477 up(&lock->l_resource->lr_lvb_sem);
480 LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
482 req->rq_replen = lustre_msg_size(0, NULL);
484 req->rq_send_state = LUSTRE_IMP_FULL;
485 req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
487 /* We only send real blocking ASTs after the lock is granted */
488 l_lock(&lock->l_resource->lr_namespace->ns_lock);
489 if (lock->l_flags & LDLM_FL_AST_SENT) {
490 body->lock_flags |= LDLM_FL_AST_SENT;
491 ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
493 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
495 rc = ptlrpc_queue_wait(req);
497 rc = ldlm_handle_ast_error(lock, req, rc, "completion");
499 ptlrpc_req_finished(req);
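/* Send a glimpse AST (LDLM_GL_CALLBACK) asking the client for its current
 * LVB (typically file size information) without cancelling the lock; the
 * reply is handed to the namespace's lvbo_update() method. */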
504 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
506 struct ldlm_resource *res = lock->l_resource;
507 struct ldlm_request *body;
508 struct ptlrpc_request *req;
509 int rc = 0, size = sizeof(*body);
512 LASSERT(lock != NULL);
514 req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
515 LDLM_GL_CALLBACK, 1, &size, NULL);
519 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
520 memcpy(&body->lock_handle1, &lock->l_remote_handle,
521 sizeof(body->lock_handle1));
522 ldlm_lock2desc(lock, &body->lock_desc);
524 down(&lock->l_resource->lr_lvb_sem);
525 size = lock->l_resource->lr_lvb_len;
526 up(&lock->l_resource->lr_lvb_sem);
527 req->rq_replen = lustre_msg_size(1, &size);
529 req->rq_send_state = LUSTRE_IMP_FULL;
530 req->rq_timeout = ldlm_timeout; /* timeout for initial AST reply */
532 rc = ptlrpc_queue_wait(req);
533 if (rc == -ELDLM_NO_LOCK_DATA)
534 LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
536 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
538 rc = res->lr_namespace->ns_lvbo->lvbo_update
539 (res, req->rq_repmsg, 0, 1);
540 ptlrpc_req_finished(req);
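/* Scan an export's list of held locks for one matching the client's remote
 * lock handle; used to find the original lock when an enqueue is replayed. */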
544 static struct ldlm_lock *
545 find_existing_lock(struct obd_export *exp, struct lustre_handle *remote_hdl)
547 struct obd_device *obd = exp->exp_obd;
548 struct list_head *iter;
550 l_lock(&obd->obd_namespace->ns_lock);
551 list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
552 struct ldlm_lock *lock;
553 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
554 if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
556 l_unlock(&obd->obd_namespace->ns_lock);
560 l_unlock(&obd->obd_namespace->ns_lock);
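/* Server-side enqueue handler: unpack the client's ldlm_request, create the
 * lock (or find the existing one on replay), attach it to the export, run
 * ldlm_lock_enqueue(), and pack the resulting handle, flags and LVB into
 * the reply.  If anything fails after the lock exists, it is destroyed
 * before returning. */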
565 int ldlm_handle_enqueue(struct ptlrpc_request *req,
566 ldlm_completion_callback completion_callback,
567 ldlm_blocking_callback blocking_callback,
568 ldlm_glimpse_callback glimpse_callback)
570 struct obd_device *obddev = req->rq_export->exp_obd;
571 struct ldlm_reply *dlm_rep;
572 struct ldlm_request *dlm_req;
573 int rc = 0, size[2] = {sizeof(*dlm_rep)};
575 ldlm_error_t err = ELDLM_OK;
576 struct ldlm_lock *lock = NULL;
580 LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
582 dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
583 lustre_swab_ldlm_request);
584 if (dlm_req == NULL) {
585 CERROR ("Can't unpack dlm_req\n");
586 GOTO(out, rc = -EFAULT);
589 flags = dlm_req->lock_flags;
591 LASSERT(req->rq_export);
593 if (flags & LDLM_FL_REPLAY) {
594 lock = find_existing_lock(req->rq_export,
595 &dlm_req->lock_handle1);
597 DEBUG_REQ(D_HA, req, "found existing lock cookie "LPX64,
598 lock->l_handle.h_cookie);
599 GOTO(existing_lock, rc = 0);
603 /* The lock's callback data might be set in the policy function */
604 lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
605 dlm_req->lock_desc.l_resource.lr_name,
606 dlm_req->lock_desc.l_resource.lr_type,
607 dlm_req->lock_desc.l_req_mode,
608 blocking_callback, completion_callback,
609 glimpse_callback, NULL, 0);
611 GOTO(out, rc = -ENOMEM);
613 do_gettimeofday(&lock->l_enqueued_time);
614 memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
615 sizeof(lock->l_remote_handle));
616 LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
618 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
619 l_lock(&lock->l_resource->lr_namespace->ns_lock);
620 if (req->rq_export->exp_failed) {
621 LDLM_ERROR(lock, "lock on destroyed export %p\n", req->rq_export);
622 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
623 GOTO(out, rc = -ENOTCONN);
625 lock->l_export = class_export_get(req->rq_export);
626 list_add(&lock->l_export_chain,
627 &lock->l_export->exp_ldlm_data.led_held_locks);
628 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
632 if (flags & LDLM_FL_HAS_INTENT) {
633 /* In this case, the reply buffer is allocated deep in
634 * local_lock_enqueue by the policy function. */
639 down(&lock->l_resource->lr_lvb_sem);
640 if (lock->l_resource->lr_lvb_len) {
641 size[1] = lock->l_resource->lr_lvb_len;
644 up(&lock->l_resource->lr_lvb_sem);
646 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
647 GOTO(out, rc = -ENOMEM);
649 rc = lustre_pack_reply(req, buffers, size, NULL);
654 if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
655 memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
656 sizeof(ldlm_policy_data_t));
657 if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
658 memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
659 sizeof(lock->l_req_extent));
661 err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
665 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
666 dlm_rep->lock_flags = flags;
668 ldlm_lock2desc(lock, &dlm_rep->lock_desc);
669 ldlm_lock2handle(lock, &dlm_rep->lock_handle);
671 /* We never send a blocking AST until the lock is granted, but
672 * we can tell it right now */
673 l_lock(&lock->l_resource->lr_namespace->ns_lock);
674 if (lock->l_flags & LDLM_FL_AST_SENT) {
675 dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
676 if (lock->l_granted_mode == lock->l_req_mode)
677 ldlm_add_waiting_lock(lock);
679 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
683 req->rq_status = err;
684 if (req->rq_reply_state == NULL) {
685 err = lustre_pack_reply(req, 0, NULL, NULL);
691 /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
692 * ldlm_reprocess_all. If this moves, revisit that code. -phil */
694 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply "
695 "(err=%d, rc=%d)", err, rc);
698 down(&lock->l_resource->lr_lvb_sem);
699 size[1] = lock->l_resource->lr_lvb_len;
701 void *lvb = lustre_msg_buf(req->rq_repmsg,
703 LASSERTF(lvb != NULL, "req %p, lock %p\n",
706 memcpy(lvb, lock->l_resource->lr_lvb_data,
709 up(&lock->l_resource->lr_lvb_sem);
711 ldlm_lock_destroy(lock);
714 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
715 ldlm_reprocess_all(lock->l_resource);
718 LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
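/* Server-side convert handler: change an existing lock to the mode the
 * client requested and return the resulting flags. */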
724 int ldlm_handle_convert(struct ptlrpc_request *req)
726 struct ldlm_request *dlm_req;
727 struct ldlm_reply *dlm_rep;
728 struct ldlm_lock *lock;
729 int rc, size = sizeof(*dlm_rep);
732 dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
733 lustre_swab_ldlm_request);
734 if (dlm_req == NULL) {
735 CERROR ("Can't unpack dlm_req\n");
739 rc = lustre_pack_reply(req, 1, &size, NULL);
741 CERROR("out of memory\n");
744 dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
745 dlm_rep->lock_flags = dlm_req->lock_flags;
747 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
749 req->rq_status = EINVAL;
751 LDLM_DEBUG(lock, "server-side convert handler START");
752 ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
753 &dlm_rep->lock_flags);
754 if (ldlm_del_waiting_lock(lock))
755 CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
760 ldlm_reprocess_all(lock->l_resource);
761 LDLM_DEBUG(lock, "server-side convert handler END");
764 LDLM_DEBUG_NOLOCK("server-side convert handler END");
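/* Server-side cancel handler: look the lock up by handle, give the
 * namespace's lvbo a chance to update the resource LVB, cancel the lock,
 * and reprocess the resource so blocked locks can be granted.  Unknown
 * handles get ESTALE. */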
769 int ldlm_handle_cancel(struct ptlrpc_request *req)
771 struct ldlm_request *dlm_req;
772 struct ldlm_lock *lock;
773 struct ldlm_resource *res;
777 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
778 lustre_swab_ldlm_request);
779 if (dlm_req == NULL) {
780 CERROR("bad request buffer for cancel\n");
784 rc = lustre_pack_reply(req, 0, NULL, NULL);
786 CERROR("out of memory\n");
790 lock = ldlm_handle2lock(&dlm_req->lock_handle1);
792 CERROR("received cancel for unknown lock cookie "LPX64
793 " from client %s id %s\n",
794 dlm_req->lock_handle1.cookie,
795 req->rq_export->exp_client_uuid.uuid,
797 LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
799 dlm_req->lock_handle1.cookie);
800 req->rq_status = ESTALE;
802 LDLM_DEBUG(lock, "server-side cancel handler START");
803 res = lock->l_resource;
804 if (res && res->lr_namespace->ns_lvbo &&
805 res->lr_namespace->ns_lvbo->lvbo_update) {
806 (void)res->lr_namespace->ns_lvbo->lvbo_update
808 //(res, req->rq_reqmsg, 1);
811 ldlm_lock_cancel(lock);
812 if (ldlm_del_waiting_lock(lock))
813 CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
817 if (ptlrpc_reply(req) != 0)
821 ldlm_reprocess_all(lock->l_resource);
822 LDLM_DEBUG(lock, "server-side cancel handler END");
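/* Client-side blocking AST handler: mark the lock CBPENDING and, if it has
 * no readers or writers left, invoke its blocking callback so it gets
 * cancelled; otherwise cancellation happens on the final decref. */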
829 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
830 struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
835 l_lock(&ns->ns_lock);
836 LDLM_DEBUG(lock, "client blocking AST callback handler START");
838 lock->l_flags |= LDLM_FL_CBPENDING;
839 do_ast = (!lock->l_readers && !lock->l_writers);
842 LDLM_DEBUG(lock, "already unused, calling "
843 "callback (%p)", lock->l_blocking_ast);
844 if (lock->l_blocking_ast != NULL) {
845 l_unlock(&ns->ns_lock);
846 l_check_no_ns_lock(ns);
847 lock->l_blocking_ast(lock, ld, lock->l_ast_data,
849 l_lock(&ns->ns_lock);
852 LDLM_DEBUG(lock, "Lock still has references, will be"
856 LDLM_DEBUG(lock, "client blocking callback handler END");
857 l_unlock(&ns->ns_lock);
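/* Client-side completion AST handler: the server may have granted a
 * different mode, policy data or resource than was requested, so the local
 * lock is updated to match, any LVB is copied in, and the lock is granted
 * before the collected AST work is run. */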
862 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
863 struct ldlm_namespace *ns,
864 struct ldlm_request *dlm_req,
865 struct ldlm_lock *lock)
870 l_lock(&ns->ns_lock);
871 LDLM_DEBUG(lock, "client completion callback handler START");
873 /* If we receive the completion AST before the actual enqueue returned,
874 * then we might need to switch lock modes, resources, or extents. */
875 if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
876 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
877 LDLM_DEBUG(lock, "completion AST, new lock mode");
880 if (lock->l_resource->lr_type != LDLM_PLAIN) {
881 memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
882 sizeof(lock->l_policy_data));
883 LDLM_DEBUG(lock, "completion AST, new policy data");
886 ldlm_resource_unlink_lock(lock);
887 if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
888 &lock->l_resource->lr_name,
889 sizeof(lock->l_resource->lr_name)) != 0) {
890 ldlm_lock_change_resource(ns, lock,
891 dlm_req->lock_desc.l_resource.lr_name);
892 LDLM_DEBUG(lock, "completion AST, new resource");
895 if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
896 lock->l_flags |= LDLM_FL_CBPENDING;
897 LDLM_DEBUG(lock, "completion AST includes blocking AST");
900 if (lock->l_lvb_len) {
902 lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
903 lock->l_lvb_swabber);
905 LDLM_ERROR(lock, "completion AST did not contain "
908 memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
912 lock->l_resource->lr_tmp = &ast_list;
913 ldlm_grant_lock(lock, req, sizeof(*req), 1);
914 lock->l_resource->lr_tmp = NULL;
915 LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
916 l_unlock(&ns->ns_lock);
919 ldlm_run_ast_work(ns, &ast_list);
921 LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
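/* Client-side glimpse AST handler: run the lock's glimpse callback to fill
 * in the reply, then proactively drop the lock if it is an unused PW lock
 * that has been idle for more than ten seconds. */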
926 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
927 struct ldlm_namespace *ns,
928 struct ldlm_request *dlm_req,
929 struct ldlm_lock *lock)
934 l_lock(&ns->ns_lock);
935 LDLM_DEBUG(lock, "client glimpse AST callback handler");
937 if (lock->l_glimpse_ast != NULL) {
938 l_unlock(&ns->ns_lock);
939 l_check_no_ns_lock(ns);
940 rc = lock->l_glimpse_ast(lock, req);
941 l_lock(&ns->ns_lock);
944 if (req->rq_repmsg != NULL) {
951 l_unlock(&ns->ns_lock);
952 if (lock->l_granted_mode == LCK_PW &&
953 !lock->l_readers && !lock->l_writers &&
954 time_after(jiffies, lock->l_last_used + 10 * HZ)) {
955 if (ldlm_bl_to_thread(ns, NULL, lock))
956 ldlm_handle_bl_callback(ns, NULL, lock);
965 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
968 if (req->rq_reply_state == NULL) {
969 rc = lustre_pack_reply(req, 0, NULL, NULL);
973 return ptlrpc_reply(req);
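/* Queue a blocking AST for the blocking-callback thread pool: wrap the
 * namespace, lock descriptor and lock in an ldlm_bl_work_item and wake a
 * worker.  Callers fall back to ldlm_handle_bl_callback() inline if this
 * fails. */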
976 int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
977 struct ldlm_lock *lock)
980 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
981 struct ldlm_bl_work_item *blwi;
984 OBD_ALLOC(blwi, sizeof(*blwi));
991 blwi->blwi_lock = lock;
993 spin_lock(&blp->blp_lock);
994 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
995 wake_up(&blp->blp_waitq);
996 spin_unlock(&blp->blp_lock);
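/* Request handler for the "ldlm_cbd" service: dispatches the blocking,
 * completion and glimpse ASTs a server sends to this client, plus the llog
 * requests that share the portal.  Requests for unknown exports or locks
 * are answered immediately instead of being processed. */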
1004 static int ldlm_callback_handler(struct ptlrpc_request *req)
1006 struct ldlm_namespace *ns;
1007 struct ldlm_request *dlm_req;
1008 struct ldlm_lock *lock;
1012 /* Requests arrive in sender's byte order. The ptlrpc service
1013 * handler has already checked and, if necessary, byte-swapped the
1014 * incoming request message body, but I am responsible for the
1015 * message buffers. */
1017 if (req->rq_export == NULL) {
1018 struct ldlm_request *dlm_req;
1020 CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
1021 "export cookie "LPX64"; this is "
1022 "normal if this node rebooted with a lock held\n",
1023 req->rq_reqmsg->opc,
1025 req->rq_reqmsg->handle.cookie);
1027 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
1028 lustre_swab_ldlm_request);
1029 if (dlm_req != NULL)
1030 CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
1031 dlm_req->lock_handle1.cookie);
1033 ldlm_callback_reply(req, -ENOTCONN);
1037 LASSERT(req->rq_export != NULL);
1038 LASSERT(req->rq_export->exp_obd != NULL);
1040 switch(req->rq_reqmsg->opc) {
1041 case LDLM_BL_CALLBACK:
1042 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1044 case LDLM_CP_CALLBACK:
1045 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
1047 case LDLM_GL_CALLBACK:
1048 OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
1050 case OBD_LOG_CANCEL:
1051 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1052 rc = llog_origin_handle_cancel(req);
1053 ldlm_callback_reply(req, rc);
1055 case LLOG_ORIGIN_HANDLE_CREATE:
1056 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1057 rc = llog_origin_handle_create(req);
1058 ldlm_callback_reply(req, rc);
1060 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1061 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1062 rc = llog_origin_handle_next_block(req);
1063 ldlm_callback_reply(req, rc);
1065 case LLOG_ORIGIN_HANDLE_READ_HEADER:
1066 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1067 rc = llog_origin_handle_read_header(req);
1068 ldlm_callback_reply(req, rc);
1070 case LLOG_ORIGIN_HANDLE_CLOSE:
1071 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1072 rc = llog_origin_handle_close(req);
1073 ldlm_callback_reply(req, rc);
1076 CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
1077 ldlm_callback_reply(req, -EPROTO);
1081 ns = req->rq_export->exp_obd->obd_namespace;
1082 LASSERT(ns != NULL);
1084 dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
1085 lustre_swab_ldlm_request);
1086 if (dlm_req == NULL) {
1087 CERROR ("can't unpack dlm_req\n");
1088 ldlm_callback_reply (req, -EPROTO);
1092 lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
1094 CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
1095 dlm_req->lock_handle1.cookie);
1096 ldlm_callback_reply(req, -EINVAL);
1100 /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
1101 lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
1103 /* We want the ost thread to get this reply so that it can respond
1104 * to ost requests (write cache writeback) that might be triggered
1107 * But we'd also like to be able to indicate in the reply that we're
1108 * cancelling right now, because it's unused, or have an intent result
1109 * in the reply, so we might have to push the responsibility for sending
1110 * the reply down into the AST handlers, alas. */
1112 switch (req->rq_reqmsg->opc) {
1113 case LDLM_BL_CALLBACK:
1114 CDEBUG(D_INODE, "blocking ast\n");
1115 ldlm_callback_reply(req, 0);
1116 if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
1117 ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
1119 case LDLM_CP_CALLBACK:
1120 CDEBUG(D_INODE, "completion ast\n");
1121 ldlm_callback_reply(req, 0);
1122 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
1124 case LDLM_GL_CALLBACK:
1125 CDEBUG(D_INODE, "glimpse ast\n");
1126 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
1129 LBUG(); /* checked above */
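/* Request handler for the "ldlm_canceld" service; only lock cancel
 * requests are handled here, everything else is rejected. */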
1135 static int ldlm_cancel_handler(struct ptlrpc_request *req)
1140 /* Requests arrive in sender's byte order. The ptlrpc service
1141 * handler has already checked and, if necessary, byte-swapped the
1142 * incoming request message body, but I am responsible for the
1143 * message buffers. */
1145 if (req->rq_export == NULL) {
1146 struct ldlm_request *dlm_req;
1147 CERROR("operation %d with bad export from %s\n",
1148 req->rq_reqmsg->opc,
1150 CERROR("--> export cookie: "LPX64"\n",
1151 req->rq_reqmsg->handle.cookie);
1152 dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
1153 lustre_swab_ldlm_request);
1154 if (dlm_req != NULL)
1155 ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
1156 ldlm_callback_reply(req, -ENOTCONN);
1160 switch (req->rq_reqmsg->opc) {
1162 /* XXX FIXME move this back to mds/handler.c, bug 249 */
1164 CDEBUG(D_INODE, "cancel\n");
1165 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1166 rc = ldlm_handle_cancel(req);
1172 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
1173 ldlm_callback_reply(req, -EINVAL);
1180 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
1182 struct ldlm_bl_work_item *blwi = NULL;
1184 spin_lock(&blp->blp_lock);
1185 if (!list_empty(&blp->blp_list)) {
1186 blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
1188 list_del(&blwi->blwi_entry);
1190 spin_unlock(&blp->blp_lock);
1195 struct ldlm_bl_thread_data {
1197 struct ldlm_bl_pool *bltd_blp;
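/* Worker thread of the blocking-callback pool ("ldlm_bl_%02d"): waits for
 * work items queued by ldlm_bl_to_thread(), handles each one with
 * ldlm_handle_bl_callback(), and exits when it sees an item with a NULL
 * namespace, which is how ldlm_cleanup() stops the pool. */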
1200 static int ldlm_bl_thread_main(void *arg)
1202 struct ldlm_bl_thread_data *bltd = arg;
1203 struct ldlm_bl_pool *blp = bltd->bltd_blp;
1204 unsigned long flags;
1207 /* XXX boiler-plate */
1209 char name[sizeof(current->comm)];
1210 snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
1212 kportal_daemonize(name);
1214 SIGNAL_MASK_LOCK(current, flags);
1215 sigfillset(&current->blocked);
1217 SIGNAL_MASK_UNLOCK(current, flags);
1219 atomic_inc(&blp->blp_num_threads);
1220 complete(&blp->blp_comp);
1223 struct l_wait_info lwi = { 0 };
1224 struct ldlm_bl_work_item *blwi = NULL;
1226 l_wait_event_exclusive(blp->blp_waitq,
1227 (blwi = ldlm_bl_get_work(blp)) != NULL,
1230 if (blwi->blwi_ns == NULL)
1233 ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
1235 OBD_FREE(blwi, sizeof(*blwi));
1238 atomic_dec(&blp->blp_num_threads);
1239 complete(&blp->blp_comp);
1245 static int ldlm_setup(void);
1246 static int ldlm_cleanup(int force);
1248 int ldlm_get_ref(void)
1251 down(&ldlm_ref_sem);
1252 if (++ldlm_refcount == 1) {
1262 void ldlm_put_ref(int force)
1264 down(&ldlm_ref_sem);
1265 if (ldlm_refcount == 1) {
1266 int rc = ldlm_cleanup(force);
1268 CERROR("ldlm_cleanup failed: %d\n", rc);
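/* Bring the LDLM up: start the callback and cancel ptlrpc services, the
 * blocking-callback thread pool and the expired-lock thread, and set up the
 * waiting-locks list and timer.  Run when the first reference is taken via
 * ldlm_get_ref(). */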
1279 static int ldlm_setup(void)
1281 struct ldlm_bl_pool *blp;
1288 if (ldlm_state != NULL)
1291 OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
1292 if (ldlm_state == NULL)
1296 rc = ldlm_proc_setup();
1301 ldlm_state->ldlm_cb_service =
1302 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1303 LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1304 1500, ldlm_callback_handler, "ldlm_cbd",
1307 if (!ldlm_state->ldlm_cb_service) {
1308 CERROR("failed to start service\n");
1309 GOTO(out_proc, rc = -ENOMEM);
1312 ldlm_state->ldlm_cancel_service =
1313 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
1314 LDLM_CANCEL_REQUEST_PORTAL,
1315 LDLM_CANCEL_REPLY_PORTAL, 30000,
1316 ldlm_cancel_handler, "ldlm_canceld",
1319 if (!ldlm_state->ldlm_cancel_service) {
1320 CERROR("failed to start service\n");
1321 GOTO(out_proc, rc = -ENOMEM);
1324 OBD_ALLOC(blp, sizeof(*blp));
1326 GOTO(out_proc, rc = -ENOMEM);
1327 ldlm_state->ldlm_bl_pool = blp;
1329 atomic_set(&blp->blp_num_threads, 0);
1330 init_waitqueue_head(&blp->blp_waitq);
1331 spin_lock_init(&blp->blp_lock);
1333 INIT_LIST_HEAD(&blp->blp_list);
1336 for (i = 0; i < LDLM_NUM_THREADS; i++) {
1337 struct ldlm_bl_thread_data bltd = {
1341 init_completion(&blp->blp_comp);
1342 rc = kernel_thread(ldlm_bl_thread_main, &bltd, 0);
1344 CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
1345 GOTO(out_thread, rc);
1347 wait_for_completion(&blp->blp_comp);
1350 rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
1351 LDLM_NUM_THREADS, "ldlm_cn");
1353 GOTO(out_thread, rc);
1355 rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
1356 LDLM_NUM_THREADS, "ldlm_cb");
1358 GOTO(out_thread, rc);
1360 INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
1361 spin_lock_init(&expired_lock_thread.elt_lock);
1362 expired_lock_thread.elt_state = ELT_STOPPED;
1363 init_waitqueue_head(&expired_lock_thread.elt_waitq);
1365 rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
1367 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
1368 GOTO(out_thread, rc);
1371 wait_event(expired_lock_thread.elt_waitq,
1372 expired_lock_thread.elt_state == ELT_READY);
1374 INIT_LIST_HEAD(&waiting_locks_list);
1375 spin_lock_init(&waiting_locks_spinlock);
1376 waiting_locks_timer.function = waiting_locks_callback;
1377 waiting_locks_timer.data = 0;
1378 init_timer(&waiting_locks_timer);
1385 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1386 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1391 ldlm_proc_cleanup();
1394 OBD_FREE(ldlm_state, sizeof(*ldlm_state));
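/* Tear the LDLM down: refuse while namespaces still exist, stop the
 * blocking-callback threads and the expired-lock thread, unregister both
 * ptlrpc services and free the global state. */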
1399 static int ldlm_cleanup(int force)
1402 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1406 if (!list_empty(&ldlm_namespace_list)) {
1407 CERROR("ldlm still has namespaces; clean these up first.\n");
1408 ldlm_dump_all_namespaces(D_DLMTRACE);
1413 while (atomic_read(&blp->blp_num_threads) > 0) {
1414 struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1416 init_completion(&blp->blp_comp);
1418 spin_lock(&blp->blp_lock);
1419 list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1420 wake_up(&blp->blp_waitq);
1421 spin_unlock(&blp->blp_lock);
1423 wait_for_completion(&blp->blp_comp);
1425 OBD_FREE(blp, sizeof(*blp));
1427 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1428 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
1429 ldlm_proc_cleanup();
1431 expired_lock_thread.elt_state = ELT_TERMINATE;
1432 wake_up(&expired_lock_thread.elt_waitq);
1433 wait_event(expired_lock_thread.elt_waitq,
1434 expired_lock_thread.elt_state == ELT_STOPPED);
1438 OBD_FREE(ldlm_state, sizeof(*ldlm_state));
1444 int __init ldlm_init(void)
1446 ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1447 sizeof(struct ldlm_resource), 0,
1448 SLAB_HWCACHE_ALIGN, NULL, NULL);
1449 if (ldlm_resource_slab == NULL)
1452 ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1453 sizeof(struct ldlm_lock), 0,
1454 SLAB_HWCACHE_ALIGN, NULL, NULL);
1455 if (ldlm_lock_slab == NULL) {
1456 kmem_cache_destroy(ldlm_resource_slab);
1460 l_lock_init(&ldlm_handle_lock);
1465 void __exit ldlm_exit(void)
1467 if ( ldlm_refcount )
1468 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1469 LASSERTF(kmem_cache_destroy(ldlm_resource_slab) == 0,
1470 "couldn't free ldlm resource slab\n");
1471 LASSERTF(kmem_cache_destroy(ldlm_lock_slab) == 0,
1472 "couldn't free ldlm lock slab\n");
1476 EXPORT_SYMBOL(ldlm_flock_completion_ast);
1479 EXPORT_SYMBOL(ldlm_extent_shift_kms);
1482 EXPORT_SYMBOL(ldlm_get_processing_policy);
1483 EXPORT_SYMBOL(ldlm_lock2desc);
1484 EXPORT_SYMBOL(ldlm_register_intent);
1485 EXPORT_SYMBOL(ldlm_lockname);
1486 EXPORT_SYMBOL(ldlm_typename);
1487 EXPORT_SYMBOL(ldlm_lock2handle);
1488 EXPORT_SYMBOL(__ldlm_handle2lock);
1489 EXPORT_SYMBOL(ldlm_lock_get);
1490 EXPORT_SYMBOL(ldlm_lock_put);
1491 EXPORT_SYMBOL(ldlm_lock_match);
1492 EXPORT_SYMBOL(ldlm_lock_cancel);
1493 EXPORT_SYMBOL(ldlm_lock_addref);
1494 EXPORT_SYMBOL(ldlm_lock_decref);
1495 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
1496 EXPORT_SYMBOL(ldlm_lock_change_resource);
1497 EXPORT_SYMBOL(ldlm_lock_set_data);
1498 EXPORT_SYMBOL(ldlm_it2str);
1499 EXPORT_SYMBOL(ldlm_lock_dump);
1500 EXPORT_SYMBOL(ldlm_lock_dump_handle);
1501 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
1502 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
1503 EXPORT_SYMBOL(ldlm_lock_allow_match);
1505 /* ldlm_request.c */
1506 EXPORT_SYMBOL(ldlm_completion_ast);
1507 EXPORT_SYMBOL(ldlm_expired_completion_wait);
1508 EXPORT_SYMBOL(ldlm_cli_convert);
1509 EXPORT_SYMBOL(ldlm_cli_enqueue);
1510 EXPORT_SYMBOL(ldlm_cli_cancel);
1511 EXPORT_SYMBOL(ldlm_cli_cancel_unused);
1512 EXPORT_SYMBOL(ldlm_replay_locks);
1513 EXPORT_SYMBOL(ldlm_resource_foreach);
1514 EXPORT_SYMBOL(ldlm_namespace_foreach);
1515 EXPORT_SYMBOL(ldlm_namespace_foreach_res);
1516 EXPORT_SYMBOL(ldlm_change_cbdata);
1519 EXPORT_SYMBOL(ldlm_server_blocking_ast);
1520 EXPORT_SYMBOL(ldlm_server_completion_ast);
1521 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
1522 EXPORT_SYMBOL(ldlm_handle_enqueue);
1523 EXPORT_SYMBOL(ldlm_handle_cancel);
1524 EXPORT_SYMBOL(ldlm_handle_convert);
1525 EXPORT_SYMBOL(ldlm_del_waiting_lock);
1526 EXPORT_SYMBOL(ldlm_get_ref);
1527 EXPORT_SYMBOL(ldlm_put_ref);
1531 EXPORT_SYMBOL(ldlm_test);
1532 EXPORT_SYMBOL(ldlm_regression_start);
1533 EXPORT_SYMBOL(ldlm_regression_stop);
1536 /* ldlm_resource.c */
1537 EXPORT_SYMBOL(ldlm_namespace_new);
1538 EXPORT_SYMBOL(ldlm_namespace_cleanup);
1539 EXPORT_SYMBOL(ldlm_namespace_free);
1540 EXPORT_SYMBOL(ldlm_namespace_dump);
1541 EXPORT_SYMBOL(ldlm_dump_all_namespaces);
1542 EXPORT_SYMBOL(ldlm_resource_get);
1543 EXPORT_SYMBOL(ldlm_resource_putref);
1546 EXPORT_SYMBOL(l_lock);
1547 EXPORT_SYMBOL(l_unlock);
1550 EXPORT_SYMBOL(client_obd_setup);
1551 EXPORT_SYMBOL(client_obd_cleanup);
1552 EXPORT_SYMBOL(client_connect_import);
1553 EXPORT_SYMBOL(client_disconnect_export);
1554 EXPORT_SYMBOL(target_abort_recovery);
1555 EXPORT_SYMBOL(target_cleanup_recovery);
1556 EXPORT_SYMBOL(target_handle_connect);
1557 EXPORT_SYMBOL(target_destroy_export);
1558 EXPORT_SYMBOL(target_cancel_recovery_timer);
1559 EXPORT_SYMBOL(target_send_reply);
1560 EXPORT_SYMBOL(target_queue_recovery_request);
1561 EXPORT_SYMBOL(target_handle_ping);
1562 EXPORT_SYMBOL(target_handle_disconnect);
1563 EXPORT_SYMBOL(target_queue_final_reply);