/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_class.h>
#include <libcfs/list.h>
#include "ldlm_internal.h"

#ifdef __KERNEL__
static int ldlm_num_threads;
CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
                "number of DLM service threads to start");
#endif

extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;

static struct semaphore ldlm_ref_sem;
static int ldlm_refcount;

static struct ldlm_state *ldlm_state;

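/* Round a timeout up to the next whole second; the coarser timer deadline
 * avoids floods of timer firings under heavy lock traffic. */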
inline cfs_time_t round_timeout(cfs_time_t timeout)
{
        return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}

/* timeout for initial callback (AST) reply (bz10399) */
static inline unsigned int ldlm_get_rq_timeout(void)
{
        /* Non-AT value */
        unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);

        return timeout < 1 ? 1 : timeout;
}

#ifdef __KERNEL__
/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
static spinlock_t waiting_locks_spinlock;   /* BH lock (timer) */
static struct list_head waiting_locks_list;
static cfs_timer_t waiting_locks_timer;

static struct expired_lock_thread {
        cfs_waitq_t               elt_waitq;
        int                       elt_state;
        int                       elt_dump;
        struct list_head          elt_expired_locks;
} expired_lock_thread;
#endif

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

struct ldlm_bl_pool {
        spinlock_t              blp_lock;

        /*
         * blp_prio_list is used for callbacks that should be handled
         * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
         * see bug 13843
         */
        struct list_head        blp_prio_list;

        /*
         * blp_list is used for all other callbacks which are likely
         * to take longer to process.
         */
        struct list_head        blp_list;

        cfs_waitq_t             blp_waitq;
        struct completion       blp_comp;
        atomic_t                blp_num_threads;
        atomic_t                blp_busy_threads;
        int                     blp_min_threads;
        int                     blp_max_threads;
};

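/*
 * A unit of work for the blocking-AST ("ldlm_bl") threads: either a single
 * lock (blwi_lock) or a list of locks (blwi_head, blwi_count) to cancel.
 * The item is reference counted so that it is freed by whichever of the
 * submitter and the worker thread drops the last reference.
 */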
struct ldlm_bl_work_item {
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
        struct list_head        blwi_head;
        int                     blwi_count;
        struct completion       blwi_comp;
        atomic_t                blwi_ref_count;
};

#ifdef __KERNEL__
static inline void ldlm_bl_work_item_get(struct ldlm_bl_work_item *blwi)
{
        atomic_inc(&blwi->blwi_ref_count);
}

static inline void ldlm_bl_work_item_put(struct ldlm_bl_work_item *blwi)
{
        if (atomic_dec_and_test(&blwi->blwi_ref_count))
                OBD_FREE(blwi, sizeof(*blwi));
}

static inline int have_expired_locks(void)
{
        int need_to_run;
        ENTRY;

        spin_lock_bh(&waiting_locks_spinlock);
        need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
        spin_unlock_bh(&waiting_locks_spinlock);

        RETURN(need_to_run);
}

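/*
 * Main loop of the dedicated "ldlm_elt" thread: dumps the debug log when
 * asked to by waiting_locks_callback(), and evicts the exports owning the
 * locks that were moved onto the expired list when their callback timers
 * ran out.
 */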
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        int do_dump;

        ENTRY;
        cfs_daemonize("ldlm_elt");

        expired_lock_thread.elt_state = ELT_READY;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);

        while (1) {
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                             &lwi);

                spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_thread.elt_dump) {
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* from waiting_locks_callback, but not in timer */
                        libcfs_debug_dumplog();
                        libcfs_run_lbug_upcall(__FILE__,
                                                "waiting_locks_callback",
                                                expired_lock_thread.elt_dump);

                        spin_lock_bh(&waiting_locks_spinlock);
                        expired_lock_thread.elt_dump = 0;
                }

                do_dump = 0;

                while (!list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;

                        lock = list_entry(expired->next, struct ldlm_lock,
                                          l_pending_chain);
                        if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
                                LBUG();
                        }
                        list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                                /* release extra ref grabbed by
                                 * ldlm_add_waiting_lock() or
                                 * ldlm_failed_ast() */
                                LDLM_LOCK_PUT(lock);
                                continue;
                        }
                        export = class_export_get(lock->l_export);
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* release extra ref grabbed by ldlm_add_waiting_lock()
                         * or ldlm_failed_ast() */
                        LDLM_LOCK_PUT(lock);

                        do_dump++;
                        class_fail_export(export);
                        class_export_put(export);
                        spin_lock_bh(&waiting_locks_spinlock);
                }
                spin_unlock_bh(&waiting_locks_spinlock);

                if (do_dump && obd_dump_on_eviction) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }

                if (expired_lock_thread.elt_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread.elt_state = ELT_STOPPED;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        RETURN(0);
}

/**
 * Check if there is a request in the export request list which prevents
 * the lock from being cancelled.
 */
static int ldlm_lock_busy(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        int match = 0;
        ENTRY;

        if (lock->l_export == NULL)
                return 0;

        spin_lock(&lock->l_export->exp_lock);
        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
                if (req->rq_ops->hpreq_lock_match) {
                        match = req->rq_ops->hpreq_lock_match(req, lock);
                        if (match)
                                break;
                }
        }
        spin_unlock(&lock->l_export->exp_lock);
        RETURN(match);
}

/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
        struct ldlm_lock *lock, *last = NULL;

        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                if (cfs_time_after(lock->l_callback_timeout, cfs_time_current())
                    || (lock->l_req_mode == LCK_GROUP))
                        break;

                /* Check if we need to prolong the timeout */
                if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
                    ldlm_lock_busy(lock)) {
                        int cont = 1;

                        if (lock->l_pending_chain.next == &waiting_locks_list)
                                cont = 0;

                        LDLM_LOCK_GET(lock);
                        spin_unlock_bh(&waiting_locks_spinlock);
                        LDLM_DEBUG(lock, "prolong the busy lock");
                        ldlm_refresh_waiting_lock(lock,
                                                  ldlm_get_enq_timeout(lock));
                        spin_lock_bh(&waiting_locks_spinlock);

                        if (!cont) {
                                LDLM_LOCK_PUT(lock);
                                break;
                        }

                        LDLM_LOCK_PUT(lock);
                        continue;
                }
                lock->l_resource->lr_namespace->ns_timeouts++;
                LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                           "evicting client at %s",
                           cfs_time_current_sec() - lock->l_last_activity,
                           libcfs_nid2str(
                                   lock->l_export->exp_connection->c_peer.nid));
                if (lock == last) {
                        LDLM_ERROR(lock, "waiting on lock multiple times");
                        CERROR("wll %p n/p %p/%p, l_pending %p n/p %p/%p\n",
                               &waiting_locks_list,
                               waiting_locks_list.next, waiting_locks_list.prev,
                               &lock->l_pending_chain,
                               lock->l_pending_chain.next,
                               lock->l_pending_chain.prev);

                        CFS_INIT_LIST_HEAD(&waiting_locks_list);    /* HACK */
                        expired_lock_thread.elt_dump = __LINE__;

                        /* LBUG(); */
                        CEMERG("would be an LBUG, but isn't (bug 5653)\n");
                        libcfs_debug_dumpstack(NULL);
                        /*blocks* libcfs_debug_dumplog(); */
                        /*blocks* libcfs_run_lbug_upcall(file, func, line); */
                        break;
                }
                last = lock;

                /* no need to take an extra ref on the lock since it was in
                 * the waiting_locks_list and ldlm_add_waiting_lock()
                 * already grabbed a ref */
                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain,
                         &expired_lock_thread.elt_expired_locks);
        }

        if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
                if (obd_dump_on_timeout)
                        expired_lock_thread.elt_dump = __LINE__;

                cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        }

        /*
         * Make sure the timer will fire again if we have any locks
         * left.
         */
        if (!list_empty(&waiting_locks_list)) {
                cfs_time_t timeout_rounded;
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        spin_unlock_bh(&waiting_locks_spinlock);
}

/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 * As done in ldlm_add_waiting_lock(), the caller must take a lock reference
 * when the lock is actually added to the waiting list (1 is returned).
 *
 * Called with the namespace lock held.
 */
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
{
        cfs_time_t timeout;
        cfs_time_t timeout_rounded;

        if (!list_empty(&lock->l_pending_chain))
                return 0;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
                seconds = 1;

        timeout = cfs_time_shift(seconds);
        if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
                lock->l_callback_timeout = timeout;

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        if (cfs_time_before(timeout_rounded,
                            cfs_timer_deadline(&waiting_locks_timer)) ||
            !cfs_timer_is_armed(&waiting_locks_timer)) {
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        /* if the new lock has a shorter timeout than something earlier on
           the list, we'll wait the longer amount of time; no big deal. */
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list); /* FIFO */
        return 1;
}

static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        int ret;
        int timeout = ldlm_get_enq_timeout(lock);

        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));

        spin_lock_bh(&waiting_locks_spinlock);
        if (lock->l_destroyed) {
                static cfs_time_t next;
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
                if (cfs_time_after(cfs_time_current(), next)) {
                        next = cfs_time_shift(14400);
                        libcfs_debug_dumpstack(NULL);
                }
                return 0;
        }

        ret = __ldlm_add_waiting_lock(lock, timeout);
        if (ret)
                /* grab ref on the lock if it has been added to the
                 * waiting list */
                LDLM_LOCK_GET(lock);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "%sadding to wait list (timeout: %d, AT: %s)",
                   ret == 0 ? "not re-" : "", timeout,
                   AT_OFF ? "off" : "on");
        return ret;
}

/*
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 * As done in ldlm_del_waiting_lock(), the caller must release the lock
 * reference when the lock is actually removed from the list (1 is returned).
 *
 * Called with namespace lock held.
 */
static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (list_empty(&lock->l_pending_chain))
                return 0;

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        cfs_timer_disarm(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        cfs_timer_arm(&waiting_locks_timer,
                                      round_timeout(next->l_callback_timeout));
                }
        }
        list_del_init(&lock->l_pending_chain);

        return 1;
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        int ret;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock);
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);
        ret = __ldlm_del_waiting_lock(lock);
        spin_unlock_bh(&waiting_locks_spinlock);
        if (ret)
                /* release lock ref if it has indeed been removed
                 * from a list */
                LDLM_LOCK_PUT(lock);

        return ret;
}

/*
 * Prolong the lock callback timeout.
 *
 * Called with namespace lock held.
 */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        /* we remove/add the lock to the waiting list, so no need to
         * release/take a lock reference */
        __ldlm_del_waiting_lock(lock);
        __ldlm_add_waiting_lock(lock, timeout);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "refreshed");
        return 1;
}
#else /* !__KERNEL__ */

static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
        RETURN(1);
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}

int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
        RETURN(0);
}
#endif /* __KERNEL__ */

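/* An AST failed or timed out: complain on the console, optionally dump the
 * debug log, and pass the lock to the expired-lock thread (or fail the
 * export directly when running in userspace) so the client is evicted. */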
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                            const char *ast_type)
{
        struct ptlrpc_connection *conn = lock->l_export->exp_connection;
        char                     *str = libcfs_nid2str(conn->c_peer.nid);

        LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted because "
                             "its lock %s callback to %s timed out: rc %d\n",
                             lock->l_export->exp_obd->obd_name, str,
                             ast_type, obd_export_nid2str(lock->l_export), rc);

        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();
#ifdef __KERNEL__
        spin_lock_bh(&waiting_locks_spinlock);
        if (__ldlm_del_waiting_lock(lock) == 0)
                /* the lock was not in any list, grab an extra ref before adding
                 * the lock to the expired list */
                LDLM_LOCK_GET(lock);
        list_add(&lock->l_pending_chain, &expired_lock_thread.elt_expired_locks);
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        spin_unlock_bh(&waiting_locks_spinlock);
#else
        class_fail_export(lock->l_export);
#endif
}

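/* Triage an error returned by an AST RPC: a timeout evicts the client via
 * ldlm_failed_ast(), unless the lock was already cancelled or belongs to a
 * liblustre client; other errors cancel the lock locally and return
 * -ERESTART so the caller restarts lock reprocessing. */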
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
{
        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;

        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                LASSERT(lock->l_export);
                if (lock->l_export->exp_libclient) {
                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
                                   " timeout, just cancelling lock", ast_type,
                                   libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else if (lock->l_flags & LDLM_FL_CANCEL) {
                        LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
                                   "cancel was received (AST reply lost?)",
                                   ast_type, libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else {
                        ldlm_del_waiting_lock(lock);
                        ldlm_failed_ast(lock, rc, ast_type);
                }
        } else if (rc) {
                if (rc == -EINVAL)
                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
                                   " from %s AST - normal race",
                                   libcfs_nid2str(peer.nid),
                                   lustre_msg_get_status(req->rq_repmsg),
                                   ast_type);
                else
                        LDLM_ERROR(lock, "client (nid %s) returned %d "
                                   "from %s AST", libcfs_nid2str(peer.nid),
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
                                   ast_type);
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }

        return rc;
}

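/* Reply interpreter for blocking and completion ASTs sent asynchronously
 * through the request set in ldlm_cb_set_arg: errors are passed to
 * ldlm_handle_ast_error(), and the set is flagged for restart if needed. */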
static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        struct ldlm_cb_set_arg *arg;
        struct ldlm_lock *lock;
        ENTRY;

        LASSERT(data != NULL);

        arg = req->rq_async_args.pointer_arg[0];
        lock = req->rq_async_args.pointer_arg[1];
        LASSERT(lock != NULL);
        if (rc != 0) {
                /* If the client cancelled the lock but the cancel has not
                 * been received yet, we need to update lvbo to have the
                 * proper attributes cached. */
                if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
                        ldlm_res_lvbo_update(lock->l_resource, NULL,
                                             0, 1);
                rc = ldlm_handle_ast_error(lock, req, rc,
                                           arg->type == LDLM_BL_CALLBACK
                                           ? "blocking" : "completion");
        }

        LDLM_LOCK_PUT(lock);

        if (rc == -ERESTART)
                atomic_set(&arg->restart, 1);

        RETURN(0);
}

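/* Common tail for sending a blocking or completion AST: in the
 * instant-cancel case the RPC is fired off without waiting for a reply;
 * otherwise it is added to the shared request set along with an extra lock
 * reference that ldlm_cb_interpret() drops. */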
static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
                                          struct ldlm_cb_set_arg *arg,
                                          struct ldlm_lock *lock,
                                          int instant_cancel)
{
        int rc = 0;
        ENTRY;

        if (unlikely(instant_cancel)) {
                rc = ptl_send_rpc(req, 1);
                ptlrpc_req_finished(req);
                if (rc == 0)
                        /* If we cancelled the lock, we need to restart
                         * ldlm_reprocess_queue */
                        atomic_set(&arg->restart, 1);
        } else {
                LDLM_LOCK_GET(lock);
                ptlrpc_set_add_req(arg->set, req);
        }

        RETURN(rc);
}

/**
 * Check if there are requests in the export request list that prevent
 * the lock from being cancelled, and make those requests high priority.
 */
static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        ENTRY;

        if (lock->l_export == NULL) {
                LDLM_DEBUG(lock, "client lock: no-op");
                RETURN_EXIT;
        }

        spin_lock(&lock->l_export->exp_lock);
        list_for_each_entry(req, &lock->l_export->exp_queued_rpc, rq_exp_list) {
                if (!req->rq_hp && req->rq_ops->hpreq_lock_match &&
                    req->rq_ops->hpreq_lock_match(req, lock))
                        ptlrpc_hpreq_reorder(req);
        }
        spin_unlock(&lock->l_export->exp_lock);
        EXIT;
}

/*
 * ->l_blocking_ast() method for server-side locks. This is invoked when a
 * newly enqueued server lock conflicts with the given one.
 *
 * Sends a blocking AST RPC to the client owning that lock and arms the
 * timeout timer to wait for the client's response.
 */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        __u32 size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                       [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int instant_cancel = 0, rc;
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        LASSERT(lock);
        LASSERT(data != NULL);

        ldlm_lock_reorder_req(lock);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,
                              NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_async_args.pointer_arg[0] = arg;
        req->rq_async_args.pointer_arg[1] = lock;
        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;

        lock_res(lock->l_resource);
        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* lock is already gone, no point in sending the AST */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                RETURN(0);
        }

#if 0
        if (CURRENT_SECONDS - lock->l_export->exp_last_request_time > 30) {
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");
                RETURN(-ETIMEDOUT);
        }
#endif

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                instant_cancel = 1;

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_desc = *desc;
        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

        LDLM_DEBUG(lock, "server preparing blocking AST");

        lock->l_last_activity = cfs_time_current_sec();

        ptlrpc_req_set_repsize(req, 1, NULL);
        if (instant_cancel) {
                unlock_res(lock->l_resource);
                ldlm_lock_cancel(lock);
        } else {
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
                ldlm_add_waiting_lock(lock);
                unlock_res(lock->l_resource);
        }

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
        }

        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);

        RETURN(rc);
}

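/*
 * ->l_completion_ast() method for server-side locks: tells the client its
 * lock has been granted, piggy-backing the LVB and any pending blocking-AST
 * flags, and starts the lock-timeout timer when a blocking AST was already
 * sent.
 */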
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        long total_enqueue_wait;
        __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int rc, buffers = 2, instant_cancel = 0;
        ENTRY;

        LASSERT(lock != NULL);
        LASSERT(data != NULL);

        total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
                                          lock->l_last_activity);

        lock_res_and_lock(lock);
        if (lock->l_resource->lr_lvb_len) {
                size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len;
                buffers = 3;
        }
        unlock_res_and_lock(lock);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers,
                              size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_async_args.pointer_arg[0] = arg;
        req->rq_async_args.pointer_arg[1] = lock;
        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);

        if (buffers == 3) {
                void *lvb;

                lvb = lustre_msg_buf(req->rq_reqmsg, DLM_REQ_REC_OFF,
                                     lock->l_resource->lr_lvb_len);
                lock_res_and_lock(lock);
                memcpy(lvb, lock->l_resource->lr_lvb_data,
                       lock->l_resource->lr_lvb_len);
                unlock_res_and_lock(lock);
        }

        LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
                   total_enqueue_wait);

        /* Server-side enqueue wait time estimate, used in
         * __ldlm_add_waiting_lock to set future enqueue timers */
        if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
                at_measured(&lock->l_resource->lr_namespace->ns_at_estimate,
                            total_enqueue_wait);
        else
                /* bug 18618. Don't add the lock enqueue time we spent waiting
                   for a previous callback to fail. Locks waiting legitimately
                   will get extended by ldlm_refresh_waiting_lock regardless of
                   the estimate, so it's okay to underestimate here. */
                LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
                       "It is likely that a previous callback timed out.",
                       total_enqueue_wait,
                       at_get(&lock->l_resource->lr_namespace->ns_at_estimate));

        ptlrpc_req_set_repsize(req, 1, NULL);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        /* We only send real blocking ASTs after the lock is granted */
        lock_res_and_lock(lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                body->lock_flags |= LDLM_FL_AST_SENT;
                /* copy ast flags like LDLM_FL_DISCARD_DATA */
                body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

                /* We might get here before ldlm_handle_enqueue() has set the
                 * LDLM_FL_CANCEL_ON_BLOCK flag, in which case we put this lock
                 * on the waiting list. This is safe: the matching code in
                 * ldlm_handle_enqueue() will still call ldlm_lock_cancel(),
                 * which not only cancels the lock but also removes it from
                 * the waiting list */
                if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                        unlock_res_and_lock(lock);
                        ldlm_lock_cancel(lock);
                        instant_cancel = 1;
                        lock_res_and_lock(lock);
                } else {
                        ldlm_add_waiting_lock(lock); /* start the lock-timeout
                                                         clock */
                }
        }
        unlock_res_and_lock(lock);

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
        }

        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);

        RETURN(rc);
}

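/* Server-side glimpse AST: synchronously asks the client holding the lock
 * for its current LVB (e.g. the up-to-date file size) and updates the
 * resource's LVB from the reply. */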
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ldlm_resource *res = lock->l_resource;
        struct ldlm_request *body;
        struct ptlrpc_request *req;
        __u32 size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                       [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int rc = 0;
        ENTRY;

        LASSERT(lock != NULL && lock->l_export != NULL);

        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,
                              LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK, 2, size,
                              NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;
        ldlm_lock2desc(lock, &body->lock_desc);

        lock_res_and_lock(lock);
        size[REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
        unlock_res_and_lock(lock);
        res = lock->l_resource;
        ptlrpc_req_set_repsize(req, 2, size);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_prep_req already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
        }

        rc = ptlrpc_queue_wait(req);
        if (rc == -ELDLM_NO_LOCK_DATA)
                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
        else if (rc != 0)
                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
        else
                rc = ldlm_res_lvbo_update(res, req,
                                          REPLY_REC_OFF, 1);
        ptlrpc_req_finished(req);
        if (rc == -ERESTART)
                ldlm_reprocess_all(res);

        RETURN(rc);
}

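/* Bump the per-service enqueue counter that matches the request's lock
 * type. */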
static void ldlm_svc_get_eopc(struct ldlm_request *dlm_req,
                       struct lprocfs_stats *srv_stats)
{
        int lock_type = 0, op = 0;

        lock_type = dlm_req->lock_desc.l_resource.lr_type;

        switch (lock_type) {
        case LDLM_PLAIN:
                op = PTLRPC_LAST_CNTR + LDLM_PLAIN_ENQUEUE;
                break;
        case LDLM_EXTENT:
                if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT)
                        op = PTLRPC_LAST_CNTR + LDLM_GLIMPSE_ENQUEUE;
                else
                        op = PTLRPC_LAST_CNTR + LDLM_EXTENT_ENQUEUE;
                break;
        case LDLM_FLOCK:
                op = PTLRPC_LAST_CNTR + LDLM_FLOCK_ENQUEUE;
                break;
        case LDLM_IBITS:
                op = PTLRPC_LAST_CNTR + LDLM_IBITS_ENQUEUE;
                break;
        default:
                op = 0;
                break;
        }

        if (op)
                lprocfs_counter_incr(srv_stats, op);

        return;
}

/*
 * Main server-side entry point into LDLM. This is called by ptlrpc service
 * threads to handle client lock enqueue requests.
 */
int ldlm_handle_enqueue(struct ptlrpc_request *req,
                        ldlm_completion_callback completion_callback,
                        ldlm_blocking_callback blocking_callback,
                        ldlm_glimpse_callback glimpse_callback)
{
        struct obd_device *obddev = req->rq_export->exp_obd;
        struct ldlm_reply *dlm_rep;
        struct ldlm_request *dlm_req;
        __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
        int rc = 0;
        __u32 flags;
        ldlm_error_t err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                     lustre_swab_ldlm_request);
        if (dlm_req == NULL) {
                CERROR("Can't unpack dlm_req\n");
                GOTO(out, rc = -EFAULT);
        }

        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
        flags = dlm_req->lock_flags;

        LASSERT(req->rq_export);

        if (req->rq_rqbd->rqbd_service->srv_stats)
                ldlm_svc_get_eopc(dlm_req,
                                  req->rq_rqbd->rqbd_service->srv_stats);

        if (req->rq_export && req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats) {
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);
        }

        if (dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
            dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
                          dlm_req->lock_desc.l_resource.lr_type);
                GOTO(out, rc = -EFAULT);
        }

        if (dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
            dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
            dlm_req->lock_desc.l_req_mode & (dlm_req->lock_desc.l_req_mode-1)) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
                          dlm_req->lock_desc.l_req_mode);
                GOTO(out, rc = -EFAULT);
        }

        if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
                if (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN) {
                        DEBUG_REQ(D_ERROR, req,
                                  "PLAIN lock request from IBITS client?");
                        GOTO(out, rc = -EPROTO);
                }
        } else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) {
                DEBUG_REQ(D_ERROR, req,
                          "IBITS lock request from unaware client?");
                GOTO(out, rc = -EPROTO);
        }

#if 0
        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
           against server's _CONNECT_SUPPORTED flags? (I don't want to use
           ibits for mgc/mgs) */

        /* INODEBITS_INTEROP: Perform conversion from plain lock to
         * inodebits lock if client does not support them. */
        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
                        dlm_req->lock_desc.l_req_mode = LCK_CR;
        }
#endif

        if (flags & LDLM_FL_REPLAY) {
                /* Find an existing lock in the per-export lock hash */
                lock = lustre_hash_lookup(req->rq_export->exp_lock_hash,
                                          (void *)&dlm_req->lock_handle[0]);
                if (lock != NULL) {
                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                  LPX64, lock->l_handle.h_cookie);
                        GOTO(existing_lock, rc = 0);
                }
        }

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(obddev->obd_namespace,
                                dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                blocking_callback, completion_callback,
                                glimpse_callback, NULL, 0);
        if (!lock)
                GOTO(out, rc = -ENOMEM);

        lock->l_last_activity = cfs_time_current_sec();
        lock->l_remote_handle = dlm_req->lock_handle[0];
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        /* Don't enqueue a lock onto the export if it has already
         * been evicted.  Cancel it now instead. (bug 3822) */
        if (req->rq_export->exp_failed) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                GOTO(out, rc = -ENOTCONN);
        }
        lock->l_export = class_export_get(req->rq_export);

        if (lock->l_export->exp_lock_hash)
                lustre_hash_add(lock->l_export->exp_lock_hash,
                                &lock->l_remote_handle, &lock->l_exp_hash);

existing_lock:

        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
        } else {
                int buffers = 2;

                lock_res_and_lock(lock);
                if (lock->l_resource->lr_lvb_len) {
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        buffers = 3;
                }
                unlock_res_and_lock(lock);

                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);

                rc = lustre_pack_reply(req, buffers, size, NULL);
                if (rc)
                        GOTO(out, rc);
        }

        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;

        err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, (int *)&flags);
        if (err)
                GOTO(out, err);

        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
                                 sizeof(*dlm_rep));
        dlm_rep->lock_flags = flags;

        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);

        /* We never send a blocking AST until the lock is granted, but
         * we can tell the client about it now */
        lock_res_and_lock(lock);

        /* Now take into account flags to be inherited from original lock
           request both in reply to client and in our own lock flags. */
        dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
        lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;

        /* Don't move a pending lock onto the export if it has already
         * been evicted.  Cancel it now instead. (bug 5683) */
        if (req->rq_export->exp_failed ||
            OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT)) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                rc = -ENOTCONN;
        } else if (lock->l_flags & LDLM_FL_AST_SENT) {
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode) {
                        /* Only cancel the lock here if it was granted, because
                         * a granted lock would be destroyed immediately and
                         * never be granted in the future, causing timeouts on
                         * the client.  A lock that was not granted will be
                         * cancelled immediately after the completion AST is
                         * sent.
                         */
                        if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                                unlock_res_and_lock(lock);
                                ldlm_lock_cancel(lock);
                                lock_res_and_lock(lock);
                        } else
                                ldlm_add_waiting_lock(lock);
                }
        }
        /* Make sure we never ever grant usual metadata locks to liblustre
           clients */
        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
             req->rq_export->exp_libclient) {
                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
                    !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
                        CERROR("Granting sync lock to libclient. "
                               "req fl %d, rep fl %d, lock fl "LPX64"\n",
                               dlm_req->lock_flags, dlm_rep->lock_flags,
                               lock->l_flags);
                        LDLM_ERROR(lock, "sync lock");
                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
                                struct ldlm_intent *it;
                                it = lustre_msg_buf(req->rq_reqmsg,
                                                    DLM_INTENT_IT_OFF,
                                                    sizeof(*it));
                                if (it != NULL) {
                                        CERROR("This is intent %s ("LPU64")\n",
                                               ldlm_it2str(it->opc), it->opc);
                                }
                        }
                }
        }

        unlock_res_and_lock(lock);

        EXIT;
 out:
        req->rq_status = rc ?: err;  /* return either error - bug 11190 */
        if (!req->rq_packed_final) {
                err = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc == 0)
                        rc = err;
        }

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
        if (lock) {
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply "
                           "(err=%d, rc=%d)", err, rc);

                if (rc == 0 && obddev->obd_fail)
                        rc = -ENOTCONN;

                if (rc == 0) {
                        lock_res_and_lock(lock);
                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                        if (size[DLM_REPLY_REC_OFF] > 0) {
                                void *lvb = lustre_msg_buf(req->rq_repmsg,
                                                       DLM_REPLY_REC_OFF,
                                                       size[DLM_REPLY_REC_OFF]);
                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                         req, lock);

                                memcpy(lvb, lock->l_resource->lr_lvb_data,
                                       size[DLM_REPLY_REC_OFF]);
                        }
                        unlock_res_and_lock(lock);
                } else {
                        lock_res_and_lock(lock);
                        ldlm_resource_unlink_lock(lock);
                        ldlm_lock_destroy_nolock(lock);
                        unlock_res_and_lock(lock);
                }

                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);

                LDLM_LOCK_PUT(lock);
        }

        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
                          lock, rc);

        return rc;
}

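/* Server-side handler for LDLM_CONVERT requests: tries to convert the lock
 * to the requested mode and reprocesses the resource on success. */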
1276 int ldlm_handle_convert(struct ptlrpc_request *req)
1277 {
1278         struct ldlm_request *dlm_req;
1279         struct ldlm_reply *dlm_rep;
1280         struct ldlm_lock *lock;
1281         int rc;
1282         __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
1283                         [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
1284         ENTRY;
1285
1286         dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
1287                                      lustre_swab_ldlm_request);
1288         if (dlm_req == NULL) {
1289                 CERROR ("Can't unpack dlm_req\n");
1290                 RETURN (-EFAULT);
1291         }
1292
1293         if (req->rq_export && req->rq_export->exp_nid_stats &&
1294             req->rq_export->exp_nid_stats->nid_ldlm_stats) {
1295                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1296                                      LDLM_CONVERT - LDLM_FIRST_OPC);
1297         }
1298
1299         rc = lustre_pack_reply(req, 2, size, NULL);
1300         if (rc)
1301                 RETURN(rc);
1302
1303         dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
1304                                  sizeof(*dlm_rep));
1305         dlm_rep->lock_flags = dlm_req->lock_flags;
1306
1307         lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
1308         if (!lock) {
1309                 req->rq_status = EINVAL;
1310         } else {
1311                 void *res = NULL;
1312
1313                 LDLM_DEBUG(lock, "server-side convert handler START");
1314
1315                 lock->l_last_activity = cfs_time_current_sec();
1316                 res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
1317                                         &dlm_rep->lock_flags);
1318                 if (res) {
1319                         if (ldlm_del_waiting_lock(lock))
1320                                 LDLM_DEBUG(lock, "converted waiting lock");
1321                         req->rq_status = 0;
1322                 } else {
1323                         req->rq_status = EDEADLOCK;
1324                 }
1325         }
1326
1327         if (lock) {
1328                 if (!req->rq_status)
1329                         ldlm_reprocess_all(lock->l_resource);
1330                 LDLM_DEBUG(lock, "server-side convert handler END");
1331                 LDLM_LOCK_PUT(lock);
1332         } else
1333                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
1334
1335         RETURN(0);
1336 }
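
/*
 * Usage note (illustrative sketch): a client reaches the convert handler
 * above through ldlm_cli_convert() in ldlm_request.c; 'lockh' and 'flags'
 * below are hypothetical caller-side variables:
 *
 *      struct lustre_handle lockh;     // handle of an already granted lock
 *      __u32 flags = 0;
 *
 *      // ask the server to convert the lock to PR mode; on success the
 *      // handler above reprocesses the resource and replies with the
 *      // resulting lock_flags
 *      rc = ldlm_cli_convert(&lockh, LCK_PR, &flags);
 */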
1337
1338 /* Cancel all the locks whose handles are packed into ldlm_request */
1339 int ldlm_request_cancel(struct ptlrpc_request *req,
1340                         struct ldlm_request *dlm_req, int first)
1341 {
1342         struct ldlm_resource *res, *pres = NULL;
1343         struct ldlm_lock *lock;
1344         int i, count, done = 0;
1345         ENTRY;
1346
1347         count = dlm_req->lock_count ? dlm_req->lock_count : 1;
1348         if (first >= count)
1349                 RETURN(0);
1350
1351         /* There are no locks on the server at replay time, so skip
1352          * lock cancellation to let the replay tests pass. */
1353         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1354                 RETURN(0);
1355
1356         for (i = first; i < count; i++) {
1357                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
1358                 if (!lock) {
1359                         LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
1360                                           "lock (cookie "LPU64")",
1361                                           dlm_req->lock_handle[i].cookie);
1362                         continue;
1363                 }
1364
1365                 done++;
1366                 res = lock->l_resource;
1367                 if (res != pres) {
1368                         if (pres != NULL) {
1369                                 ldlm_reprocess_all(pres);
1370                                 ldlm_resource_putref(pres);
1371                         }
1372                         if (res != NULL) {
1373                                 ldlm_resource_getref(res);
1374                                 ldlm_res_lvbo_update(res, NULL, 0, 1);
1375                         }
1376                         pres = res;
1377                 }
1378                 ldlm_lock_cancel(lock);
1379                 LDLM_LOCK_PUT(lock);
1380         }
1381         if (pres != NULL) {
1382                 ldlm_reprocess_all(pres);
1383                 ldlm_resource_putref(pres);
1384         }
1385         RETURN(done);
1386 }
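
/*
 * Usage note (illustrative): ldlm_handle_cancel() below passes first == 0
 * to cancel every handle packed in the request, while an enqueue that
 * piggybacks cancels would skip the slot(s) used by the enqueue itself,
 * e.g. (offsets hypothetical):
 *
 *      done = ldlm_request_cancel(req, dlm_req, 0);    // cancel them all
 *      done = ldlm_request_cancel(req, dlm_req, 1);    // skip slot 0
 */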
1387
1388 int ldlm_handle_cancel(struct ptlrpc_request *req)
1389 {
1390         struct ldlm_request *dlm_req;
1391         int rc;
1392         ENTRY;
1393
1394         dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
1395                                      lustre_swab_ldlm_request);
1396         if (dlm_req == NULL) {
1397                 CERROR("bad request buffer for cancel\n");
1398                 RETURN(-EFAULT);
1399         }
1400
1401         if (req->rq_export && req->rq_export->exp_nid_stats &&
1402             req->rq_export->exp_nid_stats->nid_ldlm_stats) {
1403                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1404                                      LDLM_CANCEL - LDLM_FIRST_OPC);
1405         }
1406
1407         rc = lustre_pack_reply(req, 1, NULL, NULL);
1408         if (rc)
1409                 RETURN(rc);
1410
1411         if (!ldlm_request_cancel(req, dlm_req, 0))
1412                 req->rq_status = ESTALE;
1413
1414         if (ptlrpc_reply(req) != 0)
1415                 LBUG();
1416
1417         RETURN(0);
1418 }
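
/*
 * Usage note (illustrative): the client-side counterpart is
 * ldlm_cli_cancel() in ldlm_request.c, which packs a lock handle into the
 * LDLM_CANCEL RPC that lands in the handler above:
 *
 *      struct lustre_handle lockh;     // hypothetical, a granted lock
 *
 *      ldlm_lock2handle(lock, &lockh);
 *      rc = ldlm_cli_cancel(&lockh);
 */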
1419
1420 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
1421                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
1422 {
1423         int do_ast;
1424         ENTRY;
1425
1426         LDLM_DEBUG(lock, "client blocking AST callback handler");
1427
1428         lock_res_and_lock(lock);
1429         lock->l_flags |= LDLM_FL_CBPENDING;
1430
1431         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
1432                 lock->l_flags |= LDLM_FL_CANCEL;
1433
1434         do_ast = (!lock->l_readers && !lock->l_writers);
1435         unlock_res_and_lock(lock);
1436
1437         if (do_ast) {
1438                 CDEBUG(D_DLMTRACE, "Lock %p is already unused, calling callback (%p)\n",
1439                        lock, lock->l_blocking_ast);
1440                 if (lock->l_blocking_ast != NULL)
1441                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
1442                                              LDLM_CB_BLOCKING);
1443         } else {
1444                 CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
1445                        lock);
1446         }
1447
1448         LDLM_LOCK_PUT(lock);
1449         EXIT;
1450 }
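
/*
 * Illustrative sketch of a minimal client blocking AST as invoked (with
 * LDLM_CB_BLOCKING) by the handler above; the stock ldlm_blocking_ast()
 * in ldlm_request.c has the same shape.  'my_blocking_ast' is a
 * hypothetical name:
 *
 *      static int my_blocking_ast(struct ldlm_lock *lock,
 *                                 struct ldlm_lock_desc *desc,
 *                                 void *data, int flag)
 *      {
 *              struct lustre_handle lockh;
 *
 *              if (flag == LDLM_CB_CANCELING)
 *                      return 0;               // cancel-time cleanup only
 *              ldlm_lock2handle(lock, &lockh);
 *              return ldlm_cli_cancel(&lockh); // give up the contended lock
 *      }
 *
 * Real callers typically flush cached pages under the lock before
 * cancelling it.
 */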
1451
1452 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
1453                                     struct ldlm_namespace *ns,
1454                                     struct ldlm_request *dlm_req,
1455                                     struct ldlm_lock *lock)
1456 {
1457         CFS_LIST_HEAD(ast_list);
1458         ENTRY;
1459
1460         LDLM_DEBUG(lock, "client completion callback handler START");
1461
1462         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
1463                 int to = cfs_time_seconds(1);
1464                 while (to > 0) {
1465                         to = schedule_timeout(to);
1466                         if (lock->l_granted_mode == lock->l_req_mode ||
1467                             lock->l_destroyed)
1468                                 break;
1469                 }
1470         }
1471
1472         lock_res_and_lock(lock);
1473         if (lock->l_destroyed ||
1474             lock->l_granted_mode == lock->l_req_mode) {
1475                 /* bug 11300: the lock has already been granted */
1476                 unlock_res_and_lock(lock);
1477                 LDLM_DEBUG(lock, "Double grant race happened");
1478                 LDLM_LOCK_PUT(lock);
1479                 EXIT;
1480                 return;
1481         }
1482
1483         /* If we receive the completion AST before the actual enqueue has
1484          * returned, we might need to switch lock modes, resources, or extents. */
1485         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
1486                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
1487                 LDLM_DEBUG(lock, "completion AST, new lock mode");
1488         }
1489
1490         if (lock->l_resource->lr_type != LDLM_PLAIN) {
1491                 lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
1492                 LDLM_DEBUG(lock, "completion AST, new policy data");
1493         }
1494
1495         ldlm_resource_unlink_lock(lock);
1496         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
1497                    &lock->l_resource->lr_name,
1498                    sizeof(lock->l_resource->lr_name)) != 0) {
1499                 unlock_res_and_lock(lock);
1500                 if (ldlm_lock_change_resource(ns, lock,
1501                                 dlm_req->lock_desc.l_resource.lr_name)) {
1502                         LDLM_ERROR(lock, "Failed to allocate resource");
1503                         LDLM_LOCK_PUT(lock);
1504                         EXIT;
1505                         return;
1506                 }
1507                 LDLM_DEBUG(lock, "completion AST, new resource");
1508                 CERROR("change resource!\n");
1509                 lock_res_and_lock(lock);
1510         }
1511
1512         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
1513                 /* BL_AST locks are not needed in the LRU;
1514                  * keep them out so ldlm_cancel_lru() stays fast. */
1515                 ldlm_lock_remove_from_lru(lock);
1516                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
1517                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
1518         }
1519
1520         if (lock->l_lvb_len) {
1521                 void *lvb;
1522                 lvb = lustre_swab_reqbuf(req, DLM_REQ_REC_OFF, lock->l_lvb_len,
1523                                          lock->l_lvb_swabber);
1524                 if (lvb == NULL) {
1525                         LDLM_ERROR(lock, "completion AST did not contain "
1526                                    "expected LVB!");
1527                 } else {
1528                         memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
1529                 }
1530         }
1531
1532         ldlm_grant_lock(lock, &ast_list);
1533         unlock_res_and_lock(lock);
1534
1535         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
1536
1537         ldlm_run_cp_ast_work(&ast_list);
1538
1539         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
1540                           lock);
1541         LDLM_LOCK_PUT(lock);
1542         EXIT;
1543 }
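
/*
 * Note (illustrative): for OST extent locks the LVB copied above is a
 * struct ost_lvb, so after the completion AST a client could read, e.g.:
 *
 *      struct ost_lvb *lvb = lock->l_lvb_data;
 *      __u64 size = lvb->lvb_size;     // object size as the server saw it
 *
 * Other lock types may carry a different LVB or none at all;
 * lock->l_lvb_len gates the copy.
 */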
1544
1545 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
1546                                     struct ldlm_namespace *ns,
1547                                     struct ldlm_request *dlm_req,
1548                                     struct ldlm_lock *lock)
1549 {
1550         int rc = -ENOSYS;
1551         ENTRY;
1552
1553         LDLM_DEBUG(lock, "client glimpse AST callback handler");
1554
1555         if (lock->l_glimpse_ast != NULL)
1556                 rc = lock->l_glimpse_ast(lock, req);
1557
1558         if (req->rq_repmsg != NULL) {
1559                 ptlrpc_reply(req);
1560         } else {
1561                 req->rq_status = rc;
1562                 ptlrpc_error(req);
1563         }
1564
1565         lock_res_and_lock(lock);
1566         if (lock->l_granted_mode == LCK_PW &&
1567             !lock->l_readers && !lock->l_writers &&
1568             cfs_time_after(cfs_time_current(),
1569                            cfs_time_add(lock->l_last_used,
1570                                         cfs_time_seconds(10)))) {
1571                 unlock_res_and_lock(lock);
1572                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
1573                         ldlm_handle_bl_callback(ns, NULL, lock);
1574
1575                 EXIT;
1576                 return;
1577         }
1578         unlock_res_and_lock(lock);
1579         LDLM_LOCK_PUT(lock);
1580         EXIT;
1581 }
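
/*
 * Illustrative sketch: the three client handlers above dispatch to the
 * l_blocking_ast, l_completion_ast and l_glimpse_ast callbacks that the
 * client registered at enqueue time, typically through a
 * struct ldlm_enqueue_info (field names per lustre_dlm.h; shown here as
 * an assumption):
 *
 *      struct ldlm_enqueue_info einfo = {
 *              .ei_type  = LDLM_EXTENT,
 *              .ei_mode  = LCK_PR,
 *              .ei_cb_bl = my_blocking_ast,    // hypothetical, see above
 *              .ei_cb_cp = ldlm_completion_ast,
 *              .ei_cb_gl = ldlm_glimpse_ast,
 *      };
 */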
1582
1583 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
1584 {
1585         req->rq_status = rc;
1586         if (!req->rq_packed_final) {
1587                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1588                 if (rc)
1589                         return rc;
1590         }
1591         return ptlrpc_reply(req);
1592 }
1593
1594 #ifdef __KERNEL__
1595 static int __ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_bl_work_item *blwi,
1596                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
1597                              struct list_head *cancels, int count, int mode)
1598 {
1599         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1600         ENTRY;
1601
1602         if (cancels && count == 0) {
1603                 if (mode == LDLM_ASYNC)
1604                         OBD_FREE(blwi, sizeof(*blwi));
1605                 RETURN(0);
1606         }
1607
1608         init_completion(&blwi->blwi_comp);
1609         atomic_set(&blwi->blwi_ref_count, 1);
1610
1611         blwi->blwi_ns = ns;
1612         if (ld != NULL)
1613                 blwi->blwi_ld = *ld;
1614         if (count) {
1615                 list_add(&blwi->blwi_head, cancels);
1616                 list_del_init(cancels);
1617                 blwi->blwi_count = count;
1618         } else {
1619                 blwi->blwi_lock = lock;
1620         }
1621
1622         spin_lock(&blp->blp_lock);
1623         if (lock && lock->l_flags & LDLM_FL_DISCARD_DATA) {
1624                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
1625                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
1626         } else {
1627                 /* other blocking callbacks are added to the regular list */
1628                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
1629         }
1630         spin_unlock(&blp->blp_lock);
1631
1632         if (mode == LDLM_SYNC) {
1633                 /* hold an extra ref: the object lives on the caller's stack for SYNC */
1634                 ldlm_bl_work_item_get(blwi);
1635                 cfs_waitq_signal(&blp->blp_waitq);
1636                 wait_for_completion(&blwi->blwi_comp);
1637         } else {
1638                 cfs_waitq_signal(&blp->blp_waitq);
1639         }
1640
1641         RETURN(0);
1642 }
1643
1644 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
1645                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
1646                              struct list_head *cancels, int count, int mode)
1647 {
1648         ENTRY;
1649
1650         if (mode == LDLM_SYNC) {
1651                 /* For a synchronous call, keep memory allocation to a
1652                  * minimum, as it could be triggered by the kernel
1653                  * shrinker. */
1654                 struct ldlm_bl_work_item blwi;
1655                 memset(&blwi, 0, sizeof(blwi));
1656                 /* an extra ref is taken inside, as this object lives on the stack */
1657                 RETURN(__ldlm_bl_to_thread(ns, &blwi, ld, lock, cancels, count, mode));
1658         } else {
1659                 struct ldlm_bl_work_item *blwi;
1660                 OBD_ALLOC(blwi, sizeof(*blwi));
1661                 if (blwi == NULL)
1662                         RETURN(-ENOMEM);
1663
1664                 RETURN(__ldlm_bl_to_thread(ns, blwi, ld, lock, cancels, count, mode));
1665         }
1666 }
1667 #endif
1668
1669 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
1670                            struct ldlm_lock *lock)
1671 {
1672 #ifdef __KERNEL__
1673         RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
1674 #else
1675         RETURN(-ENOSYS);
1676 #endif
1677 }
1678
1679 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
1680                            struct list_head *cancels, int count, int mode)
1681 {
1682 #ifdef __KERNEL__
1683         RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
1684 #else
1685         RETURN(-ENOSYS);
1686 #endif
1687 }
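
/*
 * Usage note (illustrative): both wrappers feed the same pool.  Callers in
 * this file fall back to handling the callback inline if queueing fails:
 *
 *      if (ldlm_bl_to_thread_lock(ns, ld, lock))
 *              ldlm_handle_bl_callback(ns, ld, lock);
 *
 *      // hand over a list of locks instead; LDLM_SYNC keeps the work item
 *      // on the stack and waits, so it is safe under memory pressure
 *      // ('cancels' and 'count' are hypothetical caller-side variables)
 *      rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, LDLM_SYNC);
 */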
1688
1689 static int ldlm_callback_handler(struct ptlrpc_request *req)
1690 {
1691         struct ldlm_namespace *ns;
1692         struct ldlm_request *dlm_req;
1693         struct ldlm_lock *lock;
1694         int rc;
1695         ENTRY;
1696
1697         /* Requests arrive in sender's byte order.  The ptlrpc service
1698          * handler has already checked and, if necessary, byte-swapped the
1699          * incoming request message body, but I am responsible for the
1700          * message buffers. */
1701
1702         if (req->rq_export == NULL) {
1703                 ldlm_callback_reply(req, -ENOTCONN);
1704                 RETURN(0);
1705         }
1706
1707         LASSERT(req->rq_export != NULL);
1708         LASSERT(req->rq_export->exp_obd != NULL);
1709
1710         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1711         case LDLM_BL_CALLBACK:
1712                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1713                 break;
1714         case LDLM_CP_CALLBACK:
1715                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
1716                 break;
1717         case LDLM_GL_CALLBACK:
1718                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_GL_CALLBACK, 0);
1719                 break;
1720         case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
1721                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1722                 rc = llog_origin_handle_cancel(req);
1723                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_REP, 0);
1724                 ldlm_callback_reply(req, rc);
1725                 RETURN(0);
1726         case OBD_QC_CALLBACK:
1727                 OBD_FAIL_RETURN(OBD_FAIL_OBD_QC_CALLBACK_NET, 0);
1728                 rc = target_handle_qc_callback(req);
1729                 ldlm_callback_reply(req, rc);
1730                 RETURN(0);
1731         case QUOTA_DQACQ:
1732         case QUOTA_DQREL:
1733                 /* reply in handler */
1734                 rc = target_handle_dqacq_callback(req);
1735                 RETURN(0);
1736         case LLOG_ORIGIN_HANDLE_CREATE:
1737                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1738                 rc = llog_origin_handle_create(req);
1739                 ldlm_callback_reply(req, rc);
1740                 RETURN(0);
1741         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1742                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1743                 rc = llog_origin_handle_next_block(req);
1744                 ldlm_callback_reply(req, rc);
1745                 RETURN(0);
1746         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1747                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1748                 rc = llog_origin_handle_read_header(req);
1749                 ldlm_callback_reply(req, rc);
1750                 RETURN(0);
1751         case LLOG_ORIGIN_HANDLE_CLOSE:
1752                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1753                 rc = llog_origin_handle_close(req);
1754                 ldlm_callback_reply(req, rc);
1755                 RETURN(0);
1756         default:
1757                 CERROR("unknown opcode %u\n",
1758                        lustre_msg_get_opc(req->rq_reqmsg));
1759                 ldlm_callback_reply(req, -EPROTO);
1760                 RETURN(0);
1761         }
1762
1763         ns = req->rq_export->exp_obd->obd_namespace;
1764         LASSERT(ns != NULL);
1765
1766         dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
1767                                      lustre_swab_ldlm_request);
1768         if (dlm_req == NULL) {
1769                 CERROR("can't unpack dlm_req\n");
1770                 ldlm_callback_reply(req, -EPROTO);
1771                 RETURN(0);
1772         }
1773
1774         /* Force a known, safe race: send a cancel to the server for a
1775          * lock on which the server has already started a blocking callback. */
1776         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
1777             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
1778                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0]);
1779                 if (rc < 0)
1780                         CERROR("ldlm_cli_cancel: %d\n", rc);
1781         }
1782
1783         lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
1784         if (!lock) {
1785                 CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
1786                        "disappeared\n", dlm_req->lock_handle[0].cookie);
1787                 ldlm_callback_reply(req, -EINVAL);
1788                 RETURN(0);
1789         }
1790
1791         if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
1792             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
1793                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
1794
1795         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
1796         lock_res_and_lock(lock);
1797         lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
1798         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
1799                 /* If somebody cancels the lock and the cache is already
1800                  * dropped, or the lock failed before the cp_ast reached
1801                  * the client, we can tell the server we have no lock;
1802                  * otherwise, we should cancel after dropping the cache. */
1803                 if (((lock->l_flags & LDLM_FL_CANCELING) &&
1804                     (lock->l_flags & LDLM_FL_BL_DONE)) ||
1805                     (lock->l_flags & LDLM_FL_FAILED)) {
1806                         LDLM_DEBUG(lock, "callback on lock "
1807                                    LPX64" - lock disappeared\n",
1808                                    dlm_req->lock_handle[0].cookie);
1809                         unlock_res_and_lock(lock);
1810                         LDLM_LOCK_PUT(lock);
1811                         ldlm_callback_reply(req, -EINVAL);
1812                         RETURN(0);
1813                 }
1814                 /* BL_AST locks are not needed in the LRU;
1815                  * keep them out so ldlm_cancel_lru() stays fast. */
1816                 ldlm_lock_remove_from_lru(lock);
1817                 lock->l_flags |= LDLM_FL_BL_AST;
1818         }
1819         unlock_res_and_lock(lock);
1820
1821         /* We want the ost thread to get this reply so that it can respond
1822          * to ost requests (write cache writeback) that might be triggered
1823          * in the callback.
1824          *
1825          * But we'd also like to be able to indicate in the reply that we're
1826          * cancelling right now, because it's unused, or have an intent result
1827          * in the reply, so we might have to push the responsibility for sending
1828          * the reply down into the AST handlers, alas. */
1829
1830         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1831         case LDLM_BL_CALLBACK:
1832                 CDEBUG(D_INODE, "blocking ast\n");
1833                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
1834                         ldlm_callback_reply(req, 0);
1835                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
1836                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
1837                 break;
1838         case LDLM_CP_CALLBACK:
1839                 CDEBUG(D_INODE, "completion ast\n");
1840                 ldlm_callback_reply(req, 0);
1841                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
1842                 break;
1843         case LDLM_GL_CALLBACK:
1844                 CDEBUG(D_INODE, "glimpse ast\n");
1845                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
1846                 break;
1847         default:
1848                 LBUG();                         /* checked above */
1849         }
1850
1851         RETURN(0);
1852 }
1853
1854 static int ldlm_cancel_handler(struct ptlrpc_request *req)
1855 {
1856         int rc;
1857         ENTRY;
1858
1859         /* Requests arrive in sender's byte order.  The ptlrpc service
1860          * handler has already checked and, if necessary, byte-swapped the
1861          * incoming request message body, but I am responsible for the
1862          * message buffers. */
1863
1864         if (req->rq_export == NULL) {
1865                 struct ldlm_request *dlm_req;
1866
1867                 CERROR("operation %d from %s with bad export cookie "LPU64"\n",
1868                        lustre_msg_get_opc(req->rq_reqmsg),
1869                        libcfs_id2str(req->rq_peer),
1870                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
1871
1872                 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
1873                         dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF,
1874                                                      sizeof(*dlm_req),
1875                                                      lustre_swab_ldlm_request);
1876                         if (dlm_req != NULL)
1877                                 ldlm_lock_dump_handle(D_ERROR,
1878                                                       &dlm_req->lock_handle[0]);
1879                 }
1880
1881                 ldlm_callback_reply(req, -ENOTCONN);
1882                 RETURN(0);
1883         }
1884
1885         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1886
1887         /* XXX FIXME move this back to mds/handler.c, bug 249 */
1888         case LDLM_CANCEL:
1889                 CDEBUG(D_INODE, "cancel\n");
1890                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
1891                 rc = ldlm_handle_cancel(req);
1892                 if (rc)
1893                         break;
1894                 RETURN(0);
1895         case OBD_LOG_CANCEL:
1896                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1897                 rc = llog_origin_handle_cancel(req);
1898                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_REP, 0);
1899                 ldlm_callback_reply(req, rc);
1900                 RETURN(0);
1901         default:
1902                 CERROR("invalid opcode %d\n",
1903                        lustre_msg_get_opc(req->rq_reqmsg));
1904                 ldlm_callback_reply(req, -EINVAL);
1905         }
1906
1907         RETURN(0);
1908 }
1909
1910 #ifdef __KERNEL__
1911 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
1912 {
1913         struct ldlm_bl_work_item *blwi = NULL;
1914         static unsigned int num_bl = 0;
1915
1916         spin_lock(&blp->blp_lock);
1917         /* take work from blp_list at least once per blp_num_threads dequeues */
1918         if (!list_empty(&blp->blp_list) &&
1919             (list_empty(&blp->blp_prio_list) || num_bl == 0))
1920                 blwi = list_entry(blp->blp_list.next,
1921                                   struct ldlm_bl_work_item, blwi_entry);
1922         else
1923                 if (!list_empty(&blp->blp_prio_list))
1924                         blwi = list_entry(blp->blp_prio_list.next,
1925                                           struct ldlm_bl_work_item, blwi_entry);
1926
1927         if (blwi) {
1928                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
1929                         num_bl = 0;
1930                 list_del(&blwi->blwi_entry);
1931         }
1932         spin_unlock(&blp->blp_lock);
1933
1934         return blwi;
1935 }
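
/*
 * Fairness example (illustrative): with blp_num_threads == 4, num_bl wraps
 * every fourth dequeue, so at least one work item in four is taken from
 * blp_list even while blp_prio_list (LDLM_FL_DISCARD_DATA callbacks) stays
 * non-empty; priority work is favoured but cannot starve the regular list.
 */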
1936
1937 /* This only contains temporary data until the thread starts */
1938 struct ldlm_bl_thread_data {
1939         char                    bltd_name[CFS_CURPROC_COMM_MAX];
1940         struct ldlm_bl_pool     *bltd_blp;
1941         struct completion       bltd_comp;
1942         int                     bltd_num;
1943 };
1944
1945 static int ldlm_bl_thread_main(void *arg);
1946
1947 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
1948 {
1949         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
1950         int rc;
1951
1952         init_completion(&bltd.bltd_comp);
1953         rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
1954         if (rc < 0) {
1955                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
1956                        atomic_read(&blp->blp_num_threads), rc);
1957                 return rc;
1958         }
1959         wait_for_completion(&bltd.bltd_comp);
1960
1961         return 0;
1962 }
1963
1964 static int ldlm_bl_thread_main(void *arg)
1965 {
1966         struct ldlm_bl_pool *blp;
1967         ENTRY;
1968
1969         {
1970                 struct ldlm_bl_thread_data *bltd = arg;
1971
1972                 blp = bltd->bltd_blp;
1973
1974                 bltd->bltd_num = atomic_inc_return(&blp->blp_num_threads) - 1;
1975                 atomic_inc(&blp->blp_busy_threads);
1976
1977                 snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
1978                         "ldlm_bl_%02d", bltd->bltd_num);
1979                 cfs_daemonize(bltd->bltd_name);
1980
1981                 complete(&bltd->bltd_comp);
1982                 /* cannot use bltd after this, it is only on caller's stack */
1983         }
1984
1985         while (1) {
1986                 struct l_wait_info lwi = { 0 };
1987                 struct ldlm_bl_work_item *blwi = NULL;
1988
1989                 blwi = ldlm_bl_get_work(blp);
1990
1991                 if (blwi == NULL) {
1992                         int busy;
1993
1994                         atomic_dec(&blp->blp_busy_threads);
1995                         l_wait_event_exclusive(blp->blp_waitq,
1996                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
1997                                          &lwi);
1998                         busy = atomic_inc_return(&blp->blp_busy_threads);
1999
2000                         if (blwi->blwi_ns == NULL)
2001                                 /* added by ldlm_cleanup() */
2002                                 break;
2003
2004                         /* Not fatal if this races and we get a few too many threads */
2005                         if (unlikely(busy < blp->blp_max_threads &&
2006                                     busy >= atomic_read(&blp->blp_num_threads)))
2007                                 /* discard the return value, we tried */
2008                                 ldlm_bl_thread_start(blp);
2009                 } else {
2010                         if (blwi->blwi_ns == NULL)
2011                                 /* added by ldlm_cleanup() */
2012                                 break;
2013                 }
2014
2015                 if (blwi->blwi_count) {
2016                         /* Special case: when we cancel LRU locks
2017                          * asynchronously, the list of locks is passed in
2018                          * here.  The locks are marked LDLM_FL_CANCELING,
2019                          * but NOT yet cancelled locally. */
2020                         ldlm_cli_cancel_list_local(&blwi->blwi_head,
2021                                                    blwi->blwi_count, 0);
2022                         ldlm_cli_cancel_list(&blwi->blwi_head,
2023                                              blwi->blwi_count, NULL, 0);
2024                 } else {
2025                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
2026                                                 blwi->blwi_lock);
2027                 }
2028                 complete(&blwi->blwi_comp);
2029                 ldlm_bl_work_item_put(blwi);
2030         }
2031
2032         atomic_dec(&blp->blp_busy_threads);
2033         atomic_dec(&blp->blp_num_threads);
2034         complete(&blp->blp_comp);
2035         RETURN(0);
2036 }
2037
2038 #endif
2039
2040 /*
2041  * Export handle<->lock hash operations.
2042  */
2043 static unsigned
2044 ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask)
2045 {
2046         return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask);
2047 }
2048
2049 static void *
2050 ldlm_export_lock_key(struct hlist_node *hnode)
2051 {
2052         struct ldlm_lock *lock;
2053         ENTRY;
2054
2055         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2056         RETURN(&lock->l_remote_handle);
2057 }
2058
2059 static int
2060 ldlm_export_lock_compare(void *key, struct hlist_node *hnode)
2061 {
2062         ENTRY;
2063         RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
2064 }
2065
2066 static void *
2067 ldlm_export_lock_get(struct hlist_node *hnode)
2068 {
2069         struct ldlm_lock *lock;
2070         ENTRY;
2071
2072         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2073         LDLM_LOCK_GET(lock);
2074
2075         RETURN(lock);
2076 }
2077
2078 static void *
2079 ldlm_export_lock_put(struct hlist_node *hnode)
2080 {
2081         struct ldlm_lock *lock;
2082         ENTRY;
2083
2084         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2085         LDLM_LOCK_PUT(lock);
2086
2087         RETURN(lock);
2088 }
2089
2090 static lustre_hash_ops_t ldlm_export_lock_ops = {
2091         .lh_hash    = ldlm_export_lock_hash,
2092         .lh_key     = ldlm_export_lock_key,
2093         .lh_compare = ldlm_export_lock_compare,
2094         .lh_get     = ldlm_export_lock_get,
2095         .lh_put     = ldlm_export_lock_put
2096 };
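
/*
 * Illustrative sketch: with these ops installed, the per-export hash maps
 * the remote handle cookie to its ldlm_lock and takes a reference on
 * lookup.  A hypothetical lookup (lustre_hash_lookup() per lustre_hash.h,
 * shown as an assumption):
 *
 *      struct ldlm_lock *lock;
 *
 *      lock = lustre_hash_lookup(exp->exp_lock_hash, &remote_handle);
 *      if (lock != NULL) {
 *              ...                     // lh_get took an LDLM_LOCK_GET ref
 *              LDLM_LOCK_PUT(lock);    // drop it when done
 *      }
 */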
2097
2098 int ldlm_init_export(struct obd_export *exp)
2099 {
2100         ENTRY;
2101
2102         exp->exp_lock_hash =
2103                 lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
2104                                  7, 16, &ldlm_export_lock_ops, LH_REHASH);
2105
2106         if (!exp->exp_lock_hash)
2107                 RETURN(-ENOMEM);
2108
2109         RETURN(0);
2110 }
2111 EXPORT_SYMBOL(ldlm_init_export);
2112
2113 void ldlm_destroy_export(struct obd_export *exp)
2114 {
2115         ENTRY;
2116         lustre_hash_exit(exp->exp_lock_hash);
2117         exp->exp_lock_hash = NULL;
2118         EXIT;
2119 }
2120 EXPORT_SYMBOL(ldlm_destroy_export);
2121
2122 static int ldlm_setup(void);
2123 static int ldlm_cleanup(void);
2124
2125 int ldlm_get_ref(void)
2126 {
2127         int rc = 0;
2128         ENTRY;
2129         mutex_down(&ldlm_ref_sem);
2130         if (++ldlm_refcount == 1) {
2131                 rc = ldlm_setup();
2132                 if (rc)
2133                         ldlm_refcount--;
2134         }
2135         mutex_up(&ldlm_ref_sem);
2136
2137         RETURN(rc);
2138 }
2139
2140 void ldlm_put_ref(void)
2141 {
2142         ENTRY;
2143         mutex_down(&ldlm_ref_sem);
2144         if (ldlm_refcount == 1) {
2145                 int rc = ldlm_cleanup();
2146                 if (rc)
2147                         CERROR("ldlm_cleanup failed: %d\n", rc);
2148                 else
2149                         ldlm_refcount--;
2150         } else {
2151                 ldlm_refcount--;
2152         }
2153         mutex_up(&ldlm_ref_sem);
2154
2155         EXIT;
2156 }
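
/*
 * Usage note (illustrative): callers pair the two around the lifetime of
 * an obd device; the first ldlm_get_ref() runs ldlm_setup() and the last
 * ldlm_put_ref() runs ldlm_cleanup():
 *
 *      rc = ldlm_get_ref();            // in a hypothetical <foo>_setup()
 *      if (rc)
 *              return rc;
 *      ...
 *      ldlm_put_ref();                 // in the matching <foo>_cleanup()
 */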
2157
2158 static int ldlm_setup(void)
2159 {
2160         struct ldlm_bl_pool *blp;
2161         int rc = 0;
2162         int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
2163         int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
2164 #ifdef __KERNEL__
2165         int i;
2166 #endif
2167         ENTRY;
2168
2169         if (ldlm_state != NULL)
2170                 RETURN(-EALREADY);
2171
2172         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
2173         if (ldlm_state == NULL)
2174                 RETURN(-ENOMEM);
2175
2176 #ifdef LPROCFS
2177         rc = ldlm_proc_setup();
2178         if (rc != 0)
2179                 GOTO(out_free, rc);
2180 #endif
2181
2182 #ifdef __KERNEL__
2183         if (ldlm_num_threads) {
2184                 /* If ldlm_num_threads is set, it is the min and the max. */
2185                 if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
2186                         ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
2187                 if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
2188                         ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
2189                 ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
2190         }
2191 #endif
2192
2193         ldlm_state->ldlm_cb_service =
2194                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
2195                                 LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
2196                                 LDLM_CB_REPLY_PORTAL, 2,
2197                                 ldlm_callback_handler, "ldlm_cbd",
2198                                 ldlm_svc_proc_dir, NULL,
2199                                 ldlm_min_threads, ldlm_max_threads,
2200                                 "ldlm_cb", NULL);
2201
2202         if (!ldlm_state->ldlm_cb_service) {
2203                 CERROR("failed to start service\n");
2204                 GOTO(out_proc, rc = -ENOMEM);
2205         }
2206
2207         ldlm_state->ldlm_cancel_service =
2208                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
2209                                 LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
2210                                 LDLM_CANCEL_REPLY_PORTAL, 6,
2211                                 ldlm_cancel_handler, "ldlm_canceld",
2212                                 ldlm_svc_proc_dir, NULL,
2213                                 ldlm_min_threads, ldlm_max_threads,
2214                                 "ldlm_cn", NULL);
2215
2216         if (!ldlm_state->ldlm_cancel_service) {
2217                 CERROR("failed to start service\n");
2218                 GOTO(out_proc, rc = -ENOMEM);
2219         }
2220
2221         OBD_ALLOC(blp, sizeof(*blp));
2222         if (blp == NULL)
2223                 GOTO(out_proc, rc = -ENOMEM);
2224         ldlm_state->ldlm_bl_pool = blp;
2225
2226         spin_lock_init(&blp->blp_lock);
2227         CFS_INIT_LIST_HEAD(&blp->blp_list);
2228         CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
2229         cfs_waitq_init(&blp->blp_waitq);
2230         atomic_set(&blp->blp_num_threads, 0);
2231         atomic_set(&blp->blp_busy_threads, 0);
2232         blp->blp_min_threads = ldlm_min_threads;
2233         blp->blp_max_threads = ldlm_max_threads;
2234
2235 #ifdef __KERNEL__
2236         for (i = 0; i < blp->blp_min_threads; i++) {
2237                 rc = ldlm_bl_thread_start(blp);
2238                 if (rc < 0)
2239                         GOTO(out_thread, rc);
2240         }
2241
2242         rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cancel_service);
2243         if (rc)
2244                 GOTO(out_thread, rc);
2245
2246         rc = ptlrpc_start_threads(NULL, ldlm_state->ldlm_cb_service);
2247         if (rc)
2248                 GOTO(out_thread, rc);
2249
2250         CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
2251         expired_lock_thread.elt_state = ELT_STOPPED;
2252         cfs_waitq_init(&expired_lock_thread.elt_waitq);
2253
2254         CFS_INIT_LIST_HEAD(&waiting_locks_list);
2255         spin_lock_init(&waiting_locks_spinlock);
2256         cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
2257
2258         rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
2259         if (rc < 0) {
2260                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
2261                 GOTO(out_thread, rc);
2262         }
2263
2264         wait_event(expired_lock_thread.elt_waitq,
2265                    expired_lock_thread.elt_state == ELT_READY);
2266 #endif
2267
2268 #ifdef __KERNEL__
2269         rc = ldlm_pools_init();
2270         if (rc)
2271                 GOTO(out_thread, rc);
2272 #endif
2273
2274         RETURN(0);
2275
2276 #ifdef __KERNEL__
2277  out_thread:
2278         ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
2279         ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
2280 #endif
2281
2282  out_proc:
2283 #ifdef LPROCFS
2284         ldlm_proc_cleanup();
2285  out_free:
2286 #endif
2287         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
2288         ldlm_state = NULL;
2289         return rc;
2290 }
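
/*
 * Usage note (illustrative): ldlm_num_threads above is a module parameter;
 * loading with, e.g.,
 *
 *      modprobe ptlrpc ldlm_num_threads=16
 *
 * (module name shown as an assumption) pins both LDLM services to exactly
 * that thread count, clamped to [LDLM_THREADS_AUTO_MIN,
 * LDLM_THREADS_AUTO_MAX].
 */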
2291
2292 static int ldlm_cleanup(void)
2293 {
2294 #ifdef __KERNEL__
2295         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
2296 #endif
2297         ENTRY;
2298
2299         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
2300             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
2301                 CERROR("ldlm still has namespaces; clean these up first.\n");
2302                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
2303                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
2304                 RETURN(-EBUSY);
2305         }
2306
2307 #ifdef __KERNEL__
2308         ldlm_pools_fini();
2309 #endif
2310
2311 #ifdef __KERNEL__
2312         while (atomic_read(&blp->blp_num_threads) > 0) {
2313                 struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
2314
2315                 init_completion(&blp->blp_comp);
2316
2317                 spin_lock(&blp->blp_lock);
2318                 list_add_tail(&blwi.blwi_entry, &blp->blp_list);
2319                 cfs_waitq_signal(&blp->blp_waitq);
2320                 spin_unlock(&blp->blp_lock);
2321
2322                 wait_for_completion(&blp->blp_comp);
2323         }
2324         OBD_FREE(blp, sizeof(*blp));
2325
2326         ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
2327         ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
2328         ldlm_proc_cleanup();
2329
2330         expired_lock_thread.elt_state = ELT_TERMINATE;
2331         cfs_waitq_signal(&expired_lock_thread.elt_waitq);
2332         wait_event(expired_lock_thread.elt_waitq,
2333                    expired_lock_thread.elt_state == ELT_STOPPED);
2334 #else
2335         ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
2336         ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
2337 #endif
2338
2339         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
2340         ldlm_state = NULL;
2341
2342         RETURN(0);
2343 }
2344
2345 int __init ldlm_init(void)
2346 {
2347         init_mutex(&ldlm_ref_sem);
2348         init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
2349         init_mutex(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
2350         ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
2351                                                sizeof(struct ldlm_resource), 0,
2352                                                SLAB_HWCACHE_ALIGN);
2353         if (ldlm_resource_slab == NULL)
2354                 return -ENOMEM;
2355
2356         ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
2357                                       sizeof(struct ldlm_lock), 0,
2358                                       SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
2359         if (ldlm_lock_slab == NULL) {
2360                 cfs_mem_cache_destroy(ldlm_resource_slab);
2361                 return -ENOMEM;
2362         }
2363
2364         ldlm_interval_slab = cfs_mem_cache_create("interval_node",
2365                                         sizeof(struct ldlm_interval),
2366                                         0, SLAB_HWCACHE_ALIGN);
2367         if (ldlm_interval_slab == NULL) {
2368                 cfs_mem_cache_destroy(ldlm_resource_slab);
2369                 cfs_mem_cache_destroy(ldlm_lock_slab);
2370                 return -ENOMEM;
2371         }
2372
2373         return 0;
2374 }
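
/*
 * Illustrative sketch: ldlm_init()/ldlm_exit() are module-lifetime hooks;
 * hypothetical wiring in the owning module's init path would look like:
 *
 *      static int __init example_init(void)
 *      {
 *              int rc = ldlm_init();   // creates the three slab caches
 *              if (rc)
 *                      return rc;
 *              ...
 *              return 0;
 *      }
 *
 * with ldlm_exit() called from the matching module exit path after
 * ldlm_put_ref() has torn the services down.
 */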
2375
2376 void __exit ldlm_exit(void)
2377 {
2378         int rc;
2379         if (ldlm_refcount)
2380                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
2381         rc = cfs_mem_cache_destroy(ldlm_resource_slab);
2382         LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
2383 #ifdef __KERNEL__
2384         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
2385          * synchronize_rcu() to wait for a grace period to elapse and
2386          * give ldlm_lock_free() a chance to run. */
2387         synchronize_rcu();
2388 #endif
2389         rc = cfs_mem_cache_destroy(ldlm_lock_slab);
2390         LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
2391         rc = cfs_mem_cache_destroy(ldlm_interval_slab);
2392         LASSERTF(rc == 0, "couldn't free interval node slab\n");
2393 }
2394
2395 /* ldlm_extent.c */
2396 EXPORT_SYMBOL(ldlm_extent_shift_kms);
2397
2398 /* ldlm_lock.c */
2399 EXPORT_SYMBOL(ldlm_get_processing_policy);
2400 EXPORT_SYMBOL(ldlm_lock2desc);
2401 EXPORT_SYMBOL(ldlm_register_intent);
2402 EXPORT_SYMBOL(ldlm_lockname);
2403 EXPORT_SYMBOL(ldlm_typename);
2404 EXPORT_SYMBOL(ldlm_lock2handle);
2405 EXPORT_SYMBOL(__ldlm_handle2lock);
2406 EXPORT_SYMBOL(ldlm_lock_get);
2407 EXPORT_SYMBOL(ldlm_lock_put);
2408 EXPORT_SYMBOL(ldlm_lock_fast_match);
2409 EXPORT_SYMBOL(ldlm_lock_match);
2410 EXPORT_SYMBOL(ldlm_lock_cancel);
2411 EXPORT_SYMBOL(ldlm_lock_addref);
2412 EXPORT_SYMBOL(ldlm_lock_decref);
2413 EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
2414 EXPORT_SYMBOL(ldlm_lock_change_resource);
2415 EXPORT_SYMBOL(ldlm_lock_set_data);
2416 EXPORT_SYMBOL(ldlm_it2str);
2417 EXPORT_SYMBOL(ldlm_lock_dump);
2418 EXPORT_SYMBOL(ldlm_lock_dump_handle);
2419 EXPORT_SYMBOL(ldlm_reprocess_all_ns);
2420 EXPORT_SYMBOL(ldlm_lock_allow_match);
2421
2422 /* ldlm_request.c */
2423 EXPORT_SYMBOL(ldlm_completion_ast);
2424 EXPORT_SYMBOL(ldlm_blocking_ast);
2425 EXPORT_SYMBOL(ldlm_glimpse_ast);
2426 EXPORT_SYMBOL(ldlm_expired_completion_wait);
2427 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
2428 EXPORT_SYMBOL(ldlm_prep_elc_req);
2429 EXPORT_SYMBOL(ldlm_cli_convert);
2430 EXPORT_SYMBOL(ldlm_cli_enqueue);
2431 EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
2432 EXPORT_SYMBOL(ldlm_cli_enqueue_local);
2433 EXPORT_SYMBOL(ldlm_cli_cancel);
2434 EXPORT_SYMBOL(ldlm_cli_cancel_unused);
2435 EXPORT_SYMBOL(ldlm_cli_cancel_req);
2436 EXPORT_SYMBOL(ldlm_cli_join_lru);
2437 EXPORT_SYMBOL(ldlm_replay_locks);
2438 EXPORT_SYMBOL(ldlm_resource_foreach);
2439 EXPORT_SYMBOL(ldlm_namespace_foreach);
2440 EXPORT_SYMBOL(ldlm_namespace_foreach_res);
2441 EXPORT_SYMBOL(ldlm_resource_iterate);
2442 EXPORT_SYMBOL(ldlm_cancel_resource_local);
2443 EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
2444 EXPORT_SYMBOL(ldlm_cli_cancel_list);
2445
2446 /* ldlm_lockd.c */
2447 EXPORT_SYMBOL(ldlm_server_blocking_ast);
2448 EXPORT_SYMBOL(ldlm_server_completion_ast);
2449 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
2450 EXPORT_SYMBOL(ldlm_handle_enqueue);
2451 EXPORT_SYMBOL(ldlm_handle_cancel);
2452 EXPORT_SYMBOL(ldlm_request_cancel);
2453 EXPORT_SYMBOL(ldlm_handle_convert);
2454 EXPORT_SYMBOL(ldlm_del_waiting_lock);
2455 EXPORT_SYMBOL(ldlm_get_ref);
2456 EXPORT_SYMBOL(ldlm_put_ref);
2457 EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
2458
2459 /* ldlm_resource.c */
2460 EXPORT_SYMBOL(ldlm_namespace_new);
2461 EXPORT_SYMBOL(ldlm_namespace_cleanup);
2462 EXPORT_SYMBOL(ldlm_namespace_free);
2463 EXPORT_SYMBOL(ldlm_namespace_dump);
2464 EXPORT_SYMBOL(ldlm_dump_all_namespaces);
2465 EXPORT_SYMBOL(ldlm_resource_get);
2466 EXPORT_SYMBOL(ldlm_resource_putref);
2467 EXPORT_SYMBOL(ldlm_resource_unlink_lock);
2468
2469 /* ldlm_lib.c */
2470 EXPORT_SYMBOL(client_import_add_conn);
2471 EXPORT_SYMBOL(client_import_del_conn);
2472 EXPORT_SYMBOL(client_obd_setup);
2473 EXPORT_SYMBOL(client_obd_cleanup);
2474 EXPORT_SYMBOL(client_connect_import);
2475 EXPORT_SYMBOL(client_disconnect_export);
2476 EXPORT_SYMBOL(server_disconnect_export);
2477 EXPORT_SYMBOL(target_abort_recovery);
2478 EXPORT_SYMBOL(target_cleanup_recovery);
2479 EXPORT_SYMBOL(target_handle_connect);
2480 EXPORT_SYMBOL(target_destroy_export);
2481 EXPORT_SYMBOL(target_cancel_recovery_timer);
2482 EXPORT_SYMBOL(target_send_reply);
2483 EXPORT_SYMBOL(target_queue_recovery_request);
2484 EXPORT_SYMBOL(target_handle_ping);
2485 EXPORT_SYMBOL(target_pack_pool_reply);
2486 EXPORT_SYMBOL(target_handle_disconnect);
2487 EXPORT_SYMBOL(target_handle_reply);
2488
2489 /* l_lock.c */
2490 EXPORT_SYMBOL(lock_res_and_lock);
2491 EXPORT_SYMBOL(unlock_res_and_lock);