/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <linux/module.h>
# include <linux/slab.h>
# include <linux/init.h>
# include <linux/wait.h>
#else
# include <liblustre.h>
#endif

#include <linux/lustre_dlm.h>
#include <linux/obd_class.h>
#include <portals/list.h>
#include "ldlm_internal.h"
extern kmem_cache_t *ldlm_resource_slab;
extern kmem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
extern struct list_head ldlm_namespace_list;
extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);

static DECLARE_MUTEX(ldlm_ref_sem);
static int ldlm_refcount = 0;

static struct ldlm_state *ldlm_state;
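
/* Round a jiffies timeout up to the next whole second, so that the
 * waiting-locks timer fires at most once per second even when many locks
 * time out close together. */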
inline unsigned long round_timeout(unsigned long timeout)
{
        return ((timeout / HZ) + 1) * HZ;
}
#ifdef __KERNEL__
/* XXX should this be per-ldlm? */
static struct list_head waiting_locks_list;
static spinlock_t waiting_locks_spinlock;
static struct timer_list waiting_locks_timer;

static struct expired_lock_thread {
        wait_queue_head_t       elt_waitq;
        int                     elt_state;
        struct list_head        elt_expired_locks;
        spinlock_t              elt_lock;
} expired_lock_thread;

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

struct ldlm_bl_pool {
        struct list_head        blp_list;
        wait_queue_head_t       blp_waitq;
        atomic_t                blp_num_threads;
        struct completion       blp_comp;
        spinlock_t              blp_lock;
};

struct ldlm_bl_work_item {
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
};
static inline int have_expired_locks(void)
{
        int need_to_run;
        ENTRY;

        spin_lock_bh(&expired_lock_thread.elt_lock);
        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
        spin_unlock_bh(&expired_lock_thread.elt_lock);

        RETURN(need_to_run);
}
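
/* Main loop of the dedicated "ldlm_elt" kernel thread.  It sleeps until
 * waiting_locks_callback() moves timed-out locks onto elt_expired_locks,
 * then evicts the corresponding clients by failing their exports.  It exits
 * once elt_state is set to ELT_TERMINATE by ldlm_cleanup(). */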
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        unsigned long flags;
        ENTRY;

        kportal_daemonize("ldlm_elt");

        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        expired_lock_thread.elt_state = ELT_READY;
        wake_up(&expired_lock_thread.elt_waitq);

        while (1) {
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                             &lwi);

                spin_lock_bh(&expired_lock_thread.elt_lock);
                while (!list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;

                        lock = list_entry(expired->next, struct ldlm_lock,
                                          l_pending_chain);
                        if ((void *)lock < LP_POISON + PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                CERROR("free lock on elt list %p\n", lock);
                                LBUG();
                        }
                        list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export < LP_POISON + PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                                continue;
                        }
                        export = class_export_get(lock->l_export);
                        spin_unlock_bh(&expired_lock_thread.elt_lock);

                        ptlrpc_fail_export(export);
                        class_export_put(export);
                        spin_lock_bh(&expired_lock_thread.elt_lock);
                }
                spin_unlock_bh(&expired_lock_thread.elt_lock);

                if (expired_lock_thread.elt_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread.elt_state = ELT_STOPPED;
        wake_up(&expired_lock_thread.elt_waitq);
        RETURN(0);
}
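
/* Timer callback for waiting_locks_timer.  Any lock at the head of
 * waiting_locks_list whose callback timeout has passed (GROUP locks are
 * exempt) is moved to the expired-lock list and the "ldlm_elt" thread is
 * woken to evict the offending client; the timer is then re-armed for the
 * next pending lock, if any. */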
static void waiting_locks_callback(unsigned long unused)
{
        struct ldlm_lock *lock;
        char str[PTL_NALFMT_SIZE];

        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);

                if ((lock->l_callback_timeout > jiffies) ||
                    (lock->l_req_mode == LCK_GROUP))
                        break;

                LDLM_ERROR(lock, "lock callback timer expired: evicting client "
                           "%s@%s nid %s",
                           lock->l_export->exp_client_uuid.uuid,
                           lock->l_export->exp_connection->c_remote_uuid.uuid,
                           ptlrpc_peernid2str(&lock->l_export->exp_connection->c_peer, str));

                spin_lock_bh(&expired_lock_thread.elt_lock);
                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain,
                         &expired_lock_thread.elt_expired_locks);
                spin_unlock_bh(&expired_lock_thread.elt_lock);
                wake_up(&expired_lock_thread.elt_waitq);
        }

        /*
         * Make sure the timer will fire again if we have any locks
         * left.
         */
        if (!list_empty(&waiting_locks_list)) {
                unsigned long timeout_rounded;
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                timeout_rounded = round_timeout(lock->l_callback_timeout);
                mod_timer(&waiting_locks_timer, timeout_rounded);
        }
        spin_unlock_bh(&waiting_locks_spinlock);
}
/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 */
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        unsigned long timeout_rounded;

        spin_lock_bh(&waiting_locks_spinlock);
        if (!list_empty(&lock->l_pending_chain)) {
                LDLM_DEBUG(lock, "not re-adding to wait list");
                spin_unlock_bh(&waiting_locks_spinlock);
                return 0;
        }
        LDLM_DEBUG(lock, "adding to wait list");

        lock->l_callback_timeout = jiffies + (obd_timeout * HZ / 2);

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        if (timeout_rounded < waiting_locks_timer.expires ||
            !timer_pending(&waiting_locks_timer)) {
                mod_timer(&waiting_locks_timer, timeout_rounded);
        }
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
        spin_unlock_bh(&waiting_locks_spinlock);
        return 1;
}
/*
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        del_timer(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        mod_timer(&waiting_locks_timer,
                                  round_timeout(next->l_callback_timeout));
                }
        }

        spin_lock_bh(&expired_lock_thread.elt_lock);
        list_del_init(&lock->l_pending_chain);
        spin_unlock_bh(&expired_lock_thread.elt_lock);

        spin_unlock_bh(&waiting_locks_spinlock);
        LDLM_DEBUG(lock, "removed");
        return 1;
}
#else /* !__KERNEL__ */

static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(1);
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}

#endif /* __KERNEL__ */
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc, const char *ast_type)
{
        struct ptlrpc_connection *conn = lock->l_export->exp_connection;
        char str[PTL_NALFMT_SIZE];

        LDLM_ERROR(lock, "%s AST failed (%d): evicting client %s@%s NID "LPX64
                   " (%s)", ast_type, rc, lock->l_export->exp_client_uuid.uuid,
                   conn->c_remote_uuid.uuid, conn->c_peer.peer_nid,
                   ptlrpc_peernid2str(&conn->c_peer, str));
        ptlrpc_fail_export(lock->l_export);
}
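
/* Decide what to do when an AST RPC to a client fails: unresponsive clients
 * (timeout/interrupt/no connection) are evicted via ldlm_failed_ast(), except
 * liblustre clients whose lock is simply cancelled; other errors cancel the
 * lock and ask the caller to restart its reprocessing. */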
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
{
        struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer;
        char str[PTL_NALFMT_SIZE];

        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                LASSERT(lock->l_export);
                if (lock->l_export->exp_libclient) {
                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
                                   " timeout, just cancelling lock", ast_type,
                                   ptlrpc_peernid2str(peer, str));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else {
                        ldlm_del_waiting_lock(lock);
                        ldlm_failed_ast(lock, rc, ast_type);
                }
        } else if (rc) {
                if (rc == -EINVAL)
                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
                                   " from %s AST - normal race",
                                   ptlrpc_peernid2str(peer, str),
                                   req->rq_repmsg->status, ast_type);
                else
                        LDLM_ERROR(lock, "client (nid %s) returned %d "
                                   "from %s AST", ptlrpc_peernid2str(peer, str),
                                   (req->rq_repmsg != NULL) ?
                                   req->rq_repmsg->status : 0, ast_type);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }

        return rc;
}
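
/* Send a blocking AST to the client that holds 'lock', asking it to cancel.
 * Nothing is sent for ungranted or destroyed locks; for granted locks the
 * lock is also added to the waiting-lock list so the client is evicted if it
 * never answers. */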
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        LASSERT(lock);

        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* What's the point? */
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(0);
        }

        if (LTIME_S(CURRENT_TIME) - lock->l_export->exp_last_request_time > 30){
                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(-ETIMEDOUT);
        }

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LDLM_BL_CALLBACK, 1, &size, NULL);
        if (req == NULL) {
                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                RETURN(-ENOMEM);
        }

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        memcpy(&body->lock_desc, desc, sizeof(*desc));
        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

        LDLM_DEBUG(lock, "server preparing blocking AST");
        req->rq_replen = lustre_msg_size(0, NULL);

        if (lock->l_granted_mode == lock->l_req_mode)
                ldlm_add_waiting_lock(lock);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */
        rc = ptlrpc_queue_wait(req);
        if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "blocking");

        ptlrpc_req_finished(req);

        RETURN(rc);
}
/* XXX copied from ptlrpc/service.c */
static long timeval_sub(struct timeval *large, struct timeval *small)
{
        return (large->tv_sec - small->tv_sec) * 1000000 +
                (large->tv_usec - small->tv_usec);
}
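
/* Tell the client that its enqueue has been granted.  The reply carries the
 * current LVB if the resource has one, and LDLM_FL_AST_SENT if a blocking AST
 * is already pending, in which case the lock-timeout clock is started. */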
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        struct timeval granted_time;
        long total_enqueue_wait;
        int rc = 0, size[2] = {sizeof(*body)}, buffers = 1;
        ENTRY;

        LASSERT(lock != NULL);

        do_gettimeofday(&granted_time);
        total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time);

        if (total_enqueue_wait / 1000000 > obd_timeout)
                LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait);

        if (lock->l_resource->lr_lvb_len) {
                buffers = 2;
                size[1] = lock->l_resource->lr_lvb_len;
        }

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LDLM_CP_CALLBACK, buffers, size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);

        if (buffers == 2) {
                void *lvb = lustre_msg_buf(req->rq_reqmsg, 1,
                                           lock->l_resource->lr_lvb_len);
                memcpy(lvb, lock->l_resource->lr_lvb_data,
                       lock->l_resource->lr_lvb_len);
        }

        LDLM_DEBUG(lock, "server preparing completion AST (after %ldus wait)",
                   total_enqueue_wait);
        req->rq_replen = lustre_msg_size(0, NULL);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */

        /* We only send real blocking ASTs after the lock is granted */
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                body->lock_flags |= LDLM_FL_AST_SENT;
                ldlm_add_waiting_lock(lock); /* start the lock-timeout clock */
        }
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        rc = ptlrpc_queue_wait(req);
        if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "completion");

        ptlrpc_req_finished(req);

        RETURN(rc);
}
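
/* Ask the client holding 'lock' for its current LVB (e.g. file size) without
 * cancelling the lock, and fold the answer back into the resource via the
 * namespace's lvbo_update method. */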
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        int rc = 0, size = sizeof(*body);
        ENTRY;

        LASSERT(lock != NULL);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LDLM_GL_CALLBACK, 1, &size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
        memcpy(&body->lock_handle1, &lock->l_remote_handle,
               sizeof(body->lock_handle1));
        ldlm_lock2desc(lock, &body->lock_desc);

        size = lock->l_resource->lr_lvb_len;
        req->rq_replen = lustre_msg_size(1, &size);

        req->rq_send_state = LUSTRE_IMP_FULL;
        req->rq_timeout = 2; /* 2 second timeout for initial AST reply */

        rc = ptlrpc_queue_wait(req);
        if (rc == -ELDLM_NO_LOCK_DATA)
                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
        else if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
        else
                rc = res->lr_namespace->ns_lvbo->lvbo_update
                        (res, req->rq_repmsg, 0, 1);
        ptlrpc_req_finished(req);
        RETURN(rc);
}
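
/* Server-side entry point for LDLM_ENQUEUE requests.  Creates the server
 * copy of the lock, packs the reply (unless an intent policy will do so),
 * calls ldlm_lock_enqueue() and returns the resulting handle, flags and LVB
 * to the client. */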
int ldlm_handle_enqueue(struct ptlrpc_request *req,
                        ldlm_completion_callback completion_callback,
                        ldlm_blocking_callback blocking_callback,
                        ldlm_glimpse_callback glimpse_callback)
{
        struct obd_device *obddev = req->rq_export->exp_obd;
        struct ldlm_reply *dlm_rep;
        struct ldlm_request *dlm_req;
        int rc = 0, size[2] = {sizeof(*dlm_rep)};
        int flags;
        ldlm_error_t err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                GOTO(out, rc = -EFAULT);
        }

        flags = dlm_req->lock_flags;

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(obddev->obd_namespace, &dlm_req->lock_handle2,
                                dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                blocking_callback, completion_callback,
                                glimpse_callback, NULL, 0);
        if (!lock)
                GOTO(out, rc = -ENOMEM);

        do_gettimeofday(&lock->l_enqueued_time);
        memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1,
               sizeof(lock->l_remote_handle));
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        LASSERT(req->rq_export);
        lock->l_export = class_export_get(req->rq_export);
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        list_add(&lock->l_export_chain,
                 &lock->l_export->exp_ldlm_data.led_held_locks);
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
        } else {
                int buffers = 1;

                if (lock->l_resource->lr_lvb_len) {
                        size[1] = lock->l_resource->lr_lvb_len;
                        buffers = 2;
                }

                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);

                rc = lustre_pack_reply(req, buffers, size, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
                       sizeof(ldlm_policy_data_t));
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                memcpy(&lock->l_req_extent, &lock->l_policy_data.l_extent,
                       sizeof(lock->l_req_extent));

        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, &flags);
        if (err)
                GOTO(out, err);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
        dlm_rep->lock_flags = flags;

        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);

        /* We never send a blocking AST until the lock is granted, but
         * we can tell it right now */
        l_lock(&lock->l_resource->lr_namespace->ns_lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode)
                        ldlm_add_waiting_lock(lock);
        }
        l_unlock(&lock->l_resource->lr_namespace->ns_lock);

        EXIT;
 out:
        req->rq_status = err;
        if (req->rq_reply_state == NULL) {
                err = lustre_pack_reply(req, 0, NULL, NULL);
                if (rc == 0)
                        rc = err;
                req->rq_status = rc;
        }

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
        if (lock) {
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply "
                           "(err=%d, rc=%d)", err, rc);

                if (lock->l_resource->lr_lvb_len > 0 && rc == 0) {
                        void *lvb = lustre_msg_buf(req->rq_repmsg, 1,
                                                   lock->l_resource->lr_lvb_len);
                        LASSERT(lvb != NULL);
                        memcpy(lvb, lock->l_resource->lr_lvb_data,
                               lock->l_resource->lr_lvb_len);
                }

                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);
                LDLM_LOCK_PUT(lock);
        }

        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
                          lock, rc);

        return rc;
}
int ldlm_handle_convert(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
        int rc, size = sizeof(*dlm_rep);
        ENTRY;

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("Can't unpack dlm_req\n");
                RETURN(-EFAULT);
        }

        rc = lustre_pack_reply(req, 1, &size, NULL);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }
        dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*dlm_rep));
        dlm_rep->lock_flags = dlm_req->lock_flags;

        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
        if (!lock) {
                req->rq_status = EINVAL;
        } else {
                LDLM_DEBUG(lock, "server-side convert handler START");
                ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                  &dlm_rep->lock_flags);
                if (ldlm_del_waiting_lock(lock))
                        CDEBUG(D_DLMTRACE, "converted waiting lock %p\n", lock);
                req->rq_status = 0;
        }

        if (lock) {
                ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side convert handler END");
                LDLM_LOCK_PUT(lock);
        } else
                LDLM_DEBUG_NOLOCK("server-side convert handler END");

        RETURN(0);
}
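
/* Server-side entry point for LDLM_CANCEL requests: looks up the lock by
 * handle, updates the resource LVB if the namespace provides an update
 * method, cancels the lock and reprocesses the resource. */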
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        char str[PTL_NALFMT_SIZE];
        int rc;
        ENTRY;

        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("bad request buffer for cancel\n");
                RETURN(-EFAULT);
        }

        rc = lustre_pack_reply(req, 0, NULL, NULL);
        if (rc) {
                CERROR("out of memory\n");
                RETURN(-ENOMEM);
        }

        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
        if (!lock) {
                CERROR("received cancel for unknown lock cookie "LPX64
                       " from client %s nid %s\n",
                       dlm_req->lock_handle1.cookie,
                       req->rq_export->exp_client_uuid.uuid,
                       ptlrpc_peernid2str(&req->rq_peer, str));
                LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
                                  "(cookie "LPU64")",
                                  dlm_req->lock_handle1.cookie);
                req->rq_status = ESTALE;
        } else {
                LDLM_DEBUG(lock, "server-side cancel handler START");
                res = lock->l_resource;
                if (res && res->lr_namespace->ns_lvbo &&
                    res->lr_namespace->ns_lvbo->lvbo_update) {
                        (void)res->lr_namespace->ns_lvbo->lvbo_update
                                (res, NULL, 0, 0);
                                //(res, req->rq_reqmsg, 1);
                }

                ldlm_lock_cancel(lock);
                if (ldlm_del_waiting_lock(lock))
                        CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
                req->rq_status = rc;
        }

        if (ptlrpc_reply(req) != 0)
                LBUG();

        if (lock) {
                ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side cancel handler END");
                LDLM_LOCK_PUT(lock);
        }

        RETURN(0);
}
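
/* Client-side handling of a blocking AST from the server: mark the lock
 * CBPENDING and, if it is no longer in use, invoke its blocking callback
 * immediately so it gets cancelled. */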
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
{
        int do_ast;
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client blocking AST callback handler START");

        lock->l_flags |= LDLM_FL_CBPENDING;
        do_ast = (!lock->l_readers && !lock->l_writers);

        if (do_ast) {
                LDLM_DEBUG(lock, "already unused, calling "
                           "callback (%p)", lock->l_blocking_ast);
                if (lock->l_blocking_ast != NULL) {
                        l_unlock(&ns->ns_lock);
                        l_check_no_ns_lock(ns);
                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,
                                             LDLM_CB_BLOCKING);
                        l_lock(&ns->ns_lock);
                }
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be"
                           " cancelled later");
        }

        LDLM_DEBUG(lock, "client blocking callback handler END");
        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}
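
/* Client-side handling of a completion AST: update the lock's mode, policy
 * data, resource and LVB from the server's reply, then grant the lock and
 * run any ASTs that became ready as a result. */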
static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        LIST_HEAD(ast_list);
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client completion callback handler START");

        /* If we receive the completion AST before the actual enqueue returned,
         * then we might need to switch lock modes, resources, or extents. */
        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
                LDLM_DEBUG(lock, "completion AST, new lock mode");
        }

        if (lock->l_resource->lr_type != LDLM_PLAIN) {
                memcpy(&lock->l_policy_data, &dlm_req->lock_desc.l_policy_data,
                       sizeof(lock->l_policy_data));
                LDLM_DEBUG(lock, "completion AST, new policy data");
        }

        ldlm_resource_unlink_lock(lock);
        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
                   &lock->l_resource->lr_name,
                   sizeof(lock->l_resource->lr_name)) != 0) {
                ldlm_lock_change_resource(ns, lock,
                                          dlm_req->lock_desc.l_resource.lr_name);
                LDLM_DEBUG(lock, "completion AST, new resource");
        }

        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
                lock->l_flags |= LDLM_FL_CBPENDING;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }

        if (lock->l_lvb_len) {
                void *lvb;
                lvb = lustre_swab_reqbuf(req, 1, lock->l_lvb_len,
                                         lock->l_lvb_swabber);
                if (lvb == NULL) {
                        LDLM_ERROR(lock, "completion AST did not contain "
                                   "expected LVB!");
                } else {
                        memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
                }
        }

        lock->l_resource->lr_tmp = &ast_list;
        ldlm_grant_lock(lock, req, sizeof(*req), 1);
        lock->l_resource->lr_tmp = NULL;
        LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
        l_unlock(&ns->ns_lock);

        LDLM_LOCK_PUT(lock);

        ldlm_run_ast_work(ns, &ast_list);

        LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                          lock);
        EXIT;
}
static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_namespace *ns,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
{
        int rc = -ENOSYS;
        ENTRY;

        l_lock(&ns->ns_lock);
        LDLM_DEBUG(lock, "client glimpse AST callback handler");

        if (lock->l_glimpse_ast != NULL) {
                l_unlock(&ns->ns_lock);
                l_check_no_ns_lock(ns);
                rc = lock->l_glimpse_ast(lock, req);
                l_lock(&ns->ns_lock);
        }

        if (req->rq_repmsg != NULL) {
                ptlrpc_reply(req);
        } else {
                req->rq_status = rc;
                ptlrpc_error(req);
        }

        if (lock->l_granted_mode == LCK_PW &&
            !lock->l_readers && !lock->l_writers &&
            time_after(jiffies, lock->l_last_used + 10 * HZ)) {
                l_unlock(&ns->ns_lock);
                ldlm_handle_bl_callback(ns, NULL, lock);
                EXIT;
                return;
        }

        l_unlock(&ns->ns_lock);
        LDLM_LOCK_PUT(lock);
        EXIT;
}
static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
{
        req->rq_status = rc;
        if (req->rq_reply_state == NULL) {
                rc = lustre_pack_reply(req, 0, NULL, NULL);
                if (rc)
                        return rc;
        }
        return ptlrpc_reply(req);
}
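
/* Queue a blocking AST on the blocking-callback thread pool so the ptlrpc
 * service thread can reply immediately instead of running the (possibly
 * slow) blocking callback itself. */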
int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
                      struct ldlm_lock *lock)
{
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
        struct ldlm_bl_work_item *blwi;
        ENTRY;

        OBD_ALLOC(blwi, sizeof(*blwi));
        if (blwi == NULL)
                RETURN(-ENOMEM);

        blwi->blwi_ns = ns;
        if (ld != NULL)
                blwi->blwi_ld = *ld;
        blwi->blwi_lock = lock;

        spin_lock(&blp->blp_lock);
        list_add_tail(&blwi->blwi_entry, &blp->blp_list);
        wake_up(&blp->blp_waitq);
        spin_unlock(&blp->blp_lock);

        RETURN(0);
}
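
/* Request handler for the "ldlm_cbd" service: dispatches blocking,
 * completion and glimpse ASTs arriving from servers, as well as the llog
 * origin-handle requests that share this portal. */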
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
        struct ldlm_namespace *ns;
        struct ldlm_request *dlm_req;
        struct ldlm_lock *lock;
        char str[PTL_NALFMT_SIZE];
        int rc;
        ENTRY;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;

                CDEBUG(D_RPCTRACE, "operation %d from nid %s with bad "
                       "export cookie "LPX64" (ptl req %d/rep %d); this is "
                       "normal if this node rebooted with a lock held\n",
                       req->rq_reqmsg->opc,
                       ptlrpc_peernid2str(&req->rq_peer, str),
                       req->rq_reqmsg->handle.cookie,
                       req->rq_request_portal, req->rq_reply_portal);

                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
                               dlm_req->lock_handle1.cookie);

                ldlm_callback_reply(req, -ENOTCONN);
                RETURN(0);
        }

        LASSERT(req->rq_export != NULL);
        LASSERT(req->rq_export->exp_obd != NULL);

        switch(req->rq_reqmsg->opc) {
        case LDLM_BL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
                break;
        case LDLM_CP_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
                break;
        case LDLM_GL_CALLBACK:
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
                break;
        case OBD_LOG_CANCEL:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
                rc = llog_origin_handle_cancel(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_CREATE:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_create(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_next_block(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_READ_HEADER:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_read_header(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        case LLOG_ORIGIN_HANDLE_CLOSE:
                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
                rc = llog_origin_handle_close(req);
                ldlm_callback_reply(req, rc);
                RETURN(0);
        default:
                CERROR("unknown opcode %u\n", req->rq_reqmsg->opc);
                ldlm_callback_reply(req, -EPROTO);
                RETURN(0);
        }

        ns = req->rq_export->exp_obd->obd_namespace;
        LASSERT(ns != NULL);

        dlm_req = lustre_swab_reqbuf (req, 0, sizeof (*dlm_req),
                                      lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR ("can't unpack dlm_req\n");
                ldlm_callback_reply (req, -EPROTO);
                RETURN(0);
        }

        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
        if (!lock) {
                CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
                       dlm_req->lock_handle1.cookie);
                ldlm_callback_reply(req, -EINVAL);
                RETURN(0);
        }

        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
        lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);

        /* We want the ost thread to get this reply so that it can respond
         * to ost requests (write cache writeback) that might be triggered
         * in the callback.
         *
         * But we'd also like to be able to indicate in the reply that we're
         * cancelling right now, because it's unused, or have an intent result
         * in the reply, so we might have to push the responsibility for sending
         * the reply down into the AST handlers, alas. */

        switch (req->rq_reqmsg->opc) {
        case LDLM_BL_CALLBACK:
                CDEBUG(D_INODE, "blocking ast\n");
#ifdef __KERNEL__
                rc = ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock);
                ldlm_callback_reply(req, rc);
#else
                rc = 0;
                ldlm_callback_reply(req, rc);
                ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
#endif
                break;
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                ldlm_callback_reply(req, 0);
                ldlm_handle_cp_callback(req, ns, dlm_req, lock);
                break;
        case LDLM_GL_CALLBACK:
                CDEBUG(D_INODE, "glimpse ast\n");
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
        default:
                LBUG();                         /* checked above */
        }

        RETURN(0);
}
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
        int rc;
        ENTRY;

        /* Requests arrive in sender's byte order.  The ptlrpc service
         * handler has already checked and, if necessary, byte-swapped the
         * incoming request message body, but I am responsible for the
         * message buffers. */

        if (req->rq_export == NULL) {
                struct ldlm_request *dlm_req;
                CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
                       req->rq_reqmsg->opc, req->rq_request_portal,
                       req->rq_reply_portal);
                CERROR("--> export cookie: "LPX64"\n",
                       req->rq_reqmsg->handle.cookie);
                dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
                                             lustre_swab_ldlm_request);
                if (dlm_req != NULL)
                        ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
                RETURN(-ENOTCONN);
        }

        switch (req->rq_reqmsg->opc) {

        /* XXX FIXME move this back to mds/handler.c, bug 249 */
        case LDLM_CANCEL:
                CDEBUG(D_INODE, "cancel\n");
                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
                rc = ldlm_handle_cancel(req);
                if (rc)
                        break;
                RETURN(0);

        default:
                CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
                RETURN(-EINVAL);
        }

        RETURN(0);
}
static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
{
        struct ldlm_bl_work_item *blwi = NULL;

        spin_lock(&blp->blp_lock);
        if (!list_empty(&blp->blp_list)) {
                blwi = list_entry(blp->blp_list.next, struct ldlm_bl_work_item,
                                  blwi_entry);
                list_del(&blwi->blwi_entry);
        }
        spin_unlock(&blp->blp_lock);

        return blwi;
}

struct ldlm_bl_thread_data {
        int                     bltd_num;
        struct ldlm_bl_pool     *bltd_blp;
};
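
/* Worker thread for the blocking-callback pool: pulls ldlm_bl_work_items
 * off blp_list and runs ldlm_handle_bl_callback() for each; a work item
 * with a NULL namespace tells the thread to exit. */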
static int ldlm_bl_thread_main(void *arg)
{
        struct ldlm_bl_thread_data *bltd = arg;
        struct ldlm_bl_pool *blp = bltd->bltd_blp;
        unsigned long flags;
        ENTRY;

        /* XXX boiler-plate */
        {
                char name[sizeof(current->comm)];
                snprintf(name, sizeof(name) - 1, "ldlm_bl_%02d",
                         bltd->bltd_num);
                kportal_daemonize(name);
        }
        SIGNAL_MASK_LOCK(current, flags);
        sigfillset(&current->blocked);
        RECALC_SIGPENDING;
        SIGNAL_MASK_UNLOCK(current, flags);

        atomic_inc(&blp->blp_num_threads);
        complete(&blp->blp_comp);

        while (1) {
                struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;

                l_wait_event_exclusive(blp->blp_waitq,
                                       (blwi = ldlm_bl_get_work(blp)) != NULL,
                                       &lwi);

                if (blwi->blwi_ns == NULL)
                        break;

                ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
                                        blwi->blwi_lock);
                OBD_FREE(blwi, sizeof(*blwi));
        }

        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        RETURN(0);
}
static int ldlm_setup(void);
static int ldlm_cleanup(int force);

int ldlm_get_ref(void)
{
        int rc = 0;
        ENTRY;

        down(&ldlm_ref_sem);
        if (++ldlm_refcount == 1) {
                rc = ldlm_setup();
                if (rc)
                        ldlm_refcount--;
        }
        up(&ldlm_ref_sem);

        RETURN(rc);
}
void ldlm_put_ref(int force)
{
        ENTRY;
        down(&ldlm_ref_sem);
        if (ldlm_refcount == 1) {
                int rc = ldlm_cleanup(force);
                if (rc)
                        CERROR("ldlm_cleanup failed: %d\n", rc);
                else
                        ldlm_refcount--;
        } else {
                ldlm_refcount--;
        }
        up(&ldlm_ref_sem);

        EXIT;
}
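
/* Bring up the LDLM: start the "ldlm_cbd" and "ldlm_canceld" ptlrpc
 * services, the blocking-callback thread pool, the expired-lock thread and
 * the waiting-locks timer.  Called once, from ldlm_get_ref(). */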
static int ldlm_setup(void)
{
        struct ldlm_bl_pool *blp;
        int rc = 0;
        int i;
        ENTRY;

        if (ldlm_state != NULL)
                RETURN(-EALREADY);

        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
        if (ldlm_state == NULL)
                RETURN(-ENOMEM);

        rc = ldlm_proc_setup();
        if (rc != 0)
                GOTO(out_free, rc);

        ldlm_state->ldlm_cb_service =
                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                                ldlm_callback_handler, "ldlm_cbd",
                                ldlm_svc_proc_dir);

        if (!ldlm_state->ldlm_cb_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        ldlm_state->ldlm_cancel_service =
                ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
                                LDLM_CANCEL_REQUEST_PORTAL,
                                LDLM_CANCEL_REPLY_PORTAL,
                                ldlm_cancel_handler, "ldlm_canceld",
                                ldlm_svc_proc_dir);

        if (!ldlm_state->ldlm_cancel_service) {
                CERROR("failed to start service\n");
                GOTO(out_proc, rc = -ENOMEM);
        }

        OBD_ALLOC(blp, sizeof(*blp));
        if (blp == NULL)
                GOTO(out_proc, rc = -ENOMEM);
        ldlm_state->ldlm_bl_pool = blp;

        atomic_set(&blp->blp_num_threads, 0);
        init_waitqueue_head(&blp->blp_waitq);
        spin_lock_init(&blp->blp_lock);

        INIT_LIST_HEAD(&blp->blp_list);

        for (i = 0; i < LDLM_NUM_THREADS; i++) {
                struct ldlm_bl_thread_data bltd = {
                        .bltd_num = i,
                        .bltd_blp = blp,
                };
                init_completion(&blp->blp_comp);
                rc = kernel_thread(ldlm_bl_thread_main, &bltd, 0);
                if (rc < 0) {
                        CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
                        GOTO(out_thread, rc);
                }
                wait_for_completion(&blp->blp_comp);
        }

        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cancel_service,
                                    LDLM_NUM_THREADS, "ldlm_cn");
        if (rc)
                GOTO(out_thread, rc);

        rc = ptlrpc_start_n_threads(NULL, ldlm_state->ldlm_cb_service,
                                    LDLM_NUM_THREADS, "ldlm_cb");
        if (rc)
                GOTO(out_thread, rc);

        INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
        spin_lock_init(&expired_lock_thread.elt_lock);
        expired_lock_thread.elt_state = ELT_STOPPED;
        init_waitqueue_head(&expired_lock_thread.elt_waitq);

        rc = kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FS);
        if (rc < 0) {
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out_thread, rc);
        }

        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_READY);

        INIT_LIST_HEAD(&waiting_locks_list);
        spin_lock_init(&waiting_locks_spinlock);
        waiting_locks_timer.function = waiting_locks_callback;
        waiting_locks_timer.data = 0;
        init_timer(&waiting_locks_timer);

        RETURN(0);

 out_thread:
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);

 out_proc:
        ldlm_proc_cleanup();

 out_free:
        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;
        RETURN(rc);
}
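
/* Tear down everything ldlm_setup() started.  Refuses to run while any
 * namespace still exists; called from ldlm_put_ref() when the last
 * reference is dropped. */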
static int ldlm_cleanup(int force)
{
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
        ENTRY;

        if (!list_empty(&ldlm_namespace_list)) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces();
                RETURN(-EBUSY);
        }

        while (atomic_read(&blp->blp_num_threads) > 0) {
                struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

                init_completion(&blp->blp_comp);

                spin_lock(&blp->blp_lock);
                list_add_tail(&blwi.blwi_entry, &blp->blp_list);
                wake_up(&blp->blp_waitq);
                spin_unlock(&blp->blp_lock);

                wait_for_completion(&blp->blp_comp);
        }
        OBD_FREE(blp, sizeof(*blp));

        ptlrpc_stop_all_threads(ldlm_state->ldlm_cb_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
        ptlrpc_stop_all_threads(ldlm_state->ldlm_cancel_service);
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
        ldlm_proc_cleanup();

        expired_lock_thread.elt_state = ELT_TERMINATE;
        wake_up(&expired_lock_thread.elt_waitq);
        wait_event(expired_lock_thread.elt_waitq,
                   expired_lock_thread.elt_state == ELT_STOPPED);

        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;

        RETURN(0);
}
int __init ldlm_init(void)
{
        ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               SLAB_HWCACHE_ALIGN, NULL, NULL);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;

        ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                                           sizeof(struct ldlm_lock), 0,
                                           SLAB_HWCACHE_ALIGN, NULL, NULL);
        if (ldlm_lock_slab == NULL) {
                kmem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }

        l_lock_init(&ldlm_handle_lock);

        return 0;
}
void __exit ldlm_exit(void)
{
        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
        if (kmem_cache_destroy(ldlm_resource_slab) != 0)
                CERROR("couldn't free ldlm resource slab\n");
        if (kmem_cache_destroy(ldlm_lock_slab) != 0)
                CERROR("couldn't free ldlm lock slab\n");
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

EXPORT_SYMBOL(ldlm_extent_shift_kms);

EXPORT_SYMBOL(ldlm_get_processing_policy);
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_lock_set_data);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match);

/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_change_cbdata);

EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_get_ref);
EXPORT_SYMBOL(ldlm_put_ref);

EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_regression_start);
EXPORT_SYMBOL(ldlm_regression_stop);

/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);
EXPORT_SYMBOL(ldlm_dump_all_namespaces);
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);

EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);

EXPORT_SYMBOL(client_obd_setup);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
EXPORT_SYMBOL(target_abort_recovery);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_destroy_export);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_handle_disconnect);
EXPORT_SYMBOL(target_queue_final_reply);