/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else
# include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_class.h>
#include <libcfs/list.h>
#include "ldlm_internal.h"

#ifdef __KERNEL__
static int ldlm_num_threads;
CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
                "number of DLM service threads to start");
#endif

extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
static cfs_mutex_t      ldlm_ref_mutex;
static int ldlm_refcount;

struct ldlm_cb_async_args {
        struct ldlm_cb_set_arg *ca_set_arg;
        struct ldlm_lock       *ca_lock;
};

/* LDLM state */

static struct ldlm_state *ldlm_state;

inline cfs_time_t round_timeout(cfs_time_t timeout)
{
        return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}

/* timeout for initial callback (AST) reply (bz10399) */
static inline unsigned int ldlm_get_rq_timeout(void)
{
        /* Non-AT value */
        unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);

        return timeout < 1 ? 1 : timeout;
}

#ifdef __KERNEL__
/* w_l_spinlock protects both waiting_locks_list and expired_lock_thread */
static cfs_spinlock_t waiting_locks_spinlock;   /* BH lock (timer) */
static cfs_list_t waiting_locks_list;
static cfs_timer_t waiting_locks_timer;

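/*
 * The expired lock thread collects locks whose blocking/completion
 * callbacks have timed out.  Client eviction (class_fail_export()) may
 * block, so it cannot be done from the timer callback itself; the timer
 * only moves expired locks onto elt_expired_locks and wakes this thread.
 */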
static struct expired_lock_thread {
        cfs_waitq_t               elt_waitq;
        int                       elt_state;
        int                       elt_dump;
        cfs_list_t                elt_expired_locks;
} expired_lock_thread;
#endif

#define ELT_STOPPED   0
#define ELT_READY     1
#define ELT_TERMINATE 2

struct ldlm_bl_pool {
        cfs_spinlock_t          blp_lock;

        /*
         * blp_prio_list is used for callbacks that should be handled
         * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
         * see bug 13843
         */
        cfs_list_t              blp_prio_list;

        /*
         * blp_list is used for all other callbacks which are likely
         * to take longer to process.
         */
        cfs_list_t              blp_list;

        cfs_waitq_t             blp_waitq;
        cfs_completion_t        blp_comp;
        cfs_atomic_t            blp_num_threads;
        cfs_atomic_t            blp_busy_threads;
        int                     blp_min_threads;
        int                     blp_max_threads;
};

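/*
 * A unit of work for the blocking (ldlm_bl_*) threads: either a single
 * lock (blwi_lock) with its lock descriptor, or a list of locks to
 * cancel (blwi_head/blwi_count).  blwi_mode selects synchronous vs.
 * asynchronous handling; blwi_comp lets a synchronous caller wait for
 * completion.
 */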
struct ldlm_bl_work_item {
        cfs_list_t              blwi_entry;
        struct ldlm_namespace  *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock       *blwi_lock;
        cfs_list_t              blwi_head;
        int                     blwi_count;
        cfs_completion_t        blwi_comp;
        int                     blwi_mode;
        int                     blwi_mem_pressure;
};

#ifdef __KERNEL__

static inline int have_expired_locks(void)
{
        int need_to_run;

        ENTRY;
        cfs_spin_lock_bh(&waiting_locks_spinlock);
        need_to_run = !cfs_list_empty(&expired_lock_thread.elt_expired_locks);
        cfs_spin_unlock_bh(&waiting_locks_spinlock);

        RETURN(need_to_run);
}

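/*
 * Main loop of the expired lock thread.  For each lock on the expired
 * list it validates the lock and its export against LP_POISON (catching
 * use-after-free), then fails the export to evict the unresponsive
 * client.  A debug log is dumped on request (elt_dump) or when
 * obd_dump_on_eviction is set.
 */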
static int expired_lock_main(void *arg)
{
        cfs_list_t *expired = &expired_lock_thread.elt_expired_locks;
        struct l_wait_info lwi = { 0 };
        int do_dump;

        ENTRY;
        cfs_daemonize("ldlm_elt");

        expired_lock_thread.elt_state = ELT_READY;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);

        while (1) {
                l_wait_event(expired_lock_thread.elt_waitq,
                             have_expired_locks() ||
                             expired_lock_thread.elt_state == ELT_TERMINATE,
                             &lwi);

                cfs_spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_thread.elt_dump) {
                        struct libcfs_debug_msg_data msgdata = {
                                .msg_file = __FILE__,
                                .msg_fn = "waiting_locks_callback",
                                .msg_line = expired_lock_thread.elt_dump };
                        cfs_spin_unlock_bh(&waiting_locks_spinlock);

                        /* from waiting_locks_callback, but not in timer */
                        libcfs_debug_dumplog();
                        libcfs_run_lbug_upcall(&msgdata);

                        cfs_spin_lock_bh(&waiting_locks_spinlock);
                        expired_lock_thread.elt_dump = 0;
                }

                do_dump = 0;

                while (!cfs_list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;

                        lock = cfs_list_entry(expired->next, struct ldlm_lock,
                                              l_pending_chain);
                        if ((void *)lock < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                cfs_spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
                                LBUG();
                        }
                        cfs_list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export < LP_POISON + CFS_PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                                /* release extra ref grabbed by
                                 * ldlm_add_waiting_lock() or
                                 * ldlm_failed_ast() */
                                LDLM_LOCK_RELEASE(lock);
                                continue;
                        }
                        export = class_export_lock_get(lock->l_export, lock);
                        cfs_spin_unlock_bh(&waiting_locks_spinlock);

                        do_dump++;
                        class_fail_export(export);
                        class_export_lock_put(export, lock);

                        /* release extra ref grabbed by ldlm_add_waiting_lock()
                         * or ldlm_failed_ast() */
                        LDLM_LOCK_RELEASE(lock);

                        cfs_spin_lock_bh(&waiting_locks_spinlock);
                }
                cfs_spin_unlock_bh(&waiting_locks_spinlock);

                if (do_dump && obd_dump_on_eviction) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }

                if (expired_lock_thread.elt_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread.elt_state = ELT_STOPPED;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        RETURN(0);
}

static int ldlm_add_waiting_lock(struct ldlm_lock *lock);

/**
 * Check if there is a request in the export request list
 * which prevents the lock from being canceled.
 */
static int ldlm_lock_busy(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        int match = 0;
        ENTRY;

        if (lock->l_export == NULL)
                return 0;

        cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                rq_exp_list) {
                if (req->rq_ops->hpreq_lock_match) {
                        match = req->rq_ops->hpreq_lock_match(req, lock);
                        if (match)
                                break;
                }
        }
        cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
        RETURN(match);
}

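/*
 * Timer callback for the waiting-locks list.  Walks the (FIFO) list and,
 * for each lock whose callback timeout has passed, either:
 *  - re-adds the lock if timeouts are suspended for recovery (bug 6019),
 *  - prolongs the timeout if the lock is still busy serving an RPC, or
 *  - moves the lock to the expired list and wakes the expired lock
 *    thread to evict the client.
 * Finally it re-arms the timer for the next pending timeout.
 */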
/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(unsigned long unused)
{
        struct ldlm_lock *lock;

repeat:
        cfs_spin_lock_bh(&waiting_locks_spinlock);
        while (!cfs_list_empty(&waiting_locks_list)) {
                lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
                                      l_pending_chain);
                if (cfs_time_after(lock->l_callback_timeout,
                                   cfs_time_current()) ||
                    (lock->l_req_mode == LCK_GROUP))
                        break;

                if (ptlrpc_check_suspend()) {
                        /* There is a case where we talk to one MDS while
                         * holding a lock from another MDS; we can easily get
                         * here if that second MDS is being recovered, so
                         * suspend timeouts. bug 6019 */

                        LDLM_ERROR(lock, "recharge timeout: %s@%s nid %s ",
                                   lock->l_export->exp_client_uuid.uuid,
                                   lock->l_export->exp_connection->c_remote_uuid.uuid,
                                   libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));

                        cfs_list_del_init(&lock->l_pending_chain);
                        cfs_spin_unlock_bh(&waiting_locks_spinlock);
                        ldlm_add_waiting_lock(lock);
                        goto repeat;
                }

                /* if the timeout overlaps the activation time of suspended
                 * timeouts then extend it to give the client a chance to
                 * reconnect */
                if (cfs_time_before(cfs_time_sub(lock->l_callback_timeout,
                                                 cfs_time_seconds(obd_timeout)/2),
                                    ptlrpc_suspend_wakeup_time())) {
                        LDLM_ERROR(lock, "extend timeout due to recovery: %s@%s nid %s ",
                                   lock->l_export->exp_client_uuid.uuid,
                                   lock->l_export->exp_connection->c_remote_uuid.uuid,
                                   libcfs_nid2str(lock->l_export->exp_connection->c_peer.nid));

                        cfs_list_del_init(&lock->l_pending_chain);
                        cfs_spin_unlock_bh(&waiting_locks_spinlock);
                        ldlm_add_waiting_lock(lock);
                        goto repeat;
                }

                /* Check if we need to prolong the timeout */
                if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
                    ldlm_lock_busy(lock)) {
                        int cont = 1;

                        if (lock->l_pending_chain.next == &waiting_locks_list)
                                cont = 0;

                        LDLM_LOCK_GET(lock);

                        cfs_spin_unlock_bh(&waiting_locks_spinlock);
                        LDLM_DEBUG(lock, "prolong the busy lock");
                        ldlm_refresh_waiting_lock(lock,
                                                  ldlm_get_enq_timeout(lock));
                        cfs_spin_lock_bh(&waiting_locks_spinlock);

                        if (!cont) {
                                LDLM_LOCK_RELEASE(lock);
                                break;
                        }

                        LDLM_LOCK_RELEASE(lock);
                        continue;
                }
                ldlm_lock_to_ns(lock)->ns_timeouts++;
                LDLM_ERROR(lock, "lock callback timer expired after %lds: "
                           "evicting client at %s ",
                           cfs_time_current_sec() - lock->l_last_activity,
                           libcfs_nid2str(
                                   lock->l_export->exp_connection->c_peer.nid));

                /* no need to take an extra ref on the lock since it was in
                 * the waiting_locks_list and ldlm_add_waiting_lock()
                 * already grabbed a ref */
                cfs_list_del(&lock->l_pending_chain);
                cfs_list_add(&lock->l_pending_chain,
                             &expired_lock_thread.elt_expired_locks);
        }

        if (!cfs_list_empty(&expired_lock_thread.elt_expired_locks)) {
                if (obd_dump_on_timeout)
                        expired_lock_thread.elt_dump = __LINE__;

                cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        }

        /*
         * Make sure the timer will fire again if we have any locks
         * left.
         */
        if (!cfs_list_empty(&waiting_locks_list)) {
                cfs_time_t timeout_rounded;
                lock = cfs_list_entry(waiting_locks_list.next, struct ldlm_lock,
                                      l_pending_chain);
                timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        cfs_spin_unlock_bh(&waiting_locks_spinlock);
}

/*
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic).
 * As in ldlm_add_waiting_lock(), the caller must take a lock reference
 * if the lock was added to the waiting list (in which case 1 is returned).
 *
 * Called with the namespace lock held.
 */
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
{
        cfs_time_t timeout;
        cfs_time_t timeout_rounded;

        if (!cfs_list_empty(&lock->l_pending_chain))
                return 0;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
                seconds = 1;

        timeout = cfs_time_shift(seconds);
        if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
                lock->l_callback_timeout = timeout;

        timeout_rounded = round_timeout(lock->l_callback_timeout);

        if (cfs_time_before(timeout_rounded,
                            cfs_timer_deadline(&waiting_locks_timer)) ||
            !cfs_timer_is_armed(&waiting_locks_timer)) {
                cfs_timer_arm(&waiting_locks_timer, timeout_rounded);
        }
        /* if the new lock has a shorter timeout than something earlier on
           the list, we'll wait the longer amount of time; no big deal. */
        /* FIFO */
        cfs_list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
        return 1;
}

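/*
 * Add a lock to the waiting list and start its callback timer.  Takes an
 * extra lock reference (released by ldlm_del_waiting_lock() or by the
 * expired lock thread) and links the lock onto its export's blocking
 * list.  Typical server-side lifecycle, roughly:
 *
 *   ldlm_server_blocking_ast()
 *     -> ldlm_add_waiting_lock()     (start timer, +1 ref)
 *   ...client cancels in time...
 *     -> ldlm_del_waiting_lock()     (stop timer, -1 ref)
 *   ...or the timeout fires...
 *     -> waiting_locks_callback()    (move lock to expired list)
 *     -> expired_lock_main()         (evict client, -1 ref)
 */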
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        int ret;
        int timeout = ldlm_get_enq_timeout(lock);

        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));

        cfs_spin_lock_bh(&waiting_locks_spinlock);
        if (lock->l_destroyed) {
                static cfs_time_t next;
                cfs_spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
                if (cfs_time_after(cfs_time_current(), next)) {
                        next = cfs_time_shift(14400);
                        libcfs_debug_dumpstack(NULL);
                }
                return 0;
        }

        ret = __ldlm_add_waiting_lock(lock, timeout);
        if (ret) {
                /* grab a ref on the lock if it has been added to the
                 * waiting list */
                LDLM_LOCK_GET(lock);
        }
        cfs_spin_unlock_bh(&waiting_locks_spinlock);

        if (ret) {
                cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
                if (cfs_list_empty(&lock->l_exp_list))
                        cfs_list_add(&lock->l_exp_list,
                                     &lock->l_export->exp_bl_list);
                cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
        }

        LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
                   ret == 0 ? "not re-" : "", timeout,
                   AT_OFF ? "off" : "on");
        return ret;
}

/*
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 * As in ldlm_del_waiting_lock(), the caller must release the lock
 * reference when the lock is removed from any list (1 is returned).
 *
 * Called with namespace lock held.
 */
static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        cfs_list_t *list_next;

        if (cfs_list_empty(&lock->l_pending_chain))
                return 0;

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        cfs_timer_disarm(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;
                        next = cfs_list_entry(list_next, struct ldlm_lock,
                                              l_pending_chain);
                        cfs_timer_arm(&waiting_locks_timer,
                                      round_timeout(next->l_callback_timeout));
                }
        }
        cfs_list_del_init(&lock->l_pending_chain);

        return 1;
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        int ret;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock);
                return 0;
        }

        cfs_spin_lock_bh(&waiting_locks_spinlock);
        ret = __ldlm_del_waiting_lock(lock);
        cfs_spin_unlock_bh(&waiting_locks_spinlock);

        /* remove the lock from the export's blocking list */
        cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
        cfs_list_del_init(&lock->l_exp_list);
        cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);

        if (ret) {
                /* release the lock ref if it has indeed been removed
                 * from a list */
                LDLM_LOCK_RELEASE(lock);
        }

        LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
        return ret;
}

/*
 * Prolong the lock's callback timeout.
 *
 * Called with namespace lock held.
 */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        cfs_spin_lock_bh(&waiting_locks_spinlock);

        if (cfs_list_empty(&lock->l_pending_chain)) {
                cfs_spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        /* we remove/add the lock to the waiting list, so there is no need
         * to release/take a lock reference */
        __ldlm_del_waiting_lock(lock);
        __ldlm_add_waiting_lock(lock, timeout);
        cfs_spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "refreshed");
        return 1;
}
#else /* !__KERNEL__ */

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}

int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
{
        RETURN(0);
}
#endif /* __KERNEL__ */

#ifdef HAVE_SERVER_SUPPORT
# ifndef __KERNEL__
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
        RETURN(1);
}
# endif

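/*
 * Handle an AST (lock callback) that failed terminally: report the
 * eviction on the console and hand the lock to the expired lock thread,
 * which will fail the export.  In userspace builds the export is failed
 * directly.
 */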
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                            const char *ast_type)
{
        LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due "
                           "to a lock %s callback time out: rc %d\n",
                           lock->l_export->exp_obd->obd_name,
                           obd_export_nid2str(lock->l_export), ast_type, rc);

        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();
#ifdef __KERNEL__
        cfs_spin_lock_bh(&waiting_locks_spinlock);
        if (__ldlm_del_waiting_lock(lock) == 0)
                /* the lock was not in any list, grab an extra ref before adding
                 * the lock to the expired list */
                LDLM_LOCK_GET(lock);
        cfs_list_add(&lock->l_pending_chain,
                     &expired_lock_thread.elt_expired_locks);
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        cfs_spin_unlock_bh(&waiting_locks_spinlock);
#else
        class_fail_export(lock->l_export);
#endif
}

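/*
 * Handle an error from an AST RPC.  A timeout or connection failure to a
 * live (non-liblustre) client evicts it via ldlm_failed_ast(), unless the
 * cancel already arrived (AST reply lost).  Any other error is treated as
 * a race or a misbehaving client: the lock is cancelled, the LVB refreshed
 * in the -EINVAL case (bug 23174), and -ERESTART returned so that
 * ldlm_reprocess_all() restarts its reprocessing.
 */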
static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
{
        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;

        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {
                LASSERT(lock->l_export);
                if (lock->l_export->exp_libclient) {
                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"
                                   " timeout, just cancelling lock", ast_type,
                                   libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else if (lock->l_flags & LDLM_FL_CANCEL) {
                        LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "
                                   "cancel was received (AST reply lost?)",
                                   ast_type, libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else {
                        ldlm_del_waiting_lock(lock);
                        ldlm_failed_ast(lock, rc, ast_type);
                }
        } else if (rc) {
                if (rc == -EINVAL) {
                        struct ldlm_resource *res = lock->l_resource;
                        LDLM_DEBUG(lock, "client (nid %s) returned %d"
                               " from %s AST - normal race",
                               libcfs_nid2str(peer.nid),
                               req->rq_repmsg ?
                               lustre_msg_get_status(req->rq_repmsg) : -1,
                               ast_type);
                        if (res) {
                                /* update the LVB to return proper attributes.
                                 * see bug 23174 */
                                ldlm_resource_getref(res);
                                ldlm_res_lvbo_update(res, NULL, 1);
                                ldlm_resource_putref(res);
                        }

                } else {
                        LDLM_ERROR(lock, "client (nid %s) returned %d "
                                   "from %s AST", libcfs_nid2str(peer.nid),
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
                                   ast_type);
                }
                ldlm_lock_cancel(lock);
                /* Server-side AST functions are called from ldlm_reprocess_all,
                 * which needs to be told to please restart its reprocessing. */
                rc = -ERESTART;
        }

        return rc;
}

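/*
 * Interpret callback for blocking/completion AST RPCs sent through the
 * ptlrpcd set: funnels errors to ldlm_handle_ast_error(), drops the lock
 * reference taken in ldlm_bl_and_cp_ast_tail(), and wakes the waiter
 * once the number of in-flight AST RPCs falls below the set threshold.
 */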
static int ldlm_cb_interpret(const struct lu_env *env,
                             struct ptlrpc_request *req, void *data, int rc)
{
        struct ldlm_cb_async_args *ca   = data;
        struct ldlm_lock          *lock = ca->ca_lock;
        struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
        ENTRY;

        LASSERT(lock != NULL);
        if (rc != 0) {
                rc = ldlm_handle_ast_error(lock, req, rc,
                                           arg->type == LDLM_BL_CALLBACK
                                           ? "blocking" : "completion");
                if (rc == -ERESTART)
                        cfs_atomic_inc(&arg->restart);
        }
        LDLM_LOCK_RELEASE(lock);

        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
                cfs_waitq_signal(&arg->waitq);

        ldlm_csa_put(arg);
        RETURN(0);
}

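/*
 * Common tail for sending a blocking or completion AST.  For
 * cancel-on-block locks no reply is expected, so the request is sent
 * directly with the noreply flag and freed immediately; otherwise an
 * extra lock reference is taken (dropped in ldlm_cb_interpret()) and the
 * request is queued on ptlrpcd.
 */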
static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                                          struct ldlm_cb_set_arg *arg,
                                          struct ldlm_lock *lock,
                                          int instant_cancel)
{
        int rc = 0;
        ENTRY;

        if (unlikely(instant_cancel)) {
                rc = ptl_send_rpc(req, 1);
                ptlrpc_req_finished(req);
                if (rc == 0)
                        cfs_atomic_inc(&arg->restart);
        } else {
                LDLM_LOCK_GET(lock);
                cfs_atomic_inc(&arg->rpcs);
                cfs_atomic_inc(&arg->refcount);
                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
        }

        RETURN(rc);
}

/**
 * Check if there are requests in the export request list which prevent
 * the lock from being canceled, and make those requests high priority.
 */
static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        ENTRY;

        if (lock->l_export == NULL) {
                LDLM_DEBUG(lock, "client lock: no-op");
                RETURN_EXIT;
        }

        cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                rq_exp_list) {
                /* Do not process requests that have not yet been added to
                 * the incoming queue, or that have already been removed
                 * from it for processing */
                if (!req->rq_hp && !cfs_list_empty(&req->rq_list) &&
                    req->rq_ops->hpreq_lock_match &&
                    req->rq_ops->hpreq_lock_match(req, lock))
                        ptlrpc_hpreq_reorder(req);
        }
        cfs_spin_unlock_bh(&lock->l_export->exp_rpc_lock);
        EXIT;
}

/*
 * ->l_blocking_ast() method for server-side locks. This is invoked when a
 * newly enqueued server lock conflicts with the given one.
 *
 * Sends a blocking AST RPC to the client owning that lock and arms the
 * timeout timer to wait for the client's response.
 */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_cb_async_args *ca;
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request    *body;
        struct ptlrpc_request  *req;
        int                     instant_cancel = 0;
        int                     rc = 0;
        ENTRY;

        if (flag == LDLM_CB_CANCELING)
                /* Don't need to do anything here. */
                RETURN(0);

        LASSERT(lock);
        LASSERT(data != NULL);
        if (lock->l_export->exp_obd->obd_recovering != 0)
                LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");

        ldlm_lock_reorder_req(lock);

        req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
                                        &RQF_LDLM_BL_CALLBACK,
                                        LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
        if (req == NULL)
                RETURN(-ENOMEM);

        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
        ca = ptlrpc_req_async_args(req);
        ca->ca_set_arg = arg;
        ca->ca_lock = lock;

        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;

        lock_res(lock->l_resource);
        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                RETURN(0);
        }

        if (lock->l_destroyed) {
                /* What's the point? */
                unlock_res(lock->l_resource);
                ptlrpc_req_finished(req);
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
                instant_cancel = 1;

        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_desc = *desc;
        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

        LDLM_DEBUG(lock, "server preparing blocking AST");

        ptlrpc_request_set_replen(req);
        if (instant_cancel) {
                unlock_res(lock->l_resource);
                ldlm_lock_cancel(lock);
        } else {
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
                ldlm_add_waiting_lock(lock);
                unlock_res(lock->l_resource);
        }

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_alloc_pack already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_BL_CALLBACK - LDLM_FIRST_OPC);

        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);

        RETURN(rc);
}

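/*
 * Completion AST for server-side locks: tells the client its enqueue has
 * been granted.  Packs the current LVB into the request, feeds the
 * measured enqueue wait time into the adaptive-timeout estimate, and,
 * once the lock is granted, piggybacks any pending blocking AST
 * (LDLM_FL_AST_SENT) and starts the lock-timeout clock.
 */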
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request    *body;
        struct ptlrpc_request  *req;
        struct ldlm_cb_async_args *ca;
        long                    total_enqueue_wait;
        int                     instant_cancel = 0;
        int                     rc = 0;
        ENTRY;

        LASSERT(lock != NULL);
        LASSERT(data != NULL);

        total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
                                          lock->l_last_activity);

        req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
                                    &RQF_LDLM_CP_CALLBACK);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* server namespace, doesn't need lock */
        if (lock->l_resource->lr_lvb_len) {
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT,
                                     lock->l_resource->lr_lvb_len);
        }

        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
        ca = ptlrpc_req_async_args(req);
        ca->ca_set_arg = arg;
        ca->ca_lock = lock;

        req->rq_interpret_reply = ldlm_cb_interpret;
        req->rq_no_resend = 1;
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);

        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_flags = flags;
        ldlm_lock2desc(lock, &body->lock_desc);
        if (lock->l_resource->lr_lvb_len) {
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);

                lock_res(lock->l_resource);
                memcpy(lvb, lock->l_resource->lr_lvb_data,
                       lock->l_resource->lr_lvb_len);
                unlock_res(lock->l_resource);
        }

        LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
                   total_enqueue_wait);

        /* Server-side enqueue wait time estimate, used in
         * __ldlm_add_waiting_lock to set future enqueue timers */
        if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
                at_measured(ldlm_lock_to_ns_at(lock),
                            total_enqueue_wait);
        else
                /* bz18618. Don't add the lock enqueue time we spent waiting
                 * for a previous callback to fail. Locks waiting legitimately
                 * will get extended by ldlm_refresh_waiting_lock regardless
                 * of the estimate, so it's okay to underestimate here. */
                LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
                       "It is likely that a previous callback timed out.",
                       total_enqueue_wait,
                       at_get(ldlm_lock_to_ns_at(lock)));

        ptlrpc_request_set_replen(req);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_pack already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        /* We only send real blocking ASTs after the lock is granted */
        lock_res_and_lock(lock);
        if (lock->l_flags & LDLM_FL_AST_SENT) {
                body->lock_flags |= LDLM_FL_AST_SENT;
                /* copy AST flags like LDLM_FL_DISCARD_DATA */
                body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);

                /* We might get here prior to ldlm_handle_enqueue setting
                 * the LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this
                 * lock into the waiting list, but this is safe: similar code
                 * in ldlm_handle_enqueue will still call ldlm_lock_cancel(),
                 * which not only cancels the lock but also removes it from
                 * the waiting list */
                if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                        unlock_res_and_lock(lock);
                        ldlm_lock_cancel(lock);
                        instant_cancel = 1;
                        lock_res_and_lock(lock);
                } else {
                        /* start the lock-timeout clock */
                        ldlm_add_waiting_lock(lock);
                }
        }
        unlock_res_and_lock(lock);

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_CP_CALLBACK - LDLM_FIRST_OPC);

        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);

        RETURN(rc);
}

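/*
 * Glimpse AST for server-side locks: synchronously asks the client
 * holding the lock for its current LVB (e.g. file size).  On
 * -ELDLM_NO_LOCK_DATA (a legal race: the client has the lock but no
 * inode data) the LVB is refreshed from disk; on success it is updated
 * from the client's reply.
 */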
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ldlm_resource  *res = lock->l_resource;
        struct ldlm_request   *body;
        struct ptlrpc_request *req;
        int                    rc;
        ENTRY;

        LASSERT(lock != NULL);

        req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
                                        &RQF_LDLM_GL_CALLBACK,
                                        LUSTRE_DLM_VERSION, LDLM_GL_CALLBACK);

        if (req == NULL)
                RETURN(-ENOMEM);

        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        body->lock_handle[0] = lock->l_remote_handle;
        ldlm_lock2desc(lock, &body->lock_desc);

        /* server namespace, doesn't need lock */
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             lock->l_resource->lr_lvb_len);
        res = lock->l_resource;
        ptlrpc_request_set_replen(req);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_alloc_pack already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_GL_CALLBACK - LDLM_FIRST_OPC);

        rc = ptlrpc_queue_wait(req);
        /* Update the LVB from disk if the AST failed (this is a legal race)
         *
         * - Glimpse callback of a local lock just returns
         *   -ELDLM_NO_LOCK_DATA.
         * - Glimpse callback of a remote lock might return
         *   -ELDLM_NO_LOCK_DATA when the inode is cleared. LU-274
         */
        if (rc == -ELDLM_NO_LOCK_DATA) {
                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
                ldlm_res_lvbo_update(res, NULL, 1);
        } else if (rc != 0) {
                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
        } else {
                rc = ldlm_res_lvbo_update(res, req, 1);
        }

        ptlrpc_req_finished(req);
        if (rc == -ERESTART)
                ldlm_reprocess_all(res);

        RETURN(rc);
}

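/*
 * Bump the per-server enqueue statistics counter matching the lock type
 * of an incoming enqueue request (plain, extent, glimpse, flock or
 * ibits).
 */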
static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
                              struct lprocfs_stats *srv_stats)
{
        int lock_type = 0, op = 0;

        lock_type = dlm_req->lock_desc.l_resource.lr_type;

        switch (lock_type) {
        case LDLM_PLAIN:
                op = PTLRPC_LAST_CNTR + LDLM_PLAIN_ENQUEUE;
                break;
        case LDLM_EXTENT:
                if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT)
                        op = PTLRPC_LAST_CNTR + LDLM_GLIMPSE_ENQUEUE;
                else
                        op = PTLRPC_LAST_CNTR + LDLM_EXTENT_ENQUEUE;
                break;
        case LDLM_FLOCK:
                op = PTLRPC_LAST_CNTR + LDLM_FLOCK_ENQUEUE;
                break;
        case LDLM_IBITS:
                op = PTLRPC_LAST_CNTR + LDLM_IBITS_ENQUEUE;
                break;
        default:
                op = 0;
                break;
        }

        if (op)
                lprocfs_counter_incr(srv_stats, op);

        return;
}

/*
 * Main server-side entry point into LDLM. This is called by ptlrpc service
 * threads to carry out client lock enqueue requests.
 */
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                         struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req,
                         const struct ldlm_callback_suite *cbs)
{
        struct ldlm_reply *dlm_rep;
        __u32 flags;
        ldlm_error_t err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        int rc = 0;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
        flags = dlm_req->lock_flags;

        LASSERT(req->rq_export);

        if (req->rq_rqbd->rqbd_service->srv_stats)
                ldlm_svc_get_eopc(dlm_req,
                                  req->rq_rqbd->rqbd_service->srv_stats);

        if (req->rq_export && req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);

        if (unlikely(dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
                     dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE)) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
                          dlm_req->lock_desc.l_resource.lr_type);
                GOTO(out, rc = -EFAULT);
        }

        if (unlikely(dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
                     dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
                     dlm_req->lock_desc.l_req_mode &
                     (dlm_req->lock_desc.l_req_mode-1))) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
                          dlm_req->lock_desc.l_req_mode);
                GOTO(out, rc = -EFAULT);
        }

        if (req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) {
                if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                             LDLM_PLAIN)) {
                        DEBUG_REQ(D_ERROR, req,
                                  "PLAIN lock request from IBITS client?");
                        GOTO(out, rc = -EPROTO);
                }
        } else if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
                            LDLM_IBITS)) {
                DEBUG_REQ(D_ERROR, req,
                          "IBITS lock request from unaware client?");
                GOTO(out, rc = -EPROTO);
        }

#if 0
        /* FIXME this makes it impossible to use LDLM_PLAIN locks -- check
           against server's _CONNECT_SUPPORTED flags? (I don't want to use
           ibits for mgc/mgs) */

        /* INODEBITS_INTEROP: Perform conversion from plain lock to
         * inodebits lock if client does not support them. */
        if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS) &&
            (dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN)) {
                dlm_req->lock_desc.l_resource.lr_type = LDLM_IBITS;
                dlm_req->lock_desc.l_policy_data.l_inodebits.bits =
                        MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                if (dlm_req->lock_desc.l_req_mode == LCK_PR)
                        dlm_req->lock_desc.l_req_mode = LCK_CR;
        }
#endif

        if (unlikely(flags & LDLM_FL_REPLAY)) {
                /* Find an existing lock in the per-export lock hash */
                lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
                                       (void *)&dlm_req->lock_handle[0]);
                if (lock != NULL) {
                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                  LPX64, lock->l_handle.h_cookie);
                        GOTO(existing_lock, rc = 0);
                }
        }

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                cbs, NULL, 0);

        if (!lock)
                GOTO(out, rc = -ENOMEM);

        lock->l_last_activity = cfs_time_current_sec();
        lock->l_remote_handle = dlm_req->lock_handle[0];
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        /* Don't enqueue a lock onto the export if it has already been
         * disconnected due to eviction (bug 3822) or server umount
         * (bug 24324). Cancel it now instead. */
        if (req->rq_export->exp_disconnected) {
                LDLM_ERROR(lock, "lock on disconnected export %p",
                           req->rq_export);
                GOTO(out, rc = -ENOTCONN);
        }

        lock->l_export = class_export_lock_get(req->rq_export, lock);
        if (lock->l_export->exp_lock_hash)
                cfs_hash_add(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle,
                             &lock->l_exp_hash);

existing_lock:

        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
        } else {
                /* This is based on the assumption that the LVB size never
                 * changes during the resource's lifetime; otherwise it
                 * would need resource->lr_lock's protection */
                if (lock->l_resource->lr_lvb_len) {
                        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                             RCL_SERVER,
                                             lock->l_resource->lr_lvb_len);
                }

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);

                rc = req_capsule_server_pack(&req->rq_pill);
                if (rc)
                        GOTO(out, rc);
        }

        if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
                ldlm_convert_policy_to_local(req->rq_export,
                                          dlm_req->lock_desc.l_resource.lr_type,
                                          &dlm_req->lock_desc.l_policy_data,
                                          &lock->l_policy_data);
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;

        err = ldlm_lock_enqueue(ns, &lock, cookie, (int *)&flags);
        if (err)
                GOTO(out, err);

        dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        dlm_rep->lock_flags = flags;

        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);

        /* We never send a blocking AST until the lock is granted, but
         * we can tell it right now */
        lock_res_and_lock(lock);

        /* Now take into account flags to be inherited from the original lock
           request both in the reply to the client and in our own lock
           flags. */
        dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
        lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;

        /* Don't move a pending lock onto the export if it has already been
         * disconnected due to eviction (bug 5683) or server umount (bug 24324).
         * Cancel it now instead. */
        if (unlikely(req->rq_export->exp_disconnected ||
                     OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                rc = -ENOTCONN;
        } else if (lock->l_flags & LDLM_FL_AST_SENT) {
                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;
                if (lock->l_granted_mode == lock->l_req_mode) {
                        /*
                         * Only cancel the lock if it was granted, because it
                         * would be destroyed immediately and would never be
                         * granted in the future, causing timeouts on the
                         * client.  A lock that was not granted will be
                         * cancelled immediately after sending the completion
                         * AST.
                         */
                        if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
                                unlock_res_and_lock(lock);
                                ldlm_lock_cancel(lock);
                                lock_res_and_lock(lock);
                        } else
                                ldlm_add_waiting_lock(lock);
                }
        }
        /* Make sure we never ever grant usual metadata locks to liblustre
           clients */
        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
             req->rq_export->exp_libclient) {
                if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
                             !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
                        CERROR("Granting sync lock to libclient. "
                               "req fl %d, rep fl %d, lock fl "LPX64"\n",
                               dlm_req->lock_flags, dlm_rep->lock_flags,
                               lock->l_flags);
                        LDLM_ERROR(lock, "sync lock");
                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
                                struct ldlm_intent *it;

                                it = req_capsule_client_get(&req->rq_pill,
                                                            &RMF_LDLM_INTENT);
                                if (it != NULL) {
                                        CERROR("This is intent %s ("LPU64")\n",
                                               ldlm_it2str(it->opc), it->opc);
                                }
                        }
                }
        }

        unlock_res_and_lock(lock);

        EXIT;
 out:
        req->rq_status = rc ?: err; /* return either error - bug 11190 */
        if (!req->rq_packed_final) {
                err = lustre_pack_reply(req, 1, NULL, NULL);
                if (rc == 0)
                        rc = err;
        }

        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
        if (lock) {
                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply "
                           "(err=%d, rc=%d)", err, rc);

                if (rc == 0) {
                        if (lock->l_resource->lr_lvb_len > 0) {
                                /* MDT path won't handle lr_lvb_data, so
                                 * lock/unlock better be contained in the
                                 * if block */
                                void *lvb;

                                lvb = req_capsule_server_get(&req->rq_pill,
                                                             &RMF_DLM_LVB);
                                LASSERTF(lvb != NULL, "req %p, lock %p\n",
                                         req, lock);
                                lock_res(lock->l_resource);
                                memcpy(lvb, lock->l_resource->lr_lvb_data,
                                       lock->l_resource->lr_lvb_len);
                                unlock_res(lock->l_resource);
                        }
                } else {
                        lock_res_and_lock(lock);
                        ldlm_resource_unlink_lock(lock);
                        ldlm_lock_destroy_nolock(lock);
                        unlock_res_and_lock(lock);
                }

                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
                        ldlm_reprocess_all(lock->l_resource);

                LDLM_LOCK_RELEASE(lock);
        }

        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
                          lock, rc);

        return rc;
}

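/*
 * Old-style wrapper for ldlm_handle_enqueue0(): bundles the three callback
 * pointers into a struct ldlm_callback_suite and resolves the namespace
 * from the request's export.
 */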
1318 int ldlm_handle_enqueue(struct ptlrpc_request *req,
1319                         ldlm_completion_callback completion_callback,
1320                         ldlm_blocking_callback blocking_callback,
1321                         ldlm_glimpse_callback glimpse_callback)
1322 {
1323         struct ldlm_request *dlm_req;
1324         struct ldlm_callback_suite cbs = {
1325                 .lcs_completion = completion_callback,
1326                 .lcs_blocking   = blocking_callback,
1327                 .lcs_glimpse    = glimpse_callback
1328         };
1329         int rc;
1330
1331         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1332         if (dlm_req != NULL) {
1333                 rc = ldlm_handle_enqueue0(req->rq_export->exp_obd->obd_namespace,
1334                                           req, dlm_req, &cbs);
1335         } else {
1336                 rc = -EFAULT;
1337         }
1338         return rc;
1339 }
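
/*
 * Usage sketch (illustrative only, not part of this file): a server-side
 * request handler would typically forward an LDLM_ENQUEUE RPC here along
 * with its own AST callbacks, e.g. the server-side ASTs defined earlier
 * in this file:
 *
 *      rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
 *                               ldlm_server_blocking_ast,
 *                               ldlm_server_glimpse_ast);
 *
 * The three callbacks are packed into a struct ldlm_callback_suite and
 * passed down to ldlm_handle_enqueue0() above.
 */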
1340
1341 int ldlm_handle_convert0(struct ptlrpc_request *req,
1342                          const struct ldlm_request *dlm_req)
1343 {
1344         struct ldlm_reply *dlm_rep;
1345         struct ldlm_lock *lock;
1346         int rc;
1347         ENTRY;
1348
1349         if (req->rq_export && req->rq_export->exp_nid_stats &&
1350             req->rq_export->exp_nid_stats->nid_ldlm_stats)
1351                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1352                                      LDLM_CONVERT - LDLM_FIRST_OPC);
1353
1354         rc = req_capsule_server_pack(&req->rq_pill);
1355         if (rc)
1356                 RETURN(rc);
1357
1358         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1359         dlm_rep->lock_flags = dlm_req->lock_flags;
1360
1361         lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
1362         if (!lock) {
1363                 req->rq_status = EINVAL;
1364         } else {
1365                 void *res = NULL;
1366
1367                 LDLM_DEBUG(lock, "server-side convert handler START");
1368
1369                 lock->l_last_activity = cfs_time_current_sec();
1370                 res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
1371                                         &dlm_rep->lock_flags);
1372                 if (res) {
1373                         if (ldlm_del_waiting_lock(lock))
1374                                 LDLM_DEBUG(lock, "converted waiting lock");
1375                         req->rq_status = 0;
1376                 } else {
1377                         req->rq_status = EDEADLOCK;
1378                 }
1379         }
1380
1381         if (lock) {
1382                 if (!req->rq_status)
1383                         ldlm_reprocess_all(lock->l_resource);
1384                 LDLM_DEBUG(lock, "server-side convert handler END");
1385                 LDLM_LOCK_PUT(lock);
1386         } else
1387                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
1388
1389         RETURN(0);
1390 }
1391
1392 int ldlm_handle_convert(struct ptlrpc_request *req)
1393 {
1394         int rc;
1395         struct ldlm_request *dlm_req;
1396
1397         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1398         if (dlm_req != NULL) {
1399                 rc = ldlm_handle_convert0(req, dlm_req);
1400         } else {
1401                 CERROR("Can't unpack dlm_req\n");
1402                 rc = -EFAULT;
1403         }
1404         return rc;
1405 }
1406
1407 /* Cancel all the locks whose handles are packed into the ldlm_request */
1408 int ldlm_request_cancel(struct ptlrpc_request *req,
1409                         const struct ldlm_request *dlm_req, int first)
1410 {
1411         struct ldlm_resource *res, *pres = NULL;
1412         struct ldlm_lock *lock;
1413         int i, count, done = 0;
1414         ENTRY;
1415
1416         count = dlm_req->lock_count ? dlm_req->lock_count : 1;
1417         if (first >= count)
1418                 RETURN(0);
1419
1420         /* There are no locks on the server at replay time, so skip
1421          * lock cancelling to let the replay tests pass. */
1422         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1423                 RETURN(0);
1424
1425         LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, "
1426                           "starting at %d", count, first);
1427
1428         for (i = first; i < count; i++) {
1429                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
1430                 if (!lock) {
1431                         LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
1432                                           "lock (cookie "LPU64")",
1433                                           dlm_req->lock_handle[i].cookie);
1434                         continue;
1435                 }
1436
1437                 res = lock->l_resource;
1438                 done++;
1439
1440                 if (res != pres) {
1441                         if (pres != NULL) {
1442                                 ldlm_reprocess_all(pres);
1443                                 LDLM_RESOURCE_DELREF(pres);
1444                                 ldlm_resource_putref(pres);
1445                         }
1446                         if (res != NULL) {
1447                                 ldlm_resource_getref(res);
1448                                 LDLM_RESOURCE_ADDREF(res);
1449                                 ldlm_res_lvbo_update(res, NULL, 1);
1450                         }
1451                         pres = res;
1452                 }
1453                 ldlm_lock_cancel(lock);
1454                 LDLM_LOCK_PUT(lock);
1455         }
1456         if (pres != NULL) {
1457                 ldlm_reprocess_all(pres);
1458                 LDLM_RESOURCE_DELREF(pres);
1459                 ldlm_resource_putref(pres);
1460         }
1461         LDLM_DEBUG_NOLOCK("server-side cancel handler END");
1462         RETURN(done);
1463 }
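
/*
 * Illustrative note (hedged): the cancel handler below cancels every handle
 * packed into the request, starting at slot 0.  An enqueue RPC that
 * piggy-backs early-lock-cancel handles would instead skip the slot holding
 * the handle being enqueued, along the lines of:
 *
 *      ldlm_request_cancel(req, dlm_req, 1);
 */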
1464
1465 int ldlm_handle_cancel(struct ptlrpc_request *req)
1466 {
1467         struct ldlm_request *dlm_req;
1468         int rc;
1469         ENTRY;
1470
1471         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1472         if (dlm_req == NULL) {
1473                 CDEBUG(D_INFO, "bad request buffer for cancel\n");
1474                 RETURN(-EFAULT);
1475         }
1476
1477         if (req->rq_export && req->rq_export->exp_nid_stats &&
1478             req->rq_export->exp_nid_stats->nid_ldlm_stats)
1479                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1480                                      LDLM_CANCEL - LDLM_FIRST_OPC);
1481
1482         rc = req_capsule_server_pack(&req->rq_pill);
1483         if (rc)
1484                 RETURN(rc);
1485
1486         if (!ldlm_request_cancel(req, dlm_req, 0))
1487                 req->rq_status = ESTALE;
1488
1489         RETURN(ptlrpc_reply(req));
1490 }
1491 #endif /* HAVE_SERVER_SUPPORT */
1492
1493 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
1494                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
1495 {
1496         int do_ast;
1497         ENTRY;
1498
1499         LDLM_DEBUG(lock, "client blocking AST callback handler");
1500
1501         lock_res_and_lock(lock);
1502         lock->l_flags |= LDLM_FL_CBPENDING;
1503
1504         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)
1505                 lock->l_flags |= LDLM_FL_CANCEL;
1506
1507         do_ast = (!lock->l_readers && !lock->l_writers);
1508         unlock_res_and_lock(lock);
1509
1510         if (do_ast) {
1511                 CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
1512                        lock, lock->l_blocking_ast);
1513                 if (lock->l_blocking_ast != NULL)
1514                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
1515                                              LDLM_CB_BLOCKING);
1516         } else {
1517                 CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
1518                        lock);
1519         }
1520
1521         LDLM_DEBUG(lock, "client blocking callback handler END");
1522         LDLM_LOCK_RELEASE(lock);
1523         EXIT;
1524 }
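
/*
 * Hedged note: when the lock is still in use (l_readers/l_writers != 0),
 * the handler above only sets LDLM_FL_CBPENDING; the actual cancel then
 * happens later, when the last user drops its reference via
 * ldlm_lock_decref().
 */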
1525
1526 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
1527                                     struct ldlm_namespace *ns,
1528                                     struct ldlm_request *dlm_req,
1529                                     struct ldlm_lock *lock)
1530 {
1531         CFS_LIST_HEAD(ast_list);
1532         ENTRY;
1533
1534         LDLM_DEBUG(lock, "client completion callback handler START");
1535
1536         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
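                /* fail-injection: stall here so a racing cancel can arrive
                 * before this CP AST is processed; see the matching
                 * OBD_FAIL_LDLM_CANCEL_BL_CB_RACE check in
                 * ldlm_callback_handler() */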
1537                 int to = cfs_time_seconds(1);
1538                 while (to > 0) {
1539                         cfs_schedule_timeout_and_set_state(
1540                                 CFS_TASK_INTERRUPTIBLE, to);
1541                         if (lock->l_granted_mode == lock->l_req_mode ||
1542                             lock->l_destroyed)
1543                                 break;
1544                 }
1545         }
1546
1547         lock_res_and_lock(lock);
1548         if (lock->l_destroyed ||
1549             lock->l_granted_mode == lock->l_req_mode) {
1550                 /* bug 11300: the lock has already been granted */
1551                 unlock_res_and_lock(lock);
1552                 LDLM_DEBUG(lock, "Double grant race happened");
1553                 LDLM_LOCK_RELEASE(lock);
1554                 EXIT;
1555                 return;
1556         }
1557
1558         /* If we receive the completion AST before the actual enqueue returned,
1559          * then we might need to switch lock modes, resources, or extents. */
1560         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
1561                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
1562                 LDLM_DEBUG(lock, "completion AST, new lock mode");
1563         }
1564
1565         if (lock->l_resource->lr_type != LDLM_PLAIN) {
1566                 ldlm_convert_policy_to_local(req->rq_export,
1567                                           dlm_req->lock_desc.l_resource.lr_type,
1568                                           &dlm_req->lock_desc.l_policy_data,
1569                                           &lock->l_policy_data);
1570                 LDLM_DEBUG(lock, "completion AST, new policy data");
1571         }
1572
1573         ldlm_resource_unlink_lock(lock);
1574         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
1575                    &lock->l_resource->lr_name,
1576                    sizeof(lock->l_resource->lr_name)) != 0) {
1577                 unlock_res_and_lock(lock);
1578                 if (ldlm_lock_change_resource(ns, lock,
1579                                 &dlm_req->lock_desc.l_resource.lr_name) != 0) {
1580                         LDLM_ERROR(lock, "Failed to allocate resource");
1581                         LDLM_LOCK_RELEASE(lock);
1582                         EXIT;
1583                         return;
1584                 }
1585                 LDLM_DEBUG(lock, "completion AST, new resource");
1586                 CERROR("change resource!\n");
1587                 lock_res_and_lock(lock);
1588         }
1589
1590         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
1591                 /* BL_AST locks are not needed in the LRU;
1592                  * removing them lets ldlm_cancel_lru() stay fast. */
1593                 ldlm_lock_remove_from_lru(lock);
1594                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
1595                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
1596         }
1597
1598         if (lock->l_lvb_len) {
1599                 if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
1600                                          RCL_CLIENT) < lock->l_lvb_len) {
1601                         LDLM_ERROR(lock, "completion AST did not contain "
1602                                    "expected LVB!");
1603                 } else {
1604                         void *lvb = req_capsule_client_get(&req->rq_pill,
1605                                                            &RMF_DLM_LVB);
1606                         memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
1607                 }
1608         }
1609
1610         ldlm_grant_lock(lock, &ast_list);
1611         unlock_res_and_lock(lock);
1612
1613         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
1614
1615         /* Let the enqueue path call osc_lock_upcall() and initialize
1616          * l_ast_data first. */
1617         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
1618
1619         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
1620
1621         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
1622                           lock);
1623         LDLM_LOCK_RELEASE(lock);
1624         EXIT;
1625 }
1626
1627 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
1628                                     struct ldlm_namespace *ns,
1629                                     struct ldlm_request *dlm_req,
1630                                     struct ldlm_lock *lock)
1631 {
1632         int rc = -ENOSYS;
1633         ENTRY;
1634
1635         LDLM_DEBUG(lock, "client glimpse AST callback handler");
1636
1637         if (lock->l_glimpse_ast != NULL)
1638                 rc = lock->l_glimpse_ast(lock, req);
1639
1640         if (req->rq_repmsg != NULL) {
1641                 ptlrpc_reply(req);
1642         } else {
1643                 req->rq_status = rc;
1644                 ptlrpc_error(req);
1645         }
1646
1647         lock_res_and_lock(lock);
1648         if (lock->l_granted_mode == LCK_PW &&
1649             !lock->l_readers && !lock->l_writers &&
1650             cfs_time_after(cfs_time_current(),
1651                            cfs_time_add(lock->l_last_used,
1652                                         cfs_time_seconds(10)))) {
1653                 unlock_res_and_lock(lock);
1654                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
1655                         ldlm_handle_bl_callback(ns, NULL, lock);
1656
1657                 EXIT;
1658                 return;
1659         }
1660         unlock_res_and_lock(lock);
1661         LDLM_LOCK_RELEASE(lock);
1662         EXIT;
1663 }
1664
1665 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
1666 {
1667         if (req->rq_no_reply)
1668                 return 0;
1669
1670         req->rq_status = rc;
1671         if (!req->rq_packed_final) {
1672                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1673                 if (rc)
1674                         return rc;
1675         }
1676         return ptlrpc_reply(req);
1677 }
1678
1679 #ifdef __KERNEL__
1680 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, int mode)
1681 {
1682         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1683         ENTRY;
1684
1685         cfs_spin_lock(&blp->blp_lock);
1686         if (blwi->blwi_lock && blwi->blwi_lock->l_flags & LDLM_FL_DISCARD_DATA) {
1687                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
1688                 cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
1689         } else {
1690                 /* other blocking callbacks are added to the regular list */
1691                 cfs_list_add_tail(&blwi->blwi_entry, &blp->blp_list);
1692         }
1693         cfs_spin_unlock(&blp->blp_lock);
1694
1695         cfs_waitq_signal(&blp->blp_waitq);
1696
1697         /* cannot use blwi->blwi_mode here, as blwi may already have been
1698            freed in LDLM_ASYNC mode */
1699         if (mode == LDLM_SYNC)
1700                 cfs_wait_for_completion(&blwi->blwi_comp);
1701
1702         RETURN(0);
1703 }
1704
1705 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
1706                              struct ldlm_namespace *ns,
1707                              struct ldlm_lock_desc *ld,
1708                              cfs_list_t *cancels, int count,
1709                              struct ldlm_lock *lock,
1710                              int mode)
1711 {
1712         cfs_init_completion(&blwi->blwi_comp);
1713         CFS_INIT_LIST_HEAD(&blwi->blwi_head);
1714
1715         if (cfs_memory_pressure_get())
1716                 blwi->blwi_mem_pressure = 1;
1717
1718         blwi->blwi_ns = ns;
1719         blwi->blwi_mode = mode;
1720         if (ld != NULL)
1721                 blwi->blwi_ld = *ld;
1722         if (count) {
1723                 cfs_list_add(&blwi->blwi_head, cancels);
1724                 cfs_list_del_init(cancels);
1725                 blwi->blwi_count = count;
1726         } else {
1727                 blwi->blwi_lock = lock;
1728         }
1729 }
1730
1731 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
1732                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock,
1733                              cfs_list_t *cancels, int count, int mode)
1734 {
1735         ENTRY;
1736
1737         if (cancels && count == 0)
1738                 RETURN(0);
1739
1740         if (mode == LDLM_SYNC) {
1741                 /* for a synchronous call, do the minimum memory allocation,
1742                  * as it could be triggered from the kernel shrinker
1743                  */
1744                 struct ldlm_bl_work_item blwi;
1745                 memset(&blwi, 0, sizeof(blwi));
1746                 init_blwi(&blwi, ns, ld, cancels, count, lock, LDLM_SYNC);
1747                 RETURN(__ldlm_bl_to_thread(&blwi, LDLM_SYNC));
1748         } else {
1749                 struct ldlm_bl_work_item *blwi;
1750                 OBD_ALLOC(blwi, sizeof(*blwi));
1751                 if (blwi == NULL)
1752                         RETURN(-ENOMEM);
1753                 init_blwi(blwi, ns, ld, cancels, count, lock, LDLM_ASYNC);
1754
1755                 RETURN(__ldlm_bl_to_thread(blwi, LDLM_ASYNC));
1756         }
1757 }
1758
1759 #endif
1760
1761 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
1762                            struct ldlm_lock *lock)
1763 {
1764 #ifdef __KERNEL__
1765         RETURN(ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LDLM_ASYNC));
1766 #else
1767         RETURN(-ENOSYS);
1768 #endif
1769 }
1770
1771 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
1772                            cfs_list_t *cancels, int count, int mode)
1773 {
1774 #ifdef __KERNEL__
1775         RETURN(ldlm_bl_to_thread(ns, ld, NULL, cancels, count, mode));
1776 #else
1777         RETURN(-ENOSYS);
1778 #endif
1779 }
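
/*
 * Usage sketch (illustrative): the common blocking-AST path queues work
 * asynchronously, while a caller in a constrained context, such as a
 * memory shrinker cancelling a list of LRU locks, waits for the work
 * synchronously on its own stack:
 *
 *      ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock);
 *      ldlm_bl_to_thread_list(ns, NULL, &cancels, count, LDLM_SYNC);
 */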
1780
1781 /* Setinfo coming from a server (e.g. MDT) to a client (e.g. MDC). */
1782 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
1783 {
1784         struct obd_device *obd = req->rq_export->exp_obd;
1785         char *key;
1786         void *val;
1787         int keylen, vallen;
1788         int rc = -ENOSYS;
1789         ENTRY;
1790
1791         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
1792
1793         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
1794
1795         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1796         if (key == NULL) {
1797                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
1798                 RETURN(-EFAULT);
1799         }
1800         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1801                                       RCL_CLIENT);
1802         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
1803         if (val == NULL) {
1804                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
1805                 RETURN(-EFAULT);
1806         }
1807         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1808                                       RCL_CLIENT);
1809
1810         /* We are responsible for swabbing contents of val */
1811
1812         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
1813                 /* Pass it on to mdc (the "export" in this case) */
1814                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
1815                                         req->rq_export,
1816                                         sizeof(KEY_HSM_COPYTOOL_SEND),
1817                                         KEY_HSM_COPYTOOL_SEND,
1818                                         vallen, val, NULL);
1819         else
1820                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
1821
1822         return rc;
1823 }
1824
1825 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
1826                                         const char *msg, int rc,
1827                                         struct lustre_handle *handle)
1828 {
1829         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
1830                   "%s: [nid %s] [rc %d] [lock "LPX64"]",
1831                   msg, libcfs_id2str(req->rq_peer), rc,
1832                   handle ? handle->cookie : 0);
1833         if (req->rq_no_reply)
1834                 CWARN("No reply was sent, possibly the cause of bug 21636.\n");
1835         else if (rc)
1836                 CWARN("Sending the reply failed, possibly the cause of bug 21636.\n");
1837 }
1838
1839 /* TODO: handle requests in a way similar to the MDT: see mdt_handle_common() */
1840 static int ldlm_callback_handler(struct ptlrpc_request *req)
1841 {
1842         struct ldlm_namespace *ns;
1843         struct ldlm_request *dlm_req;
1844         struct ldlm_lock *lock;
1845         int rc;
1846         ENTRY;
1847
1848         /* Requests arrive in sender's byte order.  The ptlrpc service
1849          * handler has already checked and, if necessary, byte-swapped the
1850          * incoming request message body, but I am responsible for the
1851          * message buffers. */
1852
1853         /* do nothing for sec context finalize */
1854         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
1855                 RETURN(0);
1856
1857         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1858
1859         if (req->rq_export == NULL) {
1860                 rc = ldlm_callback_reply(req, -ENOTCONN);
1861                 ldlm_callback_errmsg(req, "Operate on unconnected server",
1862                                      rc, NULL);
1863                 RETURN(0);
1864         }
1865
1866         LASSERT(req->rq_export != NULL);
1867         LASSERT(req->rq_export->exp_obd != NULL);
1868
1869         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1870         case LDLM_BL_CALLBACK:
1871                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK))
1872                         RETURN(0);
1873                 break;
1874         case LDLM_CP_CALLBACK:
1875                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK))
1876                         RETURN(0);
1877                 break;
1878         case LDLM_GL_CALLBACK:
1879                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK))
1880                         RETURN(0);
1881                 break;
1882         case LDLM_SET_INFO:
1883                 rc = ldlm_handle_setinfo(req);
1884                 ldlm_callback_reply(req, rc);
1885                 RETURN(0);
1886         case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
1887                 CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
1888                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
1889                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
1890                         RETURN(0);
1891                 rc = llog_origin_handle_cancel(req);
1892                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
1893                         RETURN(0);
1894                 ldlm_callback_reply(req, rc);
1895                 RETURN(0);
1896         case OBD_QC_CALLBACK:
1897                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
1898                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
1899                         RETURN(0);
1900                 rc = target_handle_qc_callback(req);
1901                 ldlm_callback_reply(req, rc);
1902                 RETURN(0);
1903         case QUOTA_DQACQ:
1904         case QUOTA_DQREL:
1905                 /* reply in handler */
1906                 req_capsule_set(&req->rq_pill, &RQF_MDS_QUOTA_DQACQ);
1907                 rc = target_handle_dqacq_callback(req);
1908                 RETURN(0);
1909         case LLOG_ORIGIN_HANDLE_CREATE:
1910                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
1911                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1912                         RETURN(0);
1913                 rc = llog_origin_handle_create(req);
1914                 ldlm_callback_reply(req, rc);
1915                 RETURN(0);
1916         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1917                 req_capsule_set(&req->rq_pill,
1918                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
1919                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1920                         RETURN(0);
1921                 rc = llog_origin_handle_next_block(req);
1922                 ldlm_callback_reply(req, rc);
1923                 RETURN(0);
1924         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1925                 req_capsule_set(&req->rq_pill,
1926                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
1927                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1928                         RETURN(0);
1929                 rc = llog_origin_handle_read_header(req);
1930                 ldlm_callback_reply(req, rc);
1931                 RETURN(0);
1932         case LLOG_ORIGIN_HANDLE_CLOSE:
1933                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1934                         RETURN(0);
1935                 rc = llog_origin_handle_close(req);
1936                 ldlm_callback_reply(req, rc);
1937                 RETURN(0);
1938         default:
1939                 CERROR("unknown opcode %u\n",
1940                        lustre_msg_get_opc(req->rq_reqmsg));
1941                 ldlm_callback_reply(req, -EPROTO);
1942                 RETURN(0);
1943         }
1944
1945         ns = req->rq_export->exp_obd->obd_namespace;
1946         LASSERT(ns != NULL);
1947
1948         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
1949
1950         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1951         if (dlm_req == NULL) {
1952                 rc = ldlm_callback_reply(req, -EPROTO);
1953                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
1954                                      NULL);
1955                 RETURN(0);
1956         }
1957
1958         /* Force a known safe race, send a cancel to the server for a lock
1959          * which the server has already started a blocking callback on. */
1960         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
1961             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
1962                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0]);
1963                 if (rc < 0)
1964                         CERROR("ldlm_cli_cancel: %d\n", rc);
1965         }
1966
1967         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
1968         if (!lock) {
1969                 CDEBUG(D_DLMTRACE, "callback on lock "LPX64" - lock "
1970                        "disappeared\n", dlm_req->lock_handle[0].cookie);
1971                 rc = ldlm_callback_reply(req, -EINVAL);
1972                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
1973                                      &dlm_req->lock_handle[0]);
1974                 RETURN(0);
1975         }
1976
1977         if ((lock->l_flags & LDLM_FL_FAIL_LOC) &&
1978             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
1979                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
1980
1981         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
1982         lock_res_and_lock(lock);
1983         lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
1984         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
1985                 /* If somebody cancels the lock and the cache is already
1986                  * dropped, or the lock failed before the cp_ast was received
1987                  * on the client, we can tell the server we have no lock.
1988                  * Otherwise, send the cancel after dropping the cache. */
1989                 if (((lock->l_flags & LDLM_FL_CANCELING) &&
1990                     (lock->l_flags & LDLM_FL_BL_DONE)) ||
1991                     (lock->l_flags & LDLM_FL_FAILED)) {
1992                         LDLM_DEBUG(lock, "callback on lock "
1993                                    LPX64" - lock disappeared\n",
1994                                    dlm_req->lock_handle[0].cookie);
1995                         unlock_res_and_lock(lock);
1996                         LDLM_LOCK_RELEASE(lock);
1997                         rc = ldlm_callback_reply(req, -EINVAL);
1998                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
1999                                              &dlm_req->lock_handle[0]);
2000                         RETURN(0);
2001                 }
2002                 /* BL_AST locks are not needed in the LRU;
2003                  * removing them lets ldlm_cancel_lru() stay fast. */
2004                 ldlm_lock_remove_from_lru(lock);
2005                 lock->l_flags |= LDLM_FL_BL_AST;
2006         }
2007         unlock_res_and_lock(lock);
2008
2009         /* We want the ost thread to get this reply so that it can respond
2010          * to ost requests (write cache writeback) that might be triggered
2011          * in the callback.
2012          *
2013          * But we'd also like to be able to indicate in the reply that we're
2014          * cancelling right now, because it's unused, or have an intent result
2015          * in the reply, so we might have to push the responsibility for sending
2016          * the reply down into the AST handlers, alas. */
2017
2018         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2019         case LDLM_BL_CALLBACK:
2020                 CDEBUG(D_INODE, "blocking ast\n");
2021                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
2022                 if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)) {
2023                         rc = ldlm_callback_reply(req, 0);
2024                         if (req->rq_no_reply || rc)
2025                                 ldlm_callback_errmsg(req, "Normal process", rc,
2026                                                      &dlm_req->lock_handle[0]);
2027                 }
2028                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
2029                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
2030                 break;
2031         case LDLM_CP_CALLBACK:
2032                 CDEBUG(D_INODE, "completion ast\n");
2033                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
2034                 ldlm_callback_reply(req, 0);
2035                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
2036                 break;
2037         case LDLM_GL_CALLBACK:
2038                 CDEBUG(D_INODE, "glimpse ast\n");
2039                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
2040                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
2041                 break;
2042         default:
2043                 LBUG();                         /* checked above */
2044         }
2045
2046         RETURN(0);
2047 }
2048
2049 #ifdef HAVE_SERVER_SUPPORT
2050 static int ldlm_cancel_handler(struct ptlrpc_request *req)
2051 {
2052         int rc;
2053         ENTRY;
2054
2055         /* Requests arrive in sender's byte order.  The ptlrpc service
2056          * handler has already checked and, if necessary, byte-swapped the
2057          * incoming request message body, but I am responsible for the
2058          * message buffers. */
2059
2060         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2061
2062         if (req->rq_export == NULL) {
2063                 struct ldlm_request *dlm_req;
2064
2065                 CERROR("%s from %s arrived at %lu with bad export cookie "
2066                        LPU64"\n",
2067                        ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
2068                        libcfs_nid2str(req->rq_peer.nid),
2069                        req->rq_arrival_time.tv_sec,
2070                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
2071
2072                 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
2073                         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2074                         dlm_req = req_capsule_client_get(&req->rq_pill,
2075                                                          &RMF_DLM_REQ);
2076                         if (dlm_req != NULL)
2077                                 ldlm_lock_dump_handle(D_ERROR,
2078                                                       &dlm_req->lock_handle[0]);
2079                 }
2080                 ldlm_callback_reply(req, -ENOTCONN);
2081                 RETURN(0);
2082         }
2083
2084         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2085
2086         /* XXX FIXME move this back to mds/handler.c, bug 249 */
2087         case LDLM_CANCEL:
2088                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2089                 CDEBUG(D_INODE, "cancel\n");
2090                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
2091                         RETURN(0);
2092                 rc = ldlm_handle_cancel(req);
2093                 if (rc)
2094                         break;
2095                 RETURN(0);
2096         case OBD_LOG_CANCEL:
2097                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2098                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2099                         RETURN(0);
2100                 rc = llog_origin_handle_cancel(req);
2101                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2102                         RETURN(0);
2103                 ldlm_callback_reply(req, rc);
2104                 RETURN(0);
2105         default:
2106                 CERROR("invalid opcode %d\n",
2107                        lustre_msg_get_opc(req->rq_reqmsg));
2108                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2109                 ldlm_callback_reply(req, -EINVAL);
2110         }
2111
2112         RETURN(0);
2113 }
2114
2115 static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
2116                                         struct ldlm_lock *lock)
2117 {
2118         struct ldlm_request *dlm_req;
2119         struct lustre_handle lockh;
2120         int rc = 0;
2121         int i;
2122         ENTRY;
2123
2124         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2125         if (dlm_req == NULL)
2126                 RETURN(0);
2127
2128         ldlm_lock2handle(lock, &lockh);
2129         for (i = 0; i < dlm_req->lock_count; i++) {
2130                 if (lustre_handle_equal(&dlm_req->lock_handle[i],
2131                                         &lockh)) {
2132                         DEBUG_REQ(D_RPCTRACE, req,
2133                                   "Prio raised by lock "LPX64".", lockh.cookie);
2134
2135                         rc = 1;
2136                         break;
2137                 }
2138         }
2139
2140         RETURN(rc);
2142 }
2143
2144 static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
2145 {
2146         struct ldlm_request *dlm_req;
2147         int rc = 0;
2148         int i;
2149         ENTRY;
2150
2151         /* no prolongation during recovery */
2152         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
2153                 RETURN(0);
2154
2155         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2156         if (dlm_req == NULL)
2157                 RETURN(-EFAULT);
2158
2159         for (i = 0; i < dlm_req->lock_count; i++) {
2160                 struct ldlm_lock *lock;
2161
2162                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
2163                 if (lock == NULL)
2164                         continue;
2165
2166                 rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
2167                 if (rc)
2168                         LDLM_DEBUG(lock, "hpreq cancel lock");
2169                 LDLM_LOCK_PUT(lock);
2170
2171                 if (rc)
2172                         break;
2173         }
2174
2175         RETURN(rc);
2176 }
2177
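/*
 * Hedged summary: ptlrpc consults these ops for incoming LDLM_CANCEL RPCs:
 * hpreq_check marks a cancel high-priority when one of the locks it names
 * already has a blocking AST sent (LDLM_FL_AST_SENT), and hpreq_lock_match
 * raises the priority of a queued cancel that references a given lock.
 */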
2178 static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
2179         .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
2180         .hpreq_check      = ldlm_cancel_hpreq_check
2181 };
2182
2183 static int ldlm_hpreq_handler(struct ptlrpc_request *req)
2184 {
2185         ENTRY;
2186
2187         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2188
2189         if (req->rq_export == NULL)
2190                 RETURN(0);
2191
2192         if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
2193                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2194                 req->rq_ops = &ldlm_cancel_hpreq_ops;
2195         }
2196         RETURN(0);
2197 }
2198
2199 int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
2200                         cfs_hlist_node_t *hnode, void *data)
2202 {
2203         cfs_list_t         *rpc_list = data;
2204         struct ldlm_lock   *lock = cfs_hash_object(hs, hnode);
2205
2206         lock_res_and_lock(lock);
2207
2208         if (lock->l_req_mode != lock->l_granted_mode) {
2209                 unlock_res_and_lock(lock);
2210                 return 0;
2211         }
2212
2213         LASSERT(lock->l_resource);
2214         if (lock->l_resource->lr_type != LDLM_IBITS &&
2215             lock->l_resource->lr_type != LDLM_PLAIN) {
2216                 unlock_res_and_lock(lock);
2217                 return 0;
2218         }
2219
2220         if (lock->l_flags & LDLM_FL_AST_SENT) {
2221                 unlock_res_and_lock(lock);
2222                 return 0;
2223         }
2224
2225         LASSERT(lock->l_blocking_ast);
2226         LASSERT(!lock->l_blocking_lock);
2227
2228         lock->l_flags |= LDLM_FL_AST_SENT;
2229         if (lock->l_export && lock->l_export->exp_lock_hash &&
2230             !cfs_hlist_unhashed(&lock->l_exp_hash))
2231                 cfs_hash_del(lock->l_export->exp_lock_hash,
2232                              &lock->l_remote_handle, &lock->l_exp_hash);
2233         cfs_list_add_tail(&lock->l_rk_ast, rpc_list);
2234         LDLM_LOCK_GET(lock);
2235
2236         unlock_res_and_lock(lock);
2237         return 0;
2238 }
2239
2240 void ldlm_revoke_export_locks(struct obd_export *exp)
2241 {
2242         cfs_list_t  rpc_list;
2243         ENTRY;
2244
2245         CFS_INIT_LIST_HEAD(&rpc_list);
2246         cfs_hash_for_each_empty(exp->exp_lock_hash,
2247                                 ldlm_revoke_lock_cb, &rpc_list);
2248         ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
2249                           LDLM_WORK_REVOKE_AST);
2250
2251         EXIT;
2252 }
2253 #endif /* HAVE_SERVER_SUPPORT */
2254
2255 #ifdef __KERNEL__
2256 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
2257 {
2258         struct ldlm_bl_work_item *blwi = NULL;
2259         static unsigned int num_bl = 0;
2260
2261         cfs_spin_lock(&blp->blp_lock);
2262         /* process a request from blp_list at least once per blp_num_threads requests, so the priority list cannot starve it */
2263         if (!cfs_list_empty(&blp->blp_list) &&
2264             (cfs_list_empty(&blp->blp_prio_list) || num_bl == 0))
2265                 blwi = cfs_list_entry(blp->blp_list.next,
2266                                       struct ldlm_bl_work_item, blwi_entry);
2267         else
2268                 if (!cfs_list_empty(&blp->blp_prio_list))
2269                         blwi = cfs_list_entry(blp->blp_prio_list.next,
2270                                               struct ldlm_bl_work_item,
2271                                               blwi_entry);
2272
2273         if (blwi) {
2274                 if (++num_bl >= cfs_atomic_read(&blp->blp_num_threads))
2275                         num_bl = 0;
2276                 cfs_list_del(&blwi->blwi_entry);
2277         }
2278         cfs_spin_unlock(&blp->blp_lock);
2279
2280         return blwi;
2281 }
2282
2283 /* This only contains temporary data until the thread starts */
2284 struct ldlm_bl_thread_data {
2285         char                    bltd_name[CFS_CURPROC_COMM_MAX];
2286         struct ldlm_bl_pool     *bltd_blp;
2287         cfs_completion_t        bltd_comp;
2288         int                     bltd_num;
2289 };
2290
2291 static int ldlm_bl_thread_main(void *arg);
2292
2293 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
2294 {
2295         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
2296         int rc;
2297
2298         cfs_init_completion(&bltd.bltd_comp);
2299         rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
2300         if (rc < 0) {
2301                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
2302                        cfs_atomic_read(&blp->blp_num_threads), rc);
2303                 return rc;
2304         }
2305         cfs_wait_for_completion(&bltd.bltd_comp);
2306
2307         return 0;
2308 }
2309
2310 static int ldlm_bl_thread_main(void *arg)
2311 {
2312         struct ldlm_bl_pool *blp;
2313         ENTRY;
2314
2315         {
2316                 struct ldlm_bl_thread_data *bltd = arg;
2317
2318                 blp = bltd->bltd_blp;
2319
2320                 bltd->bltd_num =
2321                         cfs_atomic_inc_return(&blp->blp_num_threads) - 1;
2322                 cfs_atomic_inc(&blp->blp_busy_threads);
2323
2324                 snprintf(bltd->bltd_name, sizeof(bltd->bltd_name) - 1,
2325                         "ldlm_bl_%02d", bltd->bltd_num);
2326                 cfs_daemonize(bltd->bltd_name);
2327
2328                 cfs_complete(&bltd->bltd_comp);
2329                 /* cannot use bltd after this; it lives only on the caller's stack */
2330         }
2331
2332         while (1) {
2333                 struct l_wait_info lwi = { 0 };
2334                 struct ldlm_bl_work_item *blwi = NULL;
2335                 int busy;
2336
2337                 blwi = ldlm_bl_get_work(blp);
2338
2339                 if (blwi == NULL) {
2340                         cfs_atomic_dec(&blp->blp_busy_threads);
2341                         l_wait_event_exclusive(blp->blp_waitq,
2342                                          (blwi = ldlm_bl_get_work(blp)) != NULL,
2343                                          &lwi);
2344                         busy = cfs_atomic_inc_return(&blp->blp_busy_threads);
2345                 } else {
2346                         busy = cfs_atomic_read(&blp->blp_busy_threads);
2347                 }
2348
2349                 if (blwi->blwi_ns == NULL)
2350                         /* added by ldlm_cleanup() */
2351                         break;
2352
2353                 /* Not fatal if racy; we may just end up with a few too many threads */
2354                 if (unlikely(busy < blp->blp_max_threads &&
2355                              busy >= cfs_atomic_read(&blp->blp_num_threads) &&
2356                              !blwi->blwi_mem_pressure))
2357                         /* discard the return value, we tried */
2358                         ldlm_bl_thread_start(blp);
2359
2360                 if (blwi->blwi_mem_pressure)
2361                         cfs_memory_pressure_set();
2362
2363                 if (blwi->blwi_count) {
2364                         int count;
2365                         /* In the special case of cancelling LRU locks
2366                          * asynchronously, the list of locks is passed here.
2367                          * These locks are marked LDLM_FL_CANCELING, but are
2368                          * NOT cancelled locally yet. */
2369                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
2370                                                            blwi->blwi_count,
2371                                                            LCF_BL_AST);
2372                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL, 0);
2373                 } else {
2374                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
2375                                                 blwi->blwi_lock);
2376                 }
2377                 if (blwi->blwi_mem_pressure)
2378                         cfs_memory_pressure_clr();
2379
2380                 if (blwi->blwi_mode == LDLM_ASYNC)
2381                         OBD_FREE(blwi, sizeof(*blwi));
2382                 else
2383                         cfs_complete(&blwi->blwi_comp);
2384         }
2385
2386         cfs_atomic_dec(&blp->blp_busy_threads);
2387         cfs_atomic_dec(&blp->blp_num_threads);
2388         cfs_complete(&blp->blp_comp);
2389         RETURN(0);
2390 }
2391
2392 #endif
2393
2394 static int ldlm_setup(void);
2395 static int ldlm_cleanup(void);
2396
2397 int ldlm_get_ref(void)
2398 {
2399         int rc = 0;
2400         ENTRY;
2401         cfs_mutex_lock(&ldlm_ref_mutex);
2402         if (++ldlm_refcount == 1) {
2403                 rc = ldlm_setup();
2404                 if (rc)
2405                         ldlm_refcount--;
2406         }
2407         cfs_mutex_unlock(&ldlm_ref_mutex);
2408
2409         RETURN(rc);
2410 }
2411
2412 void ldlm_put_ref(void)
2413 {
2414         ENTRY;
2415         cfs_mutex_lock(&ldlm_ref_mutex);
2416         if (ldlm_refcount == 1) {
2417                 int rc = ldlm_cleanup();
2418                 if (rc)
2419                         CERROR("ldlm_cleanup failed: %d\n", rc);
2420                 else
2421                         ldlm_refcount--;
2422         } else {
2423                 ldlm_refcount--;
2424         }
2425         cfs_mutex_unlock(&ldlm_ref_mutex);
2426
2427         EXIT;
2428 }
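
/*
 * Usage sketch (illustrative): DLM users bracket their lifetime with a
 * reference pair, so that the first reference runs ldlm_setup() and the
 * last one runs ldlm_cleanup():
 *
 *      rc = ldlm_get_ref();
 *      if (rc == 0) {
 *              ... use the DLM services ...
 *              ldlm_put_ref();
 *      }
 */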
2429
2430 /*
2431  * Export handle<->lock hash operations.
2432  */
2433 static unsigned
2434 ldlm_export_lock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
2435 {
2436         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
2437 }
2438
2439 static void *
2440 ldlm_export_lock_key(cfs_hlist_node_t *hnode)
2441 {
2442         struct ldlm_lock *lock;
2443
2444         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2445         return &lock->l_remote_handle;
2446 }
2447
2448 static void
2449 ldlm_export_lock_keycpy(cfs_hlist_node_t *hnode, void *key)
2450 {
2451         struct ldlm_lock     *lock;
2452
2453         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2454         lock->l_remote_handle = *(struct lustre_handle *)key;
2455 }
2456
2457 static int
2458 ldlm_export_lock_keycmp(const void *key, cfs_hlist_node_t *hnode)
2459 {
2460         return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
2461 }
2462
2463 static void *
2464 ldlm_export_lock_object(cfs_hlist_node_t *hnode)
2465 {
2466         return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2467 }
2468
2469 static void
2470 ldlm_export_lock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
2471 {
2472         struct ldlm_lock *lock;
2473
2474         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2475         LDLM_LOCK_GET(lock);
2476 }
2477
2478 static void
2479 ldlm_export_lock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
2480 {
2481         struct ldlm_lock *lock;
2482
2483         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2484         LDLM_LOCK_RELEASE(lock);
2485 }
2486
2487 static cfs_hash_ops_t ldlm_export_lock_ops = {
2488         .hs_hash        = ldlm_export_lock_hash,
2489         .hs_key         = ldlm_export_lock_key,
2490         .hs_keycmp      = ldlm_export_lock_keycmp,
2491         .hs_keycpy      = ldlm_export_lock_keycpy,
2492         .hs_object      = ldlm_export_lock_object,
2493         .hs_get         = ldlm_export_lock_get,
2494         .hs_put         = ldlm_export_lock_put,
2495         .hs_put_locked  = ldlm_export_lock_put,
2496 };
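
/*
 * Hedged note: these ops back exp_lock_hash, which maps a client's lock
 * handle cookie (l_remote_handle) to the server-side ldlm_lock, so that
 * e.g. ldlm_revoke_lock_cb() above can unhash a lock by its handle.
 */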
2497
2498 int ldlm_init_export(struct obd_export *exp)
2499 {
2500         ENTRY;
2501
2502         exp->exp_lock_hash =
2503                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
2504                                 HASH_EXP_LOCK_CUR_BITS,
2505                                 HASH_EXP_LOCK_MAX_BITS,
2506                                 HASH_EXP_LOCK_BKT_BITS, 0,
2507                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
2508                                 &ldlm_export_lock_ops,
2509                                 CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
2510                                 CFS_HASH_NBLK_CHANGE);
2511
2512         if (!exp->exp_lock_hash)
2513                 RETURN(-ENOMEM);
2514
2515         RETURN(0);
2516 }
2517 EXPORT_SYMBOL(ldlm_init_export);
2518
2519 void ldlm_destroy_export(struct obd_export *exp)
2520 {
2521         ENTRY;
2522         cfs_hash_putref(exp->exp_lock_hash);
2523         exp->exp_lock_hash = NULL;
2524         EXIT;
2525 }
2526 EXPORT_SYMBOL(ldlm_destroy_export);
2527
2528 static int ldlm_setup(void)
2529 {
2530         struct ldlm_bl_pool *blp;
2531         int rc = 0;
2532         int ldlm_min_threads = LDLM_THREADS_AUTO_MIN;
2533         int ldlm_max_threads = LDLM_THREADS_AUTO_MAX;
2534 #ifdef __KERNEL__
2535         int i;
2536 #endif
2537         ENTRY;
2538
2539         if (ldlm_state != NULL)
2540                 RETURN(-EALREADY);
2541
2542         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
2543         if (ldlm_state == NULL)
2544                 RETURN(-ENOMEM);
2545
2546 #ifdef LPROCFS
2547         rc = ldlm_proc_setup();
2548         if (rc != 0)
2549                 GOTO(out_free, rc);
2550 #endif
2551
2552 #ifdef __KERNEL__
2553         if (ldlm_num_threads) {
2554                 /* If ldlm_num_threads is set, it is the min and the max. */
2555                 if (ldlm_num_threads > LDLM_THREADS_AUTO_MAX)
2556                         ldlm_num_threads = LDLM_THREADS_AUTO_MAX;
2557                 if (ldlm_num_threads < LDLM_THREADS_AUTO_MIN)
2558                         ldlm_num_threads = LDLM_THREADS_AUTO_MIN;
2559                 ldlm_min_threads = ldlm_max_threads = ldlm_num_threads;
2560         }
2561 #endif
2562
2563         ldlm_state->ldlm_cb_service =
2564                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
2565                                 LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
2566                                 LDLM_CB_REPLY_PORTAL, 2,
2567                                 ldlm_callback_handler, "ldlm_cbd",
2568                                 ldlm_svc_proc_dir, NULL,
2569                                 ldlm_min_threads, ldlm_max_threads,
2570                                 "ldlm_cb",
2571                                 LCT_MD_THREAD|LCT_DT_THREAD, NULL);
2572
2573         if (!ldlm_state->ldlm_cb_service) {
2574                 CERROR("failed to start service\n");
2575                 GOTO(out_proc, rc = -ENOMEM);
2576         }
2577
2578 #ifdef HAVE_SERVER_SUPPORT
2579         ldlm_state->ldlm_cancel_service =
2580                 ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
2581                                 LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
2582                                 LDLM_CANCEL_REPLY_PORTAL, 6,
2583                                 ldlm_cancel_handler, "ldlm_canceld",
2584                                 ldlm_svc_proc_dir, NULL,
2585                                 ldlm_min_threads, ldlm_max_threads,
2586                                 "ldlm_cn",
2587                                 LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
2588                                 ldlm_hpreq_handler);
2589
2590         if (!ldlm_state->ldlm_cancel_service) {
2591                 CERROR("failed to start service\n");
2592                 GOTO(out_proc, rc = -ENOMEM);
2593         }
2594 #endif
2595
2596         OBD_ALLOC(blp, sizeof(*blp));
2597         if (blp == NULL)
2598                 GOTO(out_proc, rc = -ENOMEM);
2599         ldlm_state->ldlm_bl_pool = blp;
2600
2601         cfs_spin_lock_init(&blp->blp_lock);
2602         CFS_INIT_LIST_HEAD(&blp->blp_list);
2603         CFS_INIT_LIST_HEAD(&blp->blp_prio_list);
2604         cfs_waitq_init(&blp->blp_waitq);
2605         cfs_atomic_set(&blp->blp_num_threads, 0);
2606         cfs_atomic_set(&blp->blp_busy_threads, 0);
2607         blp->blp_min_threads = ldlm_min_threads;
2608         blp->blp_max_threads = ldlm_max_threads;
2609
2610 #ifdef __KERNEL__
2611         for (i = 0; i < blp->blp_min_threads; i++) {
2612                 rc = ldlm_bl_thread_start(blp);
2613                 if (rc < 0)
2614                         GOTO(out_thread, rc);
2615         }
2616
2617 # ifdef HAVE_SERVER_SUPPORT
2618         rc = ptlrpc_start_threads(ldlm_state->ldlm_cancel_service);
2619         if (rc)
2620                 GOTO(out_thread, rc);
2621 # endif
2622
2623         rc = ptlrpc_start_threads(ldlm_state->ldlm_cb_service);
2624         if (rc)
2625                 GOTO(out_thread, rc);
2626
2627         CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
2628         expired_lock_thread.elt_state = ELT_STOPPED;
2629         cfs_waitq_init(&expired_lock_thread.elt_waitq);
2630
2631         CFS_INIT_LIST_HEAD(&waiting_locks_list);
2632         cfs_spin_lock_init(&waiting_locks_spinlock);
2633         cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
2634
2635         rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
2636         if (rc < 0) {
2637                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
2638                 GOTO(out_thread, rc);
2639         }
2640
2641         cfs_wait_event(expired_lock_thread.elt_waitq,
2642                        expired_lock_thread.elt_state == ELT_READY);
2643 #endif
2644
2645 #ifdef __KERNEL__
2646         rc = ldlm_pools_init();
2647         if (rc)
2648                 GOTO(out_thread, rc);
2649 #endif
2650         RETURN(0);
2651
2652 #ifdef __KERNEL__
2653  out_thread:
2654 # ifdef HAVE_SERVER_SUPPORT
2655         ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
2656 # endif
2657         ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
2658 #endif
2659
2660  out_proc:
2661 #ifdef LPROCFS
2662         ldlm_proc_cleanup();
2663  out_free:
2664 #endif
2665         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
2666         ldlm_state = NULL;
2667         return rc;
2668 }
2669
2670 static int ldlm_cleanup(void)
2671 {
2672 #ifdef __KERNEL__
2673         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
2674 #endif
2675         ENTRY;
2676
2677         if (!cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
2678             !cfs_list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
2679                 CERROR("ldlm still has namespaces; clean these up first.\n");
2680                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
2681                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
2682                 RETURN(-EBUSY);
2683         }
2684
2685 #ifdef __KERNEL__
2686         ldlm_pools_fini();
2687 #endif
2688
2689 #ifdef __KERNEL__
2690         while (cfs_atomic_read(&blp->blp_num_threads) > 0) {
2691                 struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
2692
2693                 cfs_init_completion(&blp->blp_comp);
2694
2695                 cfs_spin_lock(&blp->blp_lock);
2696                 cfs_list_add_tail(&blwi.blwi_entry, &blp->blp_list);
2697                 cfs_waitq_signal(&blp->blp_waitq);
2698                 cfs_spin_unlock(&blp->blp_lock);
2699
2700                 cfs_wait_for_completion(&blp->blp_comp);
2701         }
2702         OBD_FREE(blp, sizeof(*blp));
2703
2704         ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
2705 # ifdef HAVE_SERVER_SUPPORT
2706         ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
2707 # endif
2708         ldlm_proc_cleanup();
2709
        expired_lock_thread.elt_state = ELT_TERMINATE;
        cfs_waitq_signal(&expired_lock_thread.elt_waitq);
        cfs_wait_event(expired_lock_thread.elt_waitq,
                       expired_lock_thread.elt_state == ELT_STOPPED);
#else /* !__KERNEL__ */
        ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
# ifdef HAVE_SERVER_SUPPORT
        ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
# endif
#endif /* __KERNEL__ */

        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;

        RETURN(0);
}

int ldlm_init(void)
{
        cfs_mutex_init(&ldlm_ref_mutex);
        cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
        cfs_mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
        ldlm_resource_slab = cfs_mem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               CFS_SLAB_HWCACHE_ALIGN);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;

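        /* ldlm_lock memory is reclaimed via RCU (CFS_SLAB_DESTROY_BY_RCU):
         * the slab may recycle an object for a new lock before a grace
         * period ends, so lockless readers must revalidate any lock they
         * find, and ldlm_exit() must synchronize_rcu() before destroying
         * the slab. */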
        ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
                              sizeof(struct ldlm_lock), 0,
                              CFS_SLAB_HWCACHE_ALIGN | CFS_SLAB_DESTROY_BY_RCU);
        if (ldlm_lock_slab == NULL) {
                cfs_mem_cache_destroy(ldlm_resource_slab);
                return -ENOMEM;
        }

        ldlm_interval_slab = cfs_mem_cache_create("interval_node",
                                        sizeof(struct ldlm_interval),
                                        0, CFS_SLAB_HWCACHE_ALIGN);
        if (ldlm_interval_slab == NULL) {
                cfs_mem_cache_destroy(ldlm_resource_slab);
                cfs_mem_cache_destroy(ldlm_lock_slab);
                return -ENOMEM;
        }
#if LUSTRE_TRACKS_LOCK_EXP_REFS
        class_export_dump_hook = ldlm_dump_export_locks;
#endif
        return 0;
}

void ldlm_exit(void)
{
        int rc;

        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
        rc = cfs_mem_cache_destroy(ldlm_resource_slab);
        LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
#ifdef __KERNEL__
        /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so wait for
         * a grace period to elapse with synchronize_rcu() to give every
         * pending ldlm_lock_free() a chance to run before the slab is
         * destroyed. */
        synchronize_rcu();
#endif
        rc = cfs_mem_cache_destroy(ldlm_lock_slab);
        LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
        rc = cfs_mem_cache_destroy(ldlm_interval_slab);
        LASSERTF(rc == 0, "couldn't free interval node slab\n");
}
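
/*
 * ldlm_init()/ldlm_exit() bracket the module lifetime, while per-user
 * setup and teardown go through ldlm_get_ref()/ldlm_put_ref() (exported
 * below): the first reference runs ldlm_setup(), the last runs
 * ldlm_cleanup().  A minimal sketch of the expected pairing, assuming a
 * hypothetical caller:
 *
 *     rc = ldlm_init();            // at module load: create slabs
 *     ...
 *     rc = ldlm_get_ref();         // first user triggers ldlm_setup()
 *     ...
 *     ldlm_put_ref();              // last user triggers ldlm_cleanup()
 *     ldlm_exit();                 // at module unload: destroy slabs
 */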

/* ldlm_extent.c */
EXPORT_SYMBOL(ldlm_extent_shift_kms);

/* ldlm_lock.c */
#ifdef HAVE_SERVER_SUPPORT
EXPORT_SYMBOL(ldlm_get_processing_policy);
#endif
EXPORT_SYMBOL(ldlm_lock2desc);
EXPORT_SYMBOL(ldlm_register_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_lock2handle);
EXPORT_SYMBOL(__ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_get);
EXPORT_SYMBOL(ldlm_lock_put);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_cancel);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_addref_try);
EXPORT_SYMBOL(ldlm_lock_decref);
EXPORT_SYMBOL(ldlm_lock_decref_and_cancel);
EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_lock_allow_match);
EXPORT_SYMBOL(ldlm_lock_downgrade);
EXPORT_SYMBOL(ldlm_lock_convert);

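/*
 * The handle-based calls above are the usual way code outside ldlm
 * refers to a lock; a sketch of the pattern (not lifted from a real
 * caller):
 *
 *     struct lustre_handle lockh;
 *
 *     ldlm_lock2handle(lock, &lockh);     // cookie outlives the lock
 *     ...
 *     lock = ldlm_handle2lock(&lockh);    // NULL if already destroyed
 *     if (lock != NULL) {
 *             ...
 *             LDLM_LOCK_PUT(lock);        // drop the lookup reference
 *     }
 */
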
/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast_async);
EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_blocking_ast);
EXPORT_SYMBOL(ldlm_glimpse_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_prep_enqueue_req);
EXPORT_SYMBOL(ldlm_prep_elc_req);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
EXPORT_SYMBOL(ldlm_cli_enqueue_local);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
EXPORT_SYMBOL(ldlm_cli_cancel_req);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_resource_iterate);
EXPORT_SYMBOL(ldlm_cancel_resource_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list_local);
EXPORT_SYMBOL(ldlm_cli_cancel_list);

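/*
 * Typical client-side teardown with the exports above, sketched (not
 * taken from a real caller):
 *
 *     ldlm_lock_decref(&lockh, mode);             // just drop the ref
 * or
 *     ldlm_lock_decref_and_cancel(&lockh, mode);  // drop and cancel
 * or cancel explicitly through the RPC path:
 *     rc = ldlm_cli_cancel(&lockh);
 */
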
/* ldlm_lockd.c */
#ifdef HAVE_SERVER_SUPPORT
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_enqueue0);
EXPORT_SYMBOL(ldlm_handle_cancel);
EXPORT_SYMBOL(ldlm_request_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_handle_convert0);
EXPORT_SYMBOL(ldlm_revoke_export_locks);
#endif
EXPORT_SYMBOL(ldlm_del_waiting_lock);
EXPORT_SYMBOL(ldlm_get_ref);
EXPORT_SYMBOL(ldlm_put_ref);
EXPORT_SYMBOL(ldlm_refresh_waiting_lock);

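/*
 * The ldlm_server_*_ast callbacks above are what server-side code
 * conventionally passes as the AST arguments of ldlm_cli_enqueue_local()
 * for locks it takes on its own resources; individual targets may wrap
 * them with their own callbacks (a convention, sketched, not a rule
 * enforced by this file).
 */
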
/* ldlm_resource.c */
EXPORT_SYMBOL(ldlm_namespace_new);
EXPORT_SYMBOL(ldlm_namespace_cleanup);
EXPORT_SYMBOL(ldlm_namespace_free);
EXPORT_SYMBOL(ldlm_namespace_dump);
EXPORT_SYMBOL(ldlm_dump_all_namespaces);
EXPORT_SYMBOL(ldlm_resource_get);
EXPORT_SYMBOL(ldlm_resource_putref);
EXPORT_SYMBOL(ldlm_resource_unlink_lock);

/* ldlm_lib.c */
EXPORT_SYMBOL(client_import_add_conn);
EXPORT_SYMBOL(client_import_del_conn);
EXPORT_SYMBOL(client_obd_setup);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_pack_pool_reply);

#ifdef HAVE_SERVER_SUPPORT
EXPORT_SYMBOL(server_disconnect_export);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);
EXPORT_SYMBOL(target_destroy_export);
EXPORT_SYMBOL(target_cancel_recovery_timer);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
EXPORT_SYMBOL(target_handle_disconnect);
#endif

/* l_lock.c */
EXPORT_SYMBOL(lock_res_and_lock);
EXPORT_SYMBOL(unlock_res_and_lock);