Whamcloud - gitweb
LU-9066 ldlm: don't evict client on umount if AST fails
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ldlm/ldlm_lockd.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LDLM
39
40 #include <linux/kthread.h>
41 #include <linux/list.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre/lustre_errno.h>
44 #include <lustre_dlm.h>
45 #include <obd_class.h>
46 #include "ldlm_internal.h"
47
48 static int ldlm_num_threads;
49 module_param(ldlm_num_threads, int, 0444);
50 MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
51
52 static char *ldlm_cpts;
53 module_param(ldlm_cpts, charp, 0444);
54 MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
55
56 static struct mutex     ldlm_ref_mutex;
57 static int ldlm_refcount;
58
59 struct kobject *ldlm_kobj;
60 struct kset *ldlm_ns_kset;
61 struct kset *ldlm_svc_kset;
62
63 /* LDLM state */
64
65 static struct ldlm_state *ldlm_state;
66
67 static inline cfs_time_t round_timeout(cfs_time_t timeout)
68 {
69         return cfs_time_seconds((int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
70 }
71
72 /* timeout for initial callback (AST) reply (bz10399) */
73 static inline unsigned int ldlm_get_rq_timeout(void)
74 {
75         /* Non-AT value */
76         unsigned int timeout = min(ldlm_timeout, obd_timeout / 3);
77
78         return timeout < 1 ? 1 : timeout;
79 }
80
81 #define ELT_STOPPED   0
82 #define ELT_READY     1
83 #define ELT_TERMINATE 2
84
85 struct ldlm_bl_pool {
86         spinlock_t              blp_lock;
87
88         /*
89          * blp_prio_list is used for callbacks that should be handled
90          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
91          * see bug 13843
92          */
93         struct list_head              blp_prio_list;
94
95         /*
96          * blp_list is used for all other callbacks which are likely
97          * to take longer to process.
98          */
99         struct list_head              blp_list;
100
101         wait_queue_head_t       blp_waitq;
102         struct completion       blp_comp;
103         atomic_t            blp_num_threads;
104         atomic_t            blp_busy_threads;
105         int                     blp_min_threads;
106         int                     blp_max_threads;
107 };
108
109 struct ldlm_bl_work_item {
110         struct list_head        blwi_entry;
111         struct ldlm_namespace   *blwi_ns;
112         struct ldlm_lock_desc   blwi_ld;
113         struct ldlm_lock        *blwi_lock;
114         struct list_head        blwi_head;
115         int                     blwi_count;
116         struct completion       blwi_comp;
117         enum ldlm_cancel_flags  blwi_flags;
118         int                     blwi_mem_pressure;
119 };
120
121 #ifdef HAVE_SERVER_SUPPORT
122
123 /**
124  * Protects both waiting_locks_list and expired_lock_thread.
125  */
126 static spinlock_t waiting_locks_spinlock;   /* BH lock (timer) */
127
128 /**
129  * List for contended locks.
130  *
131  * As soon as a lock is contended, it gets placed on this list and
132  * expected time to get a response is filled in the lock. A special
133  * thread walks the list looking for locks that should be released and
134  * schedules client evictions for those that have not been released in
135  * time.
136  *
137  * All access to it should be under waiting_locks_spinlock.
138  */
139 static struct list_head waiting_locks_list;
140 static struct timer_list waiting_locks_timer;
141
142 static struct expired_lock_thread {
143         wait_queue_head_t       elt_waitq;
144         int                     elt_state;
145         int                     elt_dump;
146         struct list_head                elt_expired_locks;
147 } expired_lock_thread;
148
149 static inline int have_expired_locks(void)
150 {
151         int need_to_run;
152
153         ENTRY;
154         spin_lock_bh(&waiting_locks_spinlock);
155         need_to_run = !list_empty(&expired_lock_thread.elt_expired_locks);
156         spin_unlock_bh(&waiting_locks_spinlock);
157
158         RETURN(need_to_run);
159 }
160
161 /**
162  * Check expired lock list for expired locks and time them out.
163  */
164 static int expired_lock_main(void *arg)
165 {
166         struct list_head *expired = &expired_lock_thread.elt_expired_locks;
167         struct l_wait_info lwi = { 0 };
168         int do_dump;
169
170         ENTRY;
171
172         expired_lock_thread.elt_state = ELT_READY;
173         wake_up(&expired_lock_thread.elt_waitq);
174
175         while (1) {
176                 l_wait_event(expired_lock_thread.elt_waitq,
177                              have_expired_locks() ||
178                              expired_lock_thread.elt_state == ELT_TERMINATE,
179                              &lwi);
180
181                 spin_lock_bh(&waiting_locks_spinlock);
182                 if (expired_lock_thread.elt_dump) {
183                         spin_unlock_bh(&waiting_locks_spinlock);
184
185                         /* from waiting_locks_callback, but not in timer */
186                         libcfs_debug_dumplog();
187
188                         spin_lock_bh(&waiting_locks_spinlock);
189                         expired_lock_thread.elt_dump = 0;
190                 }
191
192                 do_dump = 0;
193
194                 while (!list_empty(expired)) {
195                         struct obd_export *export;
196                         struct ldlm_lock *lock;
197
198                         lock = list_entry(expired->next, struct ldlm_lock,
199                                           l_pending_chain);
200                         if ((void *)lock < LP_POISON + PAGE_SIZE &&
201                             (void *)lock >= LP_POISON) {
202                                 spin_unlock_bh(&waiting_locks_spinlock);
203                                 CERROR("free lock on elt list %p\n", lock);
204                                 LBUG();
205                         }
206                         list_del_init(&lock->l_pending_chain);
207                         if ((void *)lock->l_export <
208                              LP_POISON + PAGE_SIZE &&
209                             (void *)lock->l_export >= LP_POISON) {
210                                 CERROR("lock with free export on elt list %p\n",
211                                        lock->l_export);
212                                 lock->l_export = NULL;
213                                 LDLM_ERROR(lock, "free export");
214                                 /* release extra ref grabbed by
215                                  * ldlm_add_waiting_lock() or
216                                  * ldlm_failed_ast() */
217                                 LDLM_LOCK_RELEASE(lock);
218                                 continue;
219                         }
220
221                         if (ldlm_is_destroyed(lock)) {
222                                 /* release the lock refcount where
223                                  * waiting_locks_callback() founds */
224                                 LDLM_LOCK_RELEASE(lock);
225                                 continue;
226                         }
227                         export = class_export_lock_get(lock->l_export, lock);
228                         spin_unlock_bh(&waiting_locks_spinlock);
229
230                         spin_lock_bh(&export->exp_bl_list_lock);
231                         list_del_init(&lock->l_exp_list);
232                         spin_unlock_bh(&export->exp_bl_list_lock);
233
234                         do_dump++;
235                         class_fail_export(export);
236                         class_export_lock_put(export, lock);
237
238                         /* release extra ref grabbed by ldlm_add_waiting_lock()
239                          * or ldlm_failed_ast() */
240                         LDLM_LOCK_RELEASE(lock);
241
242                         spin_lock_bh(&waiting_locks_spinlock);
243                 }
244                 spin_unlock_bh(&waiting_locks_spinlock);
245
246                 if (do_dump && obd_dump_on_eviction) {
247                         CERROR("dump the log upon eviction\n");
248                         libcfs_debug_dumplog();
249                 }
250
251                 if (expired_lock_thread.elt_state == ELT_TERMINATE)
252                         break;
253         }
254
255         expired_lock_thread.elt_state = ELT_STOPPED;
256         wake_up(&expired_lock_thread.elt_waitq);
257         RETURN(0);
258 }
259
260 static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
261 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds);
262
263 /**
264  * Check if there is a request in the export request list
265  * which prevents the lock canceling.
266  */
267 static int ldlm_lock_busy(struct ldlm_lock *lock)
268 {
269         struct ptlrpc_request *req;
270         int match = 0;
271         ENTRY;
272
273         if (lock->l_export == NULL)
274                 return 0;
275
276         spin_lock_bh(&lock->l_export->exp_rpc_lock);
277         list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
278                                 rq_exp_list) {
279                 if (req->rq_ops->hpreq_lock_match) {
280                         match = req->rq_ops->hpreq_lock_match(req, lock);
281                         if (match)
282                                 break;
283                 }
284         }
285         spin_unlock_bh(&lock->l_export->exp_rpc_lock);
286         RETURN(match);
287 }
288
289 /* This is called from within a timer interrupt and cannot schedule */
290 static void waiting_locks_callback(unsigned long unused)
291 {
292         struct ldlm_lock        *lock;
293         int                     need_dump = 0;
294
295         spin_lock_bh(&waiting_locks_spinlock);
296         while (!list_empty(&waiting_locks_list)) {
297                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
298                                       l_pending_chain);
299                 if (cfs_time_after(lock->l_callback_timeout,
300                                    cfs_time_current()) ||
301                     (lock->l_req_mode == LCK_GROUP))
302                         break;
303
304                 /* Check if we need to prolong timeout */
305                 if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
306                     ldlm_lock_busy(lock)) {
307                         int cont = 1;
308
309                         if (lock->l_pending_chain.next == &waiting_locks_list)
310                                 cont = 0;
311
312                         LDLM_LOCK_GET(lock);
313
314                         spin_unlock_bh(&waiting_locks_spinlock);
315                         LDLM_DEBUG(lock, "prolong the busy lock");
316                         ldlm_refresh_waiting_lock(lock,
317                                                   ldlm_bl_timeout(lock) >> 1);
318                         spin_lock_bh(&waiting_locks_spinlock);
319
320                         if (!cont) {
321                                 LDLM_LOCK_RELEASE(lock);
322                                 break;
323                         }
324
325                         LDLM_LOCK_RELEASE(lock);
326                         continue;
327                 }
328                 ldlm_lock_to_ns(lock)->ns_timeouts++;
329                 LDLM_ERROR(lock, "lock callback timer expired after %llds: "
330                            "evicting client at %s ",
331                            ktime_get_real_seconds() - lock->l_last_activity,
332                            libcfs_nid2str(
333                                    lock->l_export->exp_connection->c_peer.nid));
334
335                 /* no needs to take an extra ref on the lock since it was in
336                  * the waiting_locks_list and ldlm_add_waiting_lock()
337                  * already grabbed a ref */
338                 list_del(&lock->l_pending_chain);
339                 list_add(&lock->l_pending_chain,
340                              &expired_lock_thread.elt_expired_locks);
341                 need_dump = 1;
342         }
343
344         if (!list_empty(&expired_lock_thread.elt_expired_locks)) {
345                 if (obd_dump_on_timeout && need_dump)
346                         expired_lock_thread.elt_dump = __LINE__;
347
348                 wake_up(&expired_lock_thread.elt_waitq);
349         }
350
351         /*
352          * Make sure the timer will fire again if we have any locks
353          * left.
354          */
355         if (!list_empty(&waiting_locks_list)) {
356                 cfs_time_t timeout_rounded;
357                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
358                                       l_pending_chain);
359                 timeout_rounded = (cfs_time_t)round_timeout(lock->l_callback_timeout);
360                 mod_timer(&waiting_locks_timer, timeout_rounded);
361         }
362         spin_unlock_bh(&waiting_locks_spinlock);
363 }
364
365 /**
366  * Add lock to the list of contended locks.
367  *
368  * Indicate that we're waiting for a client to call us back cancelling a given
369  * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
370  * timer to fire appropriately.  (We round up to the next second, to avoid
371  * floods of timer firings during periods of high lock contention and traffic).
372  * As done by ldlm_add_waiting_lock(), the caller must grab a lock reference
373  * if it has been added to the waiting list (1 is returned).
374  *
375  * Called with the namespace lock held.
376  */
377 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
378 {
379         cfs_time_t timeout;
380         cfs_time_t timeout_rounded;
381
382         if (!list_empty(&lock->l_pending_chain))
383                 return 0;
384
385         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
386             OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
387                 seconds = 1;
388
389         timeout = cfs_time_shift(seconds);
390         if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
391                 lock->l_callback_timeout = timeout;
392
393         timeout_rounded = round_timeout(lock->l_callback_timeout);
394
395         if (cfs_time_before(timeout_rounded, waiting_locks_timer.expires) ||
396             !timer_pending(&waiting_locks_timer)) {
397                 mod_timer(&waiting_locks_timer, timeout_rounded);
398         }
399         /* if the new lock has a shorter timeout than something earlier on
400            the list, we'll wait the longer amount of time; no big deal. */
401         /* FIFO */
402         list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
403         return 1;
404 }
405
406 static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
407 {
408         spin_lock_bh(&lock->l_export->exp_bl_list_lock);
409         if (list_empty(&lock->l_exp_list)) {
410                 if (lock->l_granted_mode != lock->l_req_mode)
411                         list_add_tail(&lock->l_exp_list,
412                                       &lock->l_export->exp_bl_list);
413                 else
414                         list_add(&lock->l_exp_list,
415                                  &lock->l_export->exp_bl_list);
416         }
417         spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
418
419         /* A blocked lock is added. Adjust the position in
420          * the stale list if the export is in the list.
421          * If export is stale and not in the list - it is being
422          * processed and will be placed on the right position
423          * on obd_stale_export_put(). */
424         if (!list_empty(&lock->l_export->exp_stale_list))
425                 obd_stale_export_adjust(lock->l_export);
426 }
427
428 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
429 {
430         int ret;
431         int timeout = ldlm_bl_timeout(lock);
432
433         /* NB: must be called with hold of lock_res_and_lock() */
434         LASSERT(ldlm_is_res_locked(lock));
435         LASSERT(!ldlm_is_cancel_on_block(lock));
436
437         /* Do not put cross-MDT lock in the waiting list, since we
438          * will not evict it due to timeout for now */
439         if (lock->l_export != NULL &&
440             (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
441                 return 0;
442
443         spin_lock_bh(&waiting_locks_spinlock);
444         if (ldlm_is_cancel(lock)) {
445                 spin_unlock_bh(&waiting_locks_spinlock);
446                 return 0;
447         }
448
449         if (ldlm_is_destroyed(lock)) {
450                 static cfs_time_t next;
451
452                 spin_unlock_bh(&waiting_locks_spinlock);
453                 LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
454                 if (cfs_time_after(cfs_time_current(), next)) {
455                         next = cfs_time_shift(14400);
456                         libcfs_debug_dumpstack(NULL);
457                 }
458                 return 0;
459         }
460
461         ldlm_set_waited(lock);
462         lock->l_last_activity = ktime_get_real_seconds();
463         ret = __ldlm_add_waiting_lock(lock, timeout);
464         if (ret) {
465                 /* grab ref on the lock if it has been added to the
466                  * waiting list */
467                 LDLM_LOCK_GET(lock);
468         }
469         spin_unlock_bh(&waiting_locks_spinlock);
470
471         if (ret)
472                 ldlm_add_blocked_lock(lock);
473
474         LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
475                    ret == 0 ? "not re-" : "", timeout,
476                    AT_OFF ? "off" : "on");
477         return ret;
478 }
479
480 /**
481  * Remove a lock from the pending list, likely because it had its cancellation
482  * callback arrive without incident.  This adjusts the lock-timeout timer if
483  * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
484  * As done by ldlm_del_waiting_lock(), the caller must release the lock
485  * reference when the lock is removed from any list (1 is returned).
486  *
487  * Called with namespace lock held.
488  */
489 static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
490 {
491         struct list_head *list_next;
492
493         if (list_empty(&lock->l_pending_chain))
494                 return 0;
495
496         list_next = lock->l_pending_chain.next;
497         if (lock->l_pending_chain.prev == &waiting_locks_list) {
498                 /* Removing the head of the list, adjust timer. */
499                 if (list_next == &waiting_locks_list) {
500                         /* No more, just cancel. */
501                         del_timer(&waiting_locks_timer);
502                 } else {
503                         struct ldlm_lock *next;
504                         next = list_entry(list_next, struct ldlm_lock,
505                                               l_pending_chain);
506                         mod_timer(&waiting_locks_timer,
507                                   round_timeout(next->l_callback_timeout));
508                 }
509         }
510         list_del_init(&lock->l_pending_chain);
511
512         return 1;
513 }
514
515 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
516 {
517         int ret;
518
519         if (lock->l_export == NULL) {
520                 /* We don't have a "waiting locks list" on clients. */
521                 CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock);
522                 return 0;
523         }
524
525         spin_lock_bh(&waiting_locks_spinlock);
526         ret = __ldlm_del_waiting_lock(lock);
527         ldlm_clear_waited(lock);
528         spin_unlock_bh(&waiting_locks_spinlock);
529
530         /* remove the lock out of export blocking list */
531         spin_lock_bh(&lock->l_export->exp_bl_list_lock);
532         list_del_init(&lock->l_exp_list);
533         spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
534
535         if (ret) {
536                 /* release lock ref if it has indeed been removed
537                  * from a list */
538                 LDLM_LOCK_RELEASE(lock);
539         }
540
541         LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
542         return ret;
543 }
544
545 /**
546  * Prolong the contended lock waiting time.
547  *
548  * Called with namespace lock held.
549  */
550 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
551 {
552         if (lock->l_export == NULL) {
553                 /* We don't have a "waiting locks list" on clients. */
554                 LDLM_DEBUG(lock, "client lock: no-op");
555                 return 0;
556         }
557
558         if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) {
559                 /* We don't have a "waiting locks list" on OSP. */
560                 LDLM_DEBUG(lock, "MDS-MDS lock: no-op");
561                 return 0;
562         }
563
564         spin_lock_bh(&waiting_locks_spinlock);
565
566         if (list_empty(&lock->l_pending_chain)) {
567                 spin_unlock_bh(&waiting_locks_spinlock);
568                 LDLM_DEBUG(lock, "wasn't waiting");
569                 return 0;
570         }
571
572         /* we remove/add the lock to the waiting list, so no needs to
573          * release/take a lock reference */
574         __ldlm_del_waiting_lock(lock);
575         __ldlm_add_waiting_lock(lock, timeout);
576         spin_unlock_bh(&waiting_locks_spinlock);
577
578         LDLM_DEBUG(lock, "refreshed");
579         return 1;
580 }
581 EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
582
583 #else /* HAVE_SERVER_SUPPORT */
584
585 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
586 {
587         RETURN(0);
588 }
589
590 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
591 {
592         RETURN(0);
593 }
594
595 #endif /* !HAVE_SERVER_SUPPORT */
596
597 #ifdef HAVE_SERVER_SUPPORT
598
599 /**
600  * Calculate the per-export Blocking timeout (covering BL AST, data flush,
601  * lock cancel, and their replies). Used for lock callback timeout and AST
602  * re-send period.
603  *
604  * \param[in] lock        lock which is getting the blocking callback
605  *
606  * \retval            timeout in seconds to wait for the client reply
607  */
608 unsigned int ldlm_bl_timeout(struct ldlm_lock *lock)
609 {
610         unsigned int timeout;
611
612         if (AT_OFF)
613                 return obd_timeout / 2;
614
615         /* Since these are non-updating timeouts, we should be conservative.
616          * Take more than usually, 150%
617          * It would be nice to have some kind of "early reply" mechanism for
618          * lock callbacks too... */
619         timeout = at_get(&lock->l_export->exp_bl_lock_at);
620         return max(timeout + (timeout >> 1), ldlm_enqueue_min);
621 }
622 EXPORT_SYMBOL(ldlm_bl_timeout);
623
624 /**
625  * Perform lock cleanup if AST sending failed.
626  */
627 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
628                             const char *ast_type)
629 {
630         LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due "
631                            "to a lock %s callback time out: rc %d\n",
632                            lock->l_export->exp_obd->obd_name,
633                            obd_export_nid2str(lock->l_export), ast_type, rc);
634
635         if (obd_dump_on_timeout)
636                 libcfs_debug_dumplog();
637         spin_lock_bh(&waiting_locks_spinlock);
638         if (__ldlm_del_waiting_lock(lock) == 0)
639                 /* the lock was not in any list, grab an extra ref before adding
640                  * the lock to the expired list */
641                 LDLM_LOCK_GET(lock);
642         list_add(&lock->l_pending_chain,
643                      &expired_lock_thread.elt_expired_locks);
644         wake_up(&expired_lock_thread.elt_waitq);
645         spin_unlock_bh(&waiting_locks_spinlock);
646 }
647
648 /**
649  * Perform lock cleanup if AST reply came with error.
650  */
651 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
652                                  struct ptlrpc_request *req, int rc,
653                                  const char *ast_type)
654 {
655         struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
656
657         if (!req->rq_replied || (rc && rc != -EINVAL)) {
658                 if (lock->l_export && lock->l_export->exp_libclient) {
659                         LDLM_DEBUG(lock,
660                                    "%s AST (req@%p x%llu) to liblustre client (nid %s) timeout, just cancelling lock",
661                                    ast_type, req, req->rq_xid,
662                                    libcfs_nid2str(peer.nid));
663                         ldlm_lock_cancel(lock);
664                         rc = -ERESTART;
665                 } else if (ldlm_is_cancel(lock)) {
666                         LDLM_DEBUG(lock,
667                                    "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
668                                    ast_type, req, req->rq_xid,
669                                    libcfs_nid2str(peer.nid));
670                         ldlm_lock_cancel(lock);
671                         rc = -ERESTART;
672                 } else if (rc == -ENODEV || rc == -ESHUTDOWN ||
673                            (rc == -EIO &&
674                             req->rq_import->imp_state == LUSTRE_IMP_CLOSED)) {
675                         /* Upon umount process the AST fails because cannot be
676                          * sent. This shouldn't lead to the client eviction.
677                          * -ENODEV error is returned by ptl_send_rpc() for
678                          *  new request in such import.
679                          * -SHUTDOWN is returned by ptlrpc_import_delay_req()
680                          *  if imp_invalid is set or obd_no_recov.
681                          * Meanwhile there is also check for LUSTRE_IMP_CLOSED
682                          * in ptlrpc_import_delay_req() as well with -EIO code.
683                          * In all such cases errors are ignored.
684                          */
685                         LDLM_DEBUG(lock, "%s AST can't be sent due to a server"
686                                          " %s failure or umount process: rc = %d\n",
687                                          ast_type,
688                                          req->rq_import->imp_obd->obd_name, rc);
689                 } else {
690                         LDLM_ERROR(lock,
691                                    "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
692                                    libcfs_nid2str(peer.nid),
693                                    req->rq_replied ? "returned error from" :
694                                    "failed to reply to",
695                                    ast_type, req, req->rq_xid,
696                                    (req->rq_repmsg != NULL) ?
697                                    lustre_msg_get_status(req->rq_repmsg) : 0,
698                                    rc);
699                         ldlm_failed_ast(lock, rc, ast_type);
700                 }
701                 return rc;
702         }
703
704         if (rc == -EINVAL) {
705                 struct ldlm_resource *res = lock->l_resource;
706
707                 LDLM_DEBUG(lock,
708                            "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
709                            libcfs_nid2str(peer.nid),
710                            req->rq_repmsg ?
711                            lustre_msg_get_status(req->rq_repmsg) : -1,
712                            ast_type, req, req->rq_xid);
713                 if (res) {
714                         /* update lvbo to return proper attributes.
715                          * see bug 23174 */
716                         ldlm_resource_getref(res);
717                         ldlm_res_lvbo_update(res, NULL, 1);
718                         ldlm_resource_putref(res);
719                 }
720                 ldlm_lock_cancel(lock);
721                 rc = -ERESTART;
722         }
723
724         return rc;
725 }
726
727 static int ldlm_cb_interpret(const struct lu_env *env,
728                              struct ptlrpc_request *req, void *data, int rc)
729 {
730         struct ldlm_cb_async_args *ca   = data;
731         struct ldlm_lock          *lock = ca->ca_lock;
732         struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
733         ENTRY;
734
735         LASSERT(lock != NULL);
736
737         switch (arg->type) {
738         case LDLM_GL_CALLBACK:
739                 /* Update the LVB from disk if the AST failed
740                  * (this is a legal race)
741                  *
742                  * - Glimpse callback of local lock just returns
743                  *   -ELDLM_NO_LOCK_DATA.
744                  * - Glimpse callback of remote lock might return
745                  *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
746                  */
747                 if (unlikely(arg->gl_interpret_reply)) {
748                         rc = arg->gl_interpret_reply(env, req, data, rc);
749                 } else if (rc == -ELDLM_NO_LOCK_DATA) {
750                         LDLM_DEBUG(lock, "lost race - client has a lock but no "
751                                    "inode");
752                         ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
753                 } else if (rc != 0) {
754                         rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
755                 } else {
756                         rc = ldlm_res_lvbo_update(lock->l_resource, req, 1);
757                 }
758                 break;
759         case LDLM_BL_CALLBACK:
760                 if (rc != 0)
761                         rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
762                 break;
763         case LDLM_CP_CALLBACK:
764                 if (rc != 0)
765                         rc = ldlm_handle_ast_error(lock, req, rc, "completion");
766                 break;
767         default:
768                 LDLM_ERROR(lock, "invalid opcode for lock callback %d",
769                            arg->type);
770                 LBUG();
771         }
772
773         /* release extra reference taken in ldlm_ast_fini() */
774         LDLM_LOCK_RELEASE(lock);
775
776         if (rc == -ERESTART)
777                 atomic_inc(&arg->restart);
778
779         RETURN(0);
780 }
781
782 static void ldlm_update_resend(struct ptlrpc_request *req, void *data)
783 {
784         struct ldlm_cb_async_args *ca   = data;
785         struct ldlm_lock          *lock = ca->ca_lock;
786
787         ldlm_refresh_waiting_lock(lock, ldlm_bl_timeout(lock));
788 }
789
790 static inline int ldlm_ast_fini(struct ptlrpc_request *req,
791                                 struct ldlm_cb_set_arg *arg,
792                                 struct ldlm_lock *lock,
793                                 int instant_cancel)
794 {
795         int rc = 0;
796         ENTRY;
797
798         if (unlikely(instant_cancel)) {
799                 rc = ptl_send_rpc(req, 1);
800                 ptlrpc_req_finished(req);
801                 if (rc == 0)
802                         atomic_inc(&arg->restart);
803         } else {
804                 LDLM_LOCK_GET(lock);
805                 ptlrpc_set_add_req(arg->set, req);
806         }
807
808         RETURN(rc);
809 }
810
811 /**
812  * Check if there are requests in the export request list which prevent
813  * the lock canceling and make these requests high priority ones.
814  */
815 static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
816 {
817         struct ptlrpc_request *req;
818         ENTRY;
819
820         if (lock->l_export == NULL) {
821                 LDLM_DEBUG(lock, "client lock: no-op");
822                 RETURN_EXIT;
823         }
824
825         spin_lock_bh(&lock->l_export->exp_rpc_lock);
826         list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
827                             rq_exp_list) {
828                 /* Do not process requests that were not yet added to there
829                  * incoming queue or were already removed from there for
830                  * processing. We evaluate ptlrpc_nrs_req_can_move() without
831                  * holding svcpt->scp_req_lock, and then redo the check with
832                  * the lock held once we need to obtain a reliable result.
833                  */
834                 if (ptlrpc_nrs_req_can_move(req) &&
835                     req->rq_ops->hpreq_lock_match &&
836                     req->rq_ops->hpreq_lock_match(req, lock))
837                         ptlrpc_nrs_req_hp_move(req);
838         }
839         spin_unlock_bh(&lock->l_export->exp_rpc_lock);
840         EXIT;
841 }
842
843 /**
844  * ->l_blocking_ast() method for server-side locks. This is invoked when newly
845  * enqueued server lock conflicts with given one.
846  *
847  * Sends blocking AST RPC to the client owning that lock; arms timeout timer
848  * to wait for client response.
849  */
850 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
851                              struct ldlm_lock_desc *desc,
852                              void *data, int flag)
853 {
854         struct ldlm_cb_async_args *ca;
855         struct ldlm_cb_set_arg *arg = data;
856         struct ldlm_request    *body;
857         struct ptlrpc_request  *req;
858         int                     instant_cancel = 0;
859         int                     rc = 0;
860         ENTRY;
861
862         if (flag == LDLM_CB_CANCELING)
863                 /* Don't need to do anything here. */
864                 RETURN(0);
865
866         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
867                 LDLM_DEBUG(lock, "dropping BL AST");
868                 RETURN(0);
869         }
870
871         LASSERT(lock);
872         LASSERT(data != NULL);
873         if (lock->l_export->exp_obd->obd_recovering != 0)
874                 LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
875
876         ldlm_lock_reorder_req(lock);
877
878         req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
879                                         &RQF_LDLM_BL_CALLBACK,
880                                         LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
881         if (req == NULL)
882                 RETURN(-ENOMEM);
883
884         CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
885         ca = ptlrpc_req_async_args(req);
886         ca->ca_set_arg = arg;
887         ca->ca_lock = lock;
888
889         req->rq_interpret_reply = ldlm_cb_interpret;
890
891         lock_res_and_lock(lock);
892         if (ldlm_is_destroyed(lock)) {
893                 /* What's the point? */
894                 unlock_res_and_lock(lock);
895                 ptlrpc_req_finished(req);
896                 RETURN(0);
897         }
898
899         if (lock->l_granted_mode != lock->l_req_mode) {
900                 /* this blocking AST will be communicated as part of the
901                  * completion AST instead */
902                 ldlm_add_blocked_lock(lock);
903                 ldlm_set_waited(lock);
904                 unlock_res_and_lock(lock);
905
906                 ptlrpc_req_finished(req);
907                 LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
908                 RETURN(0);
909         }
910
911         if (ldlm_is_cancel_on_block(lock))
912                 instant_cancel = 1;
913
914         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
915         body->lock_handle[0] = lock->l_remote_handle;
916         body->lock_desc = *desc;
917         body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK);
918
919         LDLM_DEBUG(lock, "server preparing blocking AST");
920
921         ptlrpc_request_set_replen(req);
922         ldlm_set_cbpending(lock);
923         if (instant_cancel) {
924                 unlock_res_and_lock(lock);
925                 ldlm_lock_cancel(lock);
926
927                 req->rq_no_resend = 1;
928         } else {
929                 LASSERT(lock->l_granted_mode == lock->l_req_mode);
930                 ldlm_add_waiting_lock(lock);
931                 unlock_res_and_lock(lock);
932
933                 /* Do not resend after lock callback timeout */
934                 req->rq_delay_limit = ldlm_bl_timeout(lock);
935                 req->rq_resend_cb = ldlm_update_resend;
936         }
937
938         req->rq_send_state = LUSTRE_IMP_FULL;
939         /* ptlrpc_request_alloc_pack already set timeout */
940         if (AT_OFF)
941                 req->rq_timeout = ldlm_get_rq_timeout();
942
943         lock->l_last_activity = ktime_get_real_seconds();
944
945         if (lock->l_export && lock->l_export->exp_nid_stats &&
946             lock->l_export->exp_nid_stats->nid_ldlm_stats)
947                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
948                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
949
950         rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
951
952         RETURN(rc);
953 }
954
955 /**
956  * ->l_completion_ast callback for a remote lock in server namespace.
957  *
958  *  Sends AST to the client notifying it of lock granting.  If initial
959  *  lock response was not sent yet, instead of sending another RPC, just
960  *  mark the lock as granted and client will understand
961  */
962 int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
963 {
964         struct ldlm_cb_set_arg *arg = data;
965         struct ldlm_request    *body;
966         struct ptlrpc_request  *req;
967         struct ldlm_cb_async_args *ca;
968         int                     instant_cancel = 0;
969         int                     rc = 0;
970         int                     lvb_len;
971         ENTRY;
972
973         LASSERT(lock != NULL);
974         LASSERT(data != NULL);
975
976         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
977                 LDLM_DEBUG(lock, "dropping CP AST");
978                 RETURN(0);
979         }
980
981         req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
982                                     &RQF_LDLM_CP_CALLBACK);
983         if (req == NULL)
984                 RETURN(-ENOMEM);
985
986         /* server namespace, doesn't need lock */
987         lvb_len = ldlm_lvbo_size(lock);
988         /* LU-3124 & LU-2187: to not return layout in completion AST because
989          * it may deadlock for LU-2187, or client may not have enough space
990          * for large layout. The layout will be returned to client with an
991          * extra RPC to fetch xattr.lov */
992         if (ldlm_has_layout(lock))
993                 lvb_len = 0;
994
995         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len);
996         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
997         if (rc) {
998                 ptlrpc_request_free(req);
999                 RETURN(rc);
1000         }
1001
1002         CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
1003         ca = ptlrpc_req_async_args(req);
1004         ca->ca_set_arg = arg;
1005         ca->ca_lock = lock;
1006
1007         req->rq_interpret_reply = ldlm_cb_interpret;
1008         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1009
1010         body->lock_handle[0] = lock->l_remote_handle;
1011         body->lock_flags = ldlm_flags_to_wire(flags);
1012         ldlm_lock2desc(lock, &body->lock_desc);
1013         if (lvb_len > 0) {
1014                 void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
1015
1016                 lvb_len = ldlm_lvbo_fill(lock, lvb, lvb_len);
1017                 if (lvb_len < 0) {
1018                         /* We still need to send the RPC to wake up the blocked
1019                          * enqueue thread on the client.
1020                          *
1021                          * Consider old client, there is no better way to notify
1022                          * the failure, just zero-sized the LVB, then the client
1023                          * will fail out as "-EPROTO". */
1024                         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
1025                                            RCL_CLIENT);
1026                         instant_cancel = 1;
1027                 } else {
1028                         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
1029                                            RCL_CLIENT);
1030                 }
1031         }
1032
1033         lock->l_last_activity = ktime_get_real_seconds();
1034
1035         LDLM_DEBUG(lock, "server preparing completion AST");
1036
1037         ptlrpc_request_set_replen(req);
1038
1039         req->rq_send_state = LUSTRE_IMP_FULL;
1040         /* ptlrpc_request_pack already set timeout */
1041         if (AT_OFF)
1042                 req->rq_timeout = ldlm_get_rq_timeout();
1043
1044         /* We only send real blocking ASTs after the lock is granted */
1045         lock_res_and_lock(lock);
1046         if (ldlm_is_ast_sent(lock)) {
1047                 body->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
1048                 /* Copy AST flags like LDLM_FL_DISCARD_DATA. */
1049                 body->lock_flags |= ldlm_flags_to_wire(lock->l_flags &
1050                                                        LDLM_FL_AST_MASK);
1051
1052                 /* We might get here prior to ldlm_handle_enqueue setting
1053                  * LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
1054                  * into waiting list, but this is safe and similar code in
1055                  * ldlm_handle_enqueue will call ldlm_lock_cancel() still,
1056                  * that would not only cancel the lock, but will also remove
1057                  * it from waiting list */
1058                 if (ldlm_is_cancel_on_block(lock)) {
1059                         unlock_res_and_lock(lock);
1060                         ldlm_lock_cancel(lock);
1061
1062                         instant_cancel = 1;
1063                         req->rq_no_resend = 1;
1064
1065                         lock_res_and_lock(lock);
1066                 } else {
1067                         /* start the lock-timeout clock */
1068                         ldlm_add_waiting_lock(lock);
1069                         /* Do not resend after lock callback timeout */
1070                         req->rq_delay_limit = ldlm_bl_timeout(lock);
1071                         req->rq_resend_cb = ldlm_update_resend;
1072                 }
1073         }
1074         unlock_res_and_lock(lock);
1075
1076         if (lock->l_export && lock->l_export->exp_nid_stats &&
1077             lock->l_export->exp_nid_stats->nid_ldlm_stats)
1078                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
1079                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
1080
1081         rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
1082
1083         RETURN(lvb_len < 0 ? lvb_len : rc);
1084 }
1085
1086 /**
1087  * Server side ->l_glimpse_ast handler for client locks.
1088  *
1089  * Sends glimpse AST to the client and waits for reply. Then updates
1090  * lvbo with the result.
1091  */
1092 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
1093 {
1094         struct ldlm_cb_set_arg          *arg = data;
1095         struct ldlm_request             *body;
1096         struct ptlrpc_request           *req;
1097         struct ldlm_cb_async_args       *ca;
1098         int                              rc;
1099         struct req_format               *req_fmt;
1100         ENTRY;
1101
1102         LASSERT(lock != NULL);
1103
1104         if (arg->gl_desc != NULL)
1105                 /* There is a glimpse descriptor to pack */
1106                 req_fmt = &RQF_LDLM_GL_DESC_CALLBACK;
1107         else
1108                 req_fmt = &RQF_LDLM_GL_CALLBACK;
1109
1110         req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
1111                                         req_fmt, LUSTRE_DLM_VERSION,
1112                                         LDLM_GL_CALLBACK);
1113
1114         if (req == NULL)
1115                 RETURN(-ENOMEM);
1116
1117         if (arg->gl_desc != NULL) {
1118                 /* copy the GL descriptor */
1119                 union ldlm_gl_desc      *desc;
1120                 desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
1121                 *desc = *arg->gl_desc;
1122         }
1123
1124         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1125         body->lock_handle[0] = lock->l_remote_handle;
1126         ldlm_lock2desc(lock, &body->lock_desc);
1127
1128         CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
1129         ca = ptlrpc_req_async_args(req);
1130         ca->ca_set_arg = arg;
1131         ca->ca_lock = lock;
1132
1133         /* server namespace, doesn't need lock */
1134         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
1135                              ldlm_lvbo_size(lock));
1136         ptlrpc_request_set_replen(req);
1137
1138         req->rq_send_state = LUSTRE_IMP_FULL;
1139         /* ptlrpc_request_alloc_pack already set timeout */
1140         if (AT_OFF)
1141                 req->rq_timeout = ldlm_get_rq_timeout();
1142
1143         lock->l_last_activity = ktime_get_real_seconds();
1144
1145         req->rq_interpret_reply = ldlm_cb_interpret;
1146
1147         if (lock->l_export && lock->l_export->exp_nid_stats &&
1148             lock->l_export->exp_nid_stats->nid_ldlm_stats)
1149                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
1150                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
1151
1152         rc = ldlm_ast_fini(req, arg, lock, 0);
1153
1154         RETURN(rc);
1155 }
1156
1157 int ldlm_glimpse_locks(struct ldlm_resource *res,
1158                        struct list_head *gl_work_list)
1159 {
1160         int     rc;
1161         ENTRY;
1162
1163         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
1164                                LDLM_WORK_GL_AST);
1165         if (rc == -ERESTART)
1166                 ldlm_reprocess_all(res);
1167
1168         RETURN(rc);
1169 }
1170 EXPORT_SYMBOL(ldlm_glimpse_locks);
1171
1172 /* return LDLM lock associated with a lock callback request */
1173 struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req)
1174 {
1175         struct ldlm_cb_async_args       *ca;
1176         struct ldlm_lock                *lock;
1177         ENTRY;
1178
1179         ca = ptlrpc_req_async_args(req);
1180         lock = ca->ca_lock;
1181         if (lock == NULL)
1182                 RETURN(ERR_PTR(-EFAULT));
1183
1184         RETURN(lock);
1185 }
1186 EXPORT_SYMBOL(ldlm_request_lock);
1187
1188 static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
1189                        struct lprocfs_stats *srv_stats)
1190 {
1191         int lock_type = 0, op = 0;
1192
1193         lock_type = dlm_req->lock_desc.l_resource.lr_type;
1194
1195         switch (lock_type) {
1196         case LDLM_PLAIN:
1197                 op = PTLRPC_LAST_CNTR + LDLM_PLAIN_ENQUEUE;
1198                 break;
1199         case LDLM_EXTENT:
1200                 if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT)
1201                         op = PTLRPC_LAST_CNTR + LDLM_GLIMPSE_ENQUEUE;
1202                 else
1203                         op = PTLRPC_LAST_CNTR + LDLM_EXTENT_ENQUEUE;
1204                 break;
1205         case LDLM_FLOCK:
1206                 op = PTLRPC_LAST_CNTR + LDLM_FLOCK_ENQUEUE;
1207                 break;
1208         case LDLM_IBITS:
1209                 op = PTLRPC_LAST_CNTR + LDLM_IBITS_ENQUEUE;
1210                 break;
1211         default:
1212                 op = 0;
1213                 break;
1214         }
1215
1216         if (op)
1217                 lprocfs_counter_incr(srv_stats, op);
1218
1219         return;
1220 }
1221
1222 /**
1223  * Main server-side entry point into LDLM for enqueue. This is called by ptlrpc
1224  * service threads to carry out client lock enqueueing requests.
1225  */
1226 int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
1227                          struct ptlrpc_request *req,
1228                          const struct ldlm_request *dlm_req,
1229                          const struct ldlm_callback_suite *cbs)
1230 {
1231         struct ldlm_reply *dlm_rep;
1232         __u64 flags;
1233         enum ldlm_error err = ELDLM_OK;
1234         struct ldlm_lock *lock = NULL;
1235         void *cookie = NULL;
1236         int rc = 0;
1237         struct ldlm_resource *res = NULL;
1238         ENTRY;
1239
1240         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
1241
1242         ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
1243         flags = ldlm_flags_from_wire(dlm_req->lock_flags);
1244
1245         LASSERT(req->rq_export);
1246
1247         if (ptlrpc_req2svc(req)->srv_stats != NULL)
1248                 ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
1249
1250         if (req->rq_export && req->rq_export->exp_nid_stats &&
1251             req->rq_export->exp_nid_stats->nid_ldlm_stats)
1252                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1253                                      LDLM_ENQUEUE - LDLM_FIRST_OPC);
1254
1255         if (unlikely(dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
1256                      dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE)) {
1257                 DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
1258                           dlm_req->lock_desc.l_resource.lr_type);
1259                 GOTO(out, rc = -EFAULT);
1260         }
1261
1262         if (unlikely(dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
1263                      dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
1264                      dlm_req->lock_desc.l_req_mode &
1265                      (dlm_req->lock_desc.l_req_mode-1))) {
1266                 DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
1267                           dlm_req->lock_desc.l_req_mode);
1268                 GOTO(out, rc = -EFAULT);
1269         }
1270
1271         if (exp_connect_flags(req->rq_export) & OBD_CONNECT_IBITS) {
1272                 if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
1273                              LDLM_PLAIN)) {
1274                         DEBUG_REQ(D_ERROR, req,
1275                                   "PLAIN lock request from IBITS client?");
1276                         GOTO(out, rc = -EPROTO);
1277                 }
1278         } else if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
1279                             LDLM_IBITS)) {
1280                 DEBUG_REQ(D_ERROR, req,
1281                           "IBITS lock request from unaware client?");
1282                 GOTO(out, rc = -EPROTO);
1283         }
1284
1285         if (unlikely((flags & LDLM_FL_REPLAY) ||
1286                      (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
1287                 /* Find an existing lock in the per-export lock hash */
1288                 /* In the function below, .hs_keycmp resolves to
1289                  * ldlm_export_lock_keycmp() */
1290                 /* coverity[overrun-buffer-val] */
1291                 lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
1292                                        (void *)&dlm_req->lock_handle[0]);
1293                 if (lock != NULL) {
1294                         DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
1295                                   lock->l_handle.h_cookie);
1296                         flags |= LDLM_FL_RESENT;
1297                         GOTO(existing_lock, rc = 0);
1298                 }
1299         } else {
1300                 if (ldlm_reclaim_full()) {
1301                         DEBUG_REQ(D_DLMTRACE, req, "Too many granted locks, "
1302                                   "reject current enqueue request and let the "
1303                                   "client retry later.\n");
1304                         GOTO(out, rc = -EINPROGRESS);
1305                 }
1306         }
1307
1308         /* The lock's callback data might be set in the policy function */
1309         lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
1310                                 dlm_req->lock_desc.l_resource.lr_type,
1311                                 dlm_req->lock_desc.l_req_mode,
1312                                 cbs, NULL, 0, LVB_T_NONE);
1313         if (IS_ERR(lock)) {
1314                 rc = PTR_ERR(lock);
1315                 lock = NULL;
1316                 GOTO(out, rc);
1317         }
1318
1319         lock->l_remote_handle = dlm_req->lock_handle[0];
1320         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
1321
1322         /* Initialize resource lvb but not for a lock being replayed since
1323          * Client already got lvb sent in this case.
1324          * This must occur early since some policy methods assume resource
1325          * lvb is available (lr_lvb_data != NULL).
1326          */
1327         res = lock->l_resource;
1328         if (!(flags & LDLM_FL_REPLAY)) {
1329                 /* non-replayed lock, delayed lvb init may need to be done */
1330                 rc = ldlm_lvbo_init(res);
1331                 if (rc < 0) {
1332                         LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc);
1333                         GOTO(out, rc);
1334                 }
1335         }
1336
1337         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
1338         /* Don't enqueue a lock onto the export if it is been disonnected
1339          * due to eviction (bug 3822) or server umount (bug 24324).
1340          * Cancel it now instead. */
1341         if (req->rq_export->exp_disconnected) {
1342                 LDLM_ERROR(lock, "lock on disconnected export %p",
1343                            req->rq_export);
1344                 GOTO(out, rc = -ENOTCONN);
1345         }
1346
1347         lock->l_export = class_export_lock_get(req->rq_export, lock);
1348         if (lock->l_export->exp_lock_hash)
1349                 cfs_hash_add(lock->l_export->exp_lock_hash,
1350                              &lock->l_remote_handle,
1351                              &lock->l_exp_hash);
1352
1353         /* Inherit the enqueue flags before the operation, because we do not
1354          * keep the res lock on return and next operations (BL AST) may proceed
1355          * without them. */
1356         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
1357                                               LDLM_FL_INHERIT_MASK);
1358
1359         ldlm_convert_policy_to_local(req->rq_export,
1360                                      dlm_req->lock_desc.l_resource.lr_type,
1361                                      &dlm_req->lock_desc.l_policy_data,
1362                                      &lock->l_policy_data);
1363         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
1364                 lock->l_req_extent = lock->l_policy_data.l_extent;
1365
1366 existing_lock:
1367
1368         if (flags & LDLM_FL_HAS_INTENT) {
1369                 /* In this case, the reply buffer is allocated deep in
1370                  * local_lock_enqueue by the policy function. */
1371                 cookie = req;
1372         } else {
1373                 /* based on the assumption that lvb size never changes during
1374                  * resource life time otherwise it need resource->lr_lock's
1375                  * protection */
1376                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
1377                                      RCL_SERVER, ldlm_lvbo_size(lock));
1378
1379                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
1380                         GOTO(out, rc = -ENOMEM);
1381
1382                 rc = req_capsule_server_pack(&req->rq_pill);
1383                 if (rc)
1384                         GOTO(out, rc);
1385         }
1386
1387         err = ldlm_lock_enqueue(ns, &lock, cookie, &flags);
1388         if (err) {
1389                 if ((int)err < 0)
1390                         rc = (int)err;
1391                 GOTO(out, err);
1392         }
1393
1394         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1395
1396         ldlm_lock2desc(lock, &dlm_rep->lock_desc);
1397         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
1398
1399         if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
1400                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
1401
1402         /* We never send a blocking AST until the lock is granted, but
1403          * we can tell it right now */
1404         lock_res_and_lock(lock);
1405
1406         /* Now take into account flags to be inherited from original lock
1407            request both in reply to client and in our own lock flags. */
1408         dlm_rep->lock_flags = ldlm_flags_to_wire(flags);
1409         lock->l_flags |= flags & LDLM_FL_INHERIT_MASK;
1410
1411         /* Don't move a pending lock onto the export if it has already been
1412          * disconnected due to eviction (bug 5683) or server umount (bug 24324).
1413          * Cancel it now instead. */
1414         if (unlikely(req->rq_export->exp_disconnected ||
1415                      OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
1416                 LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
1417                 rc = -ENOTCONN;
1418         } else if (ldlm_is_ast_sent(lock)) {
1419                 dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
1420                 if (lock->l_granted_mode == lock->l_req_mode) {
1421                         /*
1422                          * Only cancel lock if it was granted, because it would
1423                          * be destroyed immediately and would never be granted
1424                          * in the future, causing timeouts on client.  Not
1425                          * granted lock will be cancelled immediately after
1426                          * sending completion AST.
1427                          */
1428                         if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
1429                                 unlock_res_and_lock(lock);
1430                                 ldlm_lock_cancel(lock);
1431                                 lock_res_and_lock(lock);
1432                         } else
1433                                 ldlm_add_waiting_lock(lock);
1434                 }
1435         }
1436         /* Make sure we never ever grant usual metadata locks to liblustre
1437            clients */
1438         if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||
1439             dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&
1440              req->rq_export->exp_libclient) {
1441                 if (unlikely(!ldlm_is_cancel_on_block(lock) ||
1442                              !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
1443                         CERROR("Granting sync lock to libclient. "
1444                                "req fl %d, rep fl %d, lock fl %#llx\n",
1445                                dlm_req->lock_flags, dlm_rep->lock_flags,
1446                                lock->l_flags);
1447                         LDLM_ERROR(lock, "sync lock");
1448                         if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {
1449                                 struct ldlm_intent *it;
1450
1451                                 it = req_capsule_client_get(&req->rq_pill,
1452                                                             &RMF_LDLM_INTENT);
1453                                 if (it != NULL) {
1454                                         CERROR("This is intent %s (%llu)\n",
1455                                                ldlm_it2str(it->opc), it->opc);
1456                                 }
1457                         }
1458                 }
1459         }
1460
1461         unlock_res_and_lock(lock);
1462
1463         EXIT;
1464  out:
1465         req->rq_status = rc ?: err; /* return either error - bug 11190 */
1466         if (!req->rq_packed_final) {
1467                 err = lustre_pack_reply(req, 1, NULL, NULL);
1468                 if (rc == 0)
1469                         rc = err;
1470         }
1471
1472         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
1473          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
1474         if (lock != NULL) {
1475                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
1476                            "(err=%d, rc=%d)", err, rc);
1477
1478                 if (rc == 0) {
1479                         if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
1480                                                   RCL_SERVER) &&
1481                             ldlm_lvbo_size(lock) > 0) {
1482                                 void *buf;
1483                                 int buflen;
1484
1485                                 buf = req_capsule_server_get(&req->rq_pill,
1486                                                              &RMF_DLM_LVB);
1487                                 LASSERTF(buf != NULL, "req %p, lock %p\n",
1488                                          req, lock);
1489                                 buflen = req_capsule_get_size(&req->rq_pill,
1490                                                 &RMF_DLM_LVB, RCL_SERVER);
1491                                 /* non-replayed lock, delayed lvb init may
1492                                  * need to be occur now */
1493                                 if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
1494                                         buflen = ldlm_lvbo_fill(lock, buf,
1495                                                                 buflen);
1496                                         if (buflen >= 0)
1497                                                 req_capsule_shrink(
1498                                                         &req->rq_pill,
1499                                                         &RMF_DLM_LVB,
1500                                                         buflen, RCL_SERVER);
1501                                         else
1502                                                 rc = buflen;
1503                                 } else if (flags & LDLM_FL_REPLAY) {
1504                                         /* no LVB resend upon replay */
1505                                         if (buflen > 0)
1506                                                 req_capsule_shrink(
1507                                                         &req->rq_pill,
1508                                                         &RMF_DLM_LVB,
1509                                                         0, RCL_SERVER);
1510                                         else
1511                                                 rc = buflen;
1512                                 } else {
1513                                         rc = buflen;
1514                                 }
1515                         }
1516                 }
1517
1518                 if (rc != 0 && !(flags & LDLM_FL_RESENT)) {
1519                         if (lock->l_export) {
1520                                 ldlm_lock_cancel(lock);
1521                         } else {
1522                                 lock_res_and_lock(lock);
1523                                 ldlm_resource_unlink_lock(lock);
1524                                 ldlm_lock_destroy_nolock(lock);
1525                                 unlock_res_and_lock(lock);
1526
1527                         }
1528                 }
1529
1530                 if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
1531                         ldlm_reprocess_all(lock->l_resource);
1532
1533                 LDLM_LOCK_RELEASE(lock);
1534         }
1535
1536         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
1537                           lock, rc);
1538
1539         return rc;
1540 }
1541
1542 /**
1543  * Old-style LDLM main entry point for server code enqueue.
1544  */
1545 int ldlm_handle_enqueue(struct ptlrpc_request *req,
1546                         ldlm_completion_callback completion_callback,
1547                         ldlm_blocking_callback blocking_callback,
1548                         ldlm_glimpse_callback glimpse_callback)
1549 {
1550         struct ldlm_request *dlm_req;
1551         struct ldlm_callback_suite cbs = {
1552                 .lcs_completion = completion_callback,
1553                 .lcs_blocking   = blocking_callback,
1554                 .lcs_glimpse    = glimpse_callback
1555         };
1556         int rc;
1557
1558         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1559         if (dlm_req != NULL) {
1560                 rc = ldlm_handle_enqueue0(req->rq_export->exp_obd->obd_namespace,
1561                                           req, dlm_req, &cbs);
1562         } else {
1563                 rc = -EFAULT;
1564         }
1565         return rc;
1566 }
1567
1568 /**
1569  * Main LDLM entry point for server code to process lock conversion requests.
1570  */
1571 int ldlm_handle_convert0(struct ptlrpc_request *req,
1572                          const struct ldlm_request *dlm_req)
1573 {
1574         struct ldlm_reply *dlm_rep;
1575         struct ldlm_lock *lock;
1576         int rc;
1577         ENTRY;
1578
1579         if (req->rq_export && req->rq_export->exp_nid_stats &&
1580             req->rq_export->exp_nid_stats->nid_ldlm_stats)
1581                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1582                                      LDLM_CONVERT - LDLM_FIRST_OPC);
1583
1584         rc = req_capsule_server_pack(&req->rq_pill);
1585         if (rc)
1586                 RETURN(rc);
1587
1588         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1589         dlm_rep->lock_flags = dlm_req->lock_flags;
1590
1591         lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
1592         if (!lock) {
1593                 req->rq_status = LUSTRE_EINVAL;
1594         } else {
1595                 void *res = NULL;
1596
1597                 LDLM_DEBUG(lock, "server-side convert handler START");
1598
1599                 res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
1600                                         &dlm_rep->lock_flags);
1601                 if (res) {
1602                         if (ldlm_del_waiting_lock(lock))
1603                                 LDLM_DEBUG(lock, "converted waiting lock");
1604                         req->rq_status = 0;
1605                 } else {
1606                         req->rq_status = LUSTRE_EDEADLK;
1607                 }
1608         }
1609
1610         if (lock) {
1611                 if (!req->rq_status)
1612                         ldlm_reprocess_all(lock->l_resource);
1613                 LDLM_DEBUG(lock, "server-side convert handler END");
1614                 LDLM_LOCK_PUT(lock);
1615         } else
1616                 LDLM_DEBUG_NOLOCK("server-side convert handler END");
1617
1618         RETURN(0);
1619 }
1620
1621 /**
1622  * Old-style main LDLM entry point for server code to process lock conversion
1623  * requests.
1624  */
1625 int ldlm_handle_convert(struct ptlrpc_request *req)
1626 {
1627         int rc;
1628         struct ldlm_request *dlm_req;
1629
1630         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1631         if (dlm_req != NULL) {
1632                 rc = ldlm_handle_convert0(req, dlm_req);
1633         } else {
1634                 CERROR ("Can't unpack dlm_req\n");
1635                 rc = -EFAULT;
1636         }
1637         return rc;
1638 }
1639
1640 /**
1641  * Cancel all the locks whose handles are packed into ldlm_request
1642  *
1643  * Called by server code expecting such combined cancel activity
1644  * requests.
1645  */
1646 int ldlm_request_cancel(struct ptlrpc_request *req,
1647                         const struct ldlm_request *dlm_req,
1648                         int first, enum lustre_at_flags flags)
1649 {
1650         struct ldlm_resource *res, *pres = NULL;
1651         struct ldlm_lock *lock;
1652         int i, count, done = 0;
1653         ENTRY;
1654
1655         count = dlm_req->lock_count ? dlm_req->lock_count : 1;
1656         if (first >= count)
1657                 RETURN(0);
1658
1659         if (count == 1 && dlm_req->lock_handle[0].cookie == 0)
1660                 RETURN(0);
1661
1662         /* There is no lock on the server at the replay time,
1663          * skip lock cancelling to make replay tests to pass. */
1664         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1665                 RETURN(0);
1666
1667         LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, "
1668                           "starting at %d", count, first);
1669
1670         for (i = first; i < count; i++) {
1671                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
1672                 if (!lock) {
1673                         LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
1674                                           "lock (cookie %llu)",
1675                                           dlm_req->lock_handle[i].cookie);
1676                         continue;
1677                 }
1678
1679                 res = lock->l_resource;
1680                 done++;
1681
1682                 /* This code is an optimization to only attempt lock
1683                  * granting on the resource (that could be CPU-expensive)
1684                  * after we are done cancelling lock in that resource. */
1685                 if (res != pres) {
1686                         if (pres != NULL) {
1687                                 ldlm_reprocess_all(pres);
1688                                 LDLM_RESOURCE_DELREF(pres);
1689                                 ldlm_resource_putref(pres);
1690                         }
1691                         if (res != NULL) {
1692                                 ldlm_resource_getref(res);
1693                                 LDLM_RESOURCE_ADDREF(res);
1694                                 ldlm_res_lvbo_update(res, NULL, 1);
1695                         }
1696                         pres = res;
1697                 }
1698
1699                 if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock)) {
1700                         time64_t delay = ktime_get_real_seconds() -
1701                                          lock->l_last_activity;
1702                         LDLM_DEBUG(lock, "server cancels blocked lock after %llds",
1703                                    (s64)delay);
1704                         at_measured(&lock->l_export->exp_bl_lock_at, delay);
1705                 }
1706                 ldlm_lock_cancel(lock);
1707                 LDLM_LOCK_PUT(lock);
1708         }
1709         if (pres != NULL) {
1710                 ldlm_reprocess_all(pres);
1711                 LDLM_RESOURCE_DELREF(pres);
1712                 ldlm_resource_putref(pres);
1713         }
1714         LDLM_DEBUG_NOLOCK("server-side cancel handler END");
1715         RETURN(done);
1716 }
1717 EXPORT_SYMBOL(ldlm_request_cancel);
1718
1719 /**
1720  * Main LDLM entry point for server code to cancel locks.
1721  *
1722  * Typically gets called from service handler on LDLM_CANCEL opc.
1723  */
1724 int ldlm_handle_cancel(struct ptlrpc_request *req)
1725 {
1726         struct ldlm_request *dlm_req;
1727         int rc;
1728         ENTRY;
1729
1730         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1731         if (dlm_req == NULL) {
1732                 CDEBUG(D_INFO, "bad request buffer for cancel\n");
1733                 RETURN(-EFAULT);
1734         }
1735
1736         if (req->rq_export && req->rq_export->exp_nid_stats &&
1737             req->rq_export->exp_nid_stats->nid_ldlm_stats)
1738                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1739                                      LDLM_CANCEL - LDLM_FIRST_OPC);
1740
1741         rc = req_capsule_server_pack(&req->rq_pill);
1742         if (rc)
1743                 RETURN(rc);
1744
1745         if (!ldlm_request_cancel(req, dlm_req, 0, LATF_STATS))
1746                 req->rq_status = LUSTRE_ESTALE;
1747
1748         RETURN(ptlrpc_reply(req));
1749 }
1750 #endif /* HAVE_SERVER_SUPPORT */
1751
1752 /**
1753  * Callback handler for receiving incoming blocking ASTs.
1754  *
1755  * This can only happen on client side.
1756  */
1757 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
1758                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
1759 {
1760         int do_ast;
1761         ENTRY;
1762
1763         LDLM_DEBUG(lock, "client blocking AST callback handler");
1764
1765         lock_res_and_lock(lock);
1766         ldlm_set_cbpending(lock);
1767
1768         if (ldlm_is_cancel_on_block(lock))
1769                 ldlm_set_cancel(lock);
1770
1771         do_ast = (!lock->l_readers && !lock->l_writers);
1772         unlock_res_and_lock(lock);
1773
1774         if (do_ast) {
1775                 CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
1776                        lock, lock->l_blocking_ast);
1777                 if (lock->l_blocking_ast != NULL)
1778                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
1779                                              LDLM_CB_BLOCKING);
1780         } else {
1781                 CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
1782                        lock);
1783         }
1784
1785         LDLM_DEBUG(lock, "client blocking callback handler END");
1786         LDLM_LOCK_RELEASE(lock);
1787         EXIT;
1788 }
1789
1790 /**
1791  * Callback handler for receiving incoming completion ASTs.
1792  *
1793  * This only can happen on client side.
1794  */
1795 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
1796                                     struct ldlm_namespace *ns,
1797                                     struct ldlm_request *dlm_req,
1798                                     struct ldlm_lock *lock)
1799 {
1800         struct list_head ast_list;
1801         int lvb_len;
1802         int rc = 0;
1803         ENTRY;
1804
1805         LDLM_DEBUG(lock, "client completion callback handler START");
1806
1807         INIT_LIST_HEAD(&ast_list);
1808         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
1809                 int to = cfs_time_seconds(1);
1810                 while (to > 0) {
1811                         set_current_state(TASK_INTERRUPTIBLE);
1812                         schedule_timeout(to);
1813                         if (lock->l_granted_mode == lock->l_req_mode ||
1814                             ldlm_is_destroyed(lock))
1815                                 break;
1816                 }
1817         }
1818
1819         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
1820         if (lvb_len < 0) {
1821                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
1822                 GOTO(out, rc = lvb_len);
1823         } else if (lvb_len > 0) {
1824                 if (lock->l_lvb_len > 0) {
1825                         /* for extent lock, lvb contains ost_lvb{}. */
1826                         LASSERT(lock->l_lvb_data != NULL);
1827
1828                         if (unlikely(lock->l_lvb_len < lvb_len)) {
1829                                 LDLM_ERROR(lock, "Replied LVB is larger than "
1830                                            "expectation, expected = %d, "
1831                                            "replied = %d",
1832                                            lock->l_lvb_len, lvb_len);
1833                                 GOTO(out, rc = -EINVAL);
1834                         }
1835                 }
1836         }
1837
1838         lock_res_and_lock(lock);
1839         if (ldlm_is_destroyed(lock) ||
1840             lock->l_granted_mode == lock->l_req_mode) {
1841                 /* bug 11300: the lock has already been granted */
1842                 unlock_res_and_lock(lock);
1843                 LDLM_DEBUG(lock, "Double grant race happened");
1844                 GOTO(out, rc = 0);
1845         }
1846
1847         /* If we receive the completion AST before the actual enqueue returned,
1848          * then we might need to switch lock modes, resources, or extents. */
1849         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
1850                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
1851                 LDLM_DEBUG(lock, "completion AST, new lock mode");
1852         }
1853
1854         if (lock->l_resource->lr_type != LDLM_PLAIN) {
1855                 ldlm_convert_policy_to_local(req->rq_export,
1856                                           dlm_req->lock_desc.l_resource.lr_type,
1857                                           &dlm_req->lock_desc.l_policy_data,
1858                                           &lock->l_policy_data);
1859                 LDLM_DEBUG(lock, "completion AST, new policy data");
1860         }
1861
1862         ldlm_resource_unlink_lock(lock);
1863         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
1864                    &lock->l_resource->lr_name,
1865                    sizeof(lock->l_resource->lr_name)) != 0) {
1866                 unlock_res_and_lock(lock);
1867                 rc = ldlm_lock_change_resource(ns, lock,
1868                                 &dlm_req->lock_desc.l_resource.lr_name);
1869                 if (rc < 0) {
1870                         LDLM_ERROR(lock, "Failed to allocate resource");
1871                         GOTO(out, rc);
1872                 }
1873                 LDLM_DEBUG(lock, "completion AST, new resource");
1874                 CERROR("change resource!\n");
1875                 lock_res_and_lock(lock);
1876         }
1877
1878         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
1879                 /* BL_AST locks are not needed in LRU.
1880                  * Let ldlm_cancel_lru() be fast. */
1881                 ldlm_lock_remove_from_lru(lock);
1882                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
1883                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
1884         }
1885
1886         if (lock->l_lvb_len > 0) {
1887                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
1888                                    lock->l_lvb_data, lvb_len);
1889                 if (rc < 0) {
1890                         unlock_res_and_lock(lock);
1891                         GOTO(out, rc);
1892                 }
1893         }
1894
1895         ldlm_grant_lock(lock, &ast_list);
1896         unlock_res_and_lock(lock);
1897
1898         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
1899
1900         /* Let Enqueue to call osc_lock_upcall() and initialize
1901          * l_ast_data */
1902         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
1903
1904         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
1905
1906         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
1907                           lock);
1908         GOTO(out, rc);
1909
1910 out:
1911         if (rc < 0) {
1912                 lock_res_and_lock(lock);
1913                 ldlm_set_failed(lock);
1914                 unlock_res_and_lock(lock);
1915                 wake_up(&lock->l_waitq);
1916         }
1917         LDLM_LOCK_RELEASE(lock);
1918 }
1919
1920 /**
1921  * Callback handler for receiving incoming glimpse ASTs.
1922  *
1923  * This only can happen on client side.  After handling the glimpse AST
1924  * we also consider dropping the lock here if it is unused locally for a
1925  * long time.
1926  */
1927 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
1928                                     struct ldlm_namespace *ns,
1929                                     struct ldlm_request *dlm_req,
1930                                     struct ldlm_lock *lock)
1931 {
1932         int rc = -ENOSYS;
1933         ENTRY;
1934
1935         LDLM_DEBUG(lock, "client glimpse AST callback handler");
1936
1937         if (lock->l_glimpse_ast != NULL)
1938                 rc = lock->l_glimpse_ast(lock, req);
1939
1940         if (req->rq_repmsg != NULL) {
1941                 ptlrpc_reply(req);
1942         } else {
1943                 req->rq_status = rc;
1944                 ptlrpc_error(req);
1945         }
1946
1947         lock_res_and_lock(lock);
1948         if (lock->l_granted_mode == LCK_PW &&
1949             !lock->l_readers && !lock->l_writers &&
1950             cfs_time_after(cfs_time_current(),
1951                            cfs_time_add(lock->l_last_used,
1952                                         cfs_time_seconds(10)))) {
1953                 unlock_res_and_lock(lock);
1954                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
1955                         ldlm_handle_bl_callback(ns, NULL, lock);
1956
1957                 EXIT;
1958                 return;
1959         }
1960         unlock_res_and_lock(lock);
1961         LDLM_LOCK_RELEASE(lock);
1962         EXIT;
1963 }
1964
1965 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
1966 {
1967         if (req->rq_no_reply)
1968                 return 0;
1969
1970         req->rq_status = rc;
1971         if (!req->rq_packed_final) {
1972                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1973                 if (rc)
1974                         return rc;
1975         }
1976         return ptlrpc_reply(req);
1977 }
1978
1979 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
1980                                enum ldlm_cancel_flags cancel_flags)
1981 {
1982         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1983         ENTRY;
1984
1985         spin_lock(&blp->blp_lock);
1986         if (blwi->blwi_lock &&
1987             ldlm_is_discard_data(blwi->blwi_lock)) {
1988                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
1989                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
1990         } else {
1991                 /* other blocking callbacks are added to the regular list */
1992                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
1993         }
1994         spin_unlock(&blp->blp_lock);
1995
1996         wake_up(&blp->blp_waitq);
1997
1998         /* can not check blwi->blwi_flags as blwi could be already freed in
1999            LCF_ASYNC mode */
2000         if (!(cancel_flags & LCF_ASYNC))
2001                 wait_for_completion(&blwi->blwi_comp);
2002
2003         RETURN(0);
2004 }
2005
2006 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
2007                              struct ldlm_namespace *ns,
2008                              struct ldlm_lock_desc *ld,
2009                              struct list_head *cancels, int count,
2010                              struct ldlm_lock *lock,
2011                              enum ldlm_cancel_flags cancel_flags)
2012 {
2013         init_completion(&blwi->blwi_comp);
2014         INIT_LIST_HEAD(&blwi->blwi_head);
2015
2016         if (memory_pressure_get())
2017                 blwi->blwi_mem_pressure = 1;
2018
2019         blwi->blwi_ns = ns;
2020         blwi->blwi_flags = cancel_flags;
2021         if (ld != NULL)
2022                 blwi->blwi_ld = *ld;
2023         if (count) {
2024                 list_add(&blwi->blwi_head, cancels);
2025                 list_del_init(cancels);
2026                 blwi->blwi_count = count;
2027         } else {
2028                 blwi->blwi_lock = lock;
2029         }
2030 }
2031
2032 /**
2033  * Queues a list of locks \a cancels containing \a count locks
2034  * for later processing by a blocking thread.  If \a count is zero,
2035  * then the lock referenced as \a lock is queued instead.
2036  *
2037  * The blocking thread would then call ->l_blocking_ast callback in the lock.
2038  * If list addition fails an error is returned and caller is supposed to
2039  * call ->l_blocking_ast itself.
2040  */
2041 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
2042                              struct ldlm_lock_desc *ld,
2043                              struct ldlm_lock *lock,
2044                              struct list_head *cancels, int count,
2045                              enum ldlm_cancel_flags cancel_flags)
2046 {
2047         ENTRY;
2048
2049         if (cancels && count == 0)
2050                 RETURN(0);
2051
2052         if (cancel_flags & LCF_ASYNC) {
2053                 struct ldlm_bl_work_item *blwi;
2054
2055                 OBD_ALLOC(blwi, sizeof(*blwi));
2056                 if (blwi == NULL)
2057                         RETURN(-ENOMEM);
2058                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
2059
2060                 RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
2061         } else {
2062                 /* if it is synchronous call do minimum mem alloc, as it could
2063                  * be triggered from kernel shrinker
2064                  */
2065                 struct ldlm_bl_work_item blwi;
2066
2067                 memset(&blwi, 0, sizeof(blwi));
2068                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
2069                 RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
2070         }
2071 }
2072
2073
2074 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
2075                            struct ldlm_lock *lock)
2076 {
2077         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
2078 }
2079
2080 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
2081                            struct list_head *cancels, int count,
2082                            enum ldlm_cancel_flags cancel_flags)
2083 {
2084         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
2085 }
2086
2087 int ldlm_bl_thread_wakeup(void)
2088 {
2089         wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
2090         return 0;
2091 }
2092
2093 /* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
2094 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
2095 {
2096         struct obd_device *obd = req->rq_export->exp_obd;
2097         char *key;
2098         void *val;
2099         int keylen, vallen;
2100         int rc = -ENOSYS;
2101         ENTRY;
2102
2103         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
2104
2105         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2106
2107         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2108         if (key == NULL) {
2109                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
2110                 RETURN(-EFAULT);
2111         }
2112         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
2113                                       RCL_CLIENT);
2114         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
2115         if (val == NULL) {
2116                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
2117                 RETURN(-EFAULT);
2118         }
2119         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
2120                                       RCL_CLIENT);
2121
2122         /* We are responsible for swabbing contents of val */
2123
2124         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
2125                 /* Pass it on to mdc (the "export" in this case) */
2126                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
2127                                         req->rq_export,
2128                                         sizeof(KEY_HSM_COPYTOOL_SEND),
2129                                         KEY_HSM_COPYTOOL_SEND,
2130                                         vallen, val, NULL);
2131         else
2132                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
2133
2134         return rc;
2135 }
2136
2137 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
2138                                         const char *msg, int rc,
2139                                         const struct lustre_handle *handle)
2140 {
2141         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
2142                   "%s: [nid %s] [rc %d] [lock %#llx]",
2143                   msg, libcfs_id2str(req->rq_peer), rc,
2144                   handle ? handle->cookie : 0);
2145         if (req->rq_no_reply)
2146                 CWARN("No reply was sent, maybe cause bug 21636.\n");
2147         else if (rc)
2148                 CWARN("Send reply failed, maybe cause bug 21636.\n");
2149 }
2150
2151 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2152 static int ldlm_callback_handler(struct ptlrpc_request *req)
2153 {
2154         struct ldlm_namespace *ns;
2155         struct ldlm_request *dlm_req;
2156         struct ldlm_lock *lock;
2157         int rc;
2158         ENTRY;
2159
2160         /* Requests arrive in sender's byte order.  The ptlrpc service
2161          * handler has already checked and, if necessary, byte-swapped the
2162          * incoming request message body, but I am responsible for the
2163          * message buffers. */
2164
2165         /* do nothing for sec context finalize */
2166         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
2167                 RETURN(0);
2168
2169         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2170
2171         if (req->rq_export == NULL) {
2172                 rc = ldlm_callback_reply(req, -ENOTCONN);
2173                 ldlm_callback_errmsg(req, "Operate on unconnected server",
2174                                      rc, NULL);
2175                 RETURN(0);
2176         }
2177
2178         LASSERT(req->rq_export != NULL);
2179         LASSERT(req->rq_export->exp_obd != NULL);
2180
2181         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2182         case LDLM_BL_CALLBACK:
2183                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
2184                         if (cfs_fail_err)
2185                                 ldlm_callback_reply(req, -(int)cfs_fail_err);
2186                         RETURN(0);
2187                 }
2188                 break;
2189         case LDLM_CP_CALLBACK:
2190                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
2191                         RETURN(0);
2192                 break;
2193         case LDLM_GL_CALLBACK:
2194                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
2195                         RETURN(0);
2196                 break;
2197         case LDLM_SET_INFO:
2198                 rc = ldlm_handle_setinfo(req);
2199                 ldlm_callback_reply(req, rc);
2200                 RETURN(0);
2201         case LLOG_ORIGIN_HANDLE_CREATE:
2202                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
2203                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
2204                         RETURN(0);
2205                 rc = llog_origin_handle_open(req);
2206                 ldlm_callback_reply(req, rc);
2207                 RETURN(0);
2208         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
2209                 req_capsule_set(&req->rq_pill,
2210                                 &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
2211                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
2212                         RETURN(0);
2213                 rc = llog_origin_handle_next_block(req);
2214                 ldlm_callback_reply(req, rc);
2215                 RETURN(0);
2216         case LLOG_ORIGIN_HANDLE_READ_HEADER:
2217                 req_capsule_set(&req->rq_pill,
2218                                 &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
2219                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
2220                         RETURN(0);
2221                 rc = llog_origin_handle_read_header(req);
2222                 ldlm_callback_reply(req, rc);
2223                 RETURN(0);
2224         case LLOG_ORIGIN_HANDLE_CLOSE:
2225                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
2226                         RETURN(0);
2227                 rc = llog_origin_handle_close(req);
2228                 ldlm_callback_reply(req, rc);
2229                 RETURN(0);
2230         default:
2231                 CERROR("unknown opcode %u\n",
2232                        lustre_msg_get_opc(req->rq_reqmsg));
2233                 ldlm_callback_reply(req, -EPROTO);
2234                 RETURN(0);
2235         }
2236
2237         ns = req->rq_export->exp_obd->obd_namespace;
2238         LASSERT(ns != NULL);
2239
2240         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2241
2242         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2243         if (dlm_req == NULL) {
2244                 rc = ldlm_callback_reply(req, -EPROTO);
2245                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
2246                                      NULL);
2247                 RETURN(0);
2248         }
2249
2250         /* Force a known safe race, send a cancel to the server for a lock
2251          * which the server has already started a blocking callback on. */
2252         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
2253             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
2254                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
2255                 if (rc < 0)
2256                         CERROR("ldlm_cli_cancel: %d\n", rc);
2257         }
2258
2259         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
2260         if (!lock) {
2261                 CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
2262                        "disappeared\n", dlm_req->lock_handle[0].cookie);
2263                 rc = ldlm_callback_reply(req, -EINVAL);
2264                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
2265                                      &dlm_req->lock_handle[0]);
2266                 RETURN(0);
2267         }
2268
2269         if (ldlm_is_fail_loc(lock) &&
2270             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
2271                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
2272
2273         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
2274         lock_res_and_lock(lock);
2275         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
2276                                               LDLM_FL_AST_MASK);
2277         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
2278                 /* If somebody cancels lock and cache is already dropped,
2279                  * or lock is failed before cp_ast received on client,
2280                  * we can tell the server we have no lock. Otherwise, we
2281                  * should send cancel after dropping the cache. */
2282                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
2283                      ldlm_is_failed(lock)) {
2284                         LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
2285                                    dlm_req->lock_handle[0].cookie);
2286                         unlock_res_and_lock(lock);
2287                         LDLM_LOCK_RELEASE(lock);
2288                         rc = ldlm_callback_reply(req, -EINVAL);
2289                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
2290                                              &dlm_req->lock_handle[0]);
2291                         RETURN(0);
2292                 }
2293                 /* BL_AST locks are not needed in LRU.
2294                  * Let ldlm_cancel_lru() be fast. */
2295                 ldlm_lock_remove_from_lru(lock);
2296                 ldlm_set_bl_ast(lock);
2297         }
2298         unlock_res_and_lock(lock);
2299
2300         /* We want the ost thread to get this reply so that it can respond
2301          * to ost requests (write cache writeback) that might be triggered
2302          * in the callback.
2303          *
2304          * But we'd also like to be able to indicate in the reply that we're
2305          * cancelling right now, because it's unused, or have an intent result
2306          * in the reply, so we might have to push the responsibility for sending
2307          * the reply down into the AST handlers, alas. */
2308
2309         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2310         case LDLM_BL_CALLBACK:
2311                 CDEBUG(D_INODE, "blocking ast\n");
2312                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
2313                 if (!ldlm_is_cancel_on_block(lock)) {
2314                         rc = ldlm_callback_reply(req, 0);
2315                         if (req->rq_no_reply || rc)
2316                                 ldlm_callback_errmsg(req, "Normal process", rc,
2317                                                      &dlm_req->lock_handle[0]);
2318                 }
2319                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
2320                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
2321                 break;
2322         case LDLM_CP_CALLBACK:
2323                 CDEBUG(D_INODE, "completion ast\n");
2324                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
2325                 ldlm_callback_reply(req, 0);
2326                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
2327                 break;
2328         case LDLM_GL_CALLBACK:
2329                 CDEBUG(D_INODE, "glimpse ast\n");
2330                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
2331                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
2332                 break;
2333         default:
2334                 LBUG();                         /* checked above */
2335         }
2336
2337         RETURN(0);
2338 }
2339
2340 #ifdef HAVE_SERVER_SUPPORT
2341 /**
2342  * Main handler for canceld thread.
2343  *
2344  * Separated into its own thread to avoid deadlocks.
2345  */
2346 static int ldlm_cancel_handler(struct ptlrpc_request *req)
2347 {
2348         int rc;
2349         ENTRY;
2350
2351         /* Requests arrive in sender's byte order.  The ptlrpc service
2352          * handler has already checked and, if necessary, byte-swapped the
2353          * incoming request message body, but I am responsible for the
2354          * message buffers. */
2355
2356         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2357
2358         if (req->rq_export == NULL) {
2359                 struct ldlm_request *dlm_req;
2360
2361                 CERROR("%s from %s arrived at %lu with bad export cookie "
2362                        "%llu\n",
2363                        ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
2364                        libcfs_nid2str(req->rq_peer.nid),
2365                        req->rq_arrival_time.tv_sec,
2366                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
2367
2368                 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
2369                         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2370                         dlm_req = req_capsule_client_get(&req->rq_pill,
2371                                                          &RMF_DLM_REQ);
2372                         if (dlm_req != NULL)
2373                                 ldlm_lock_dump_handle(D_ERROR,
2374                                                       &dlm_req->lock_handle[0]);
2375                 }
2376                 ldlm_callback_reply(req, -ENOTCONN);
2377                 RETURN(0);
2378         }
2379
2380         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2381
2382         /* XXX FIXME move this back to mds/handler.c, bug 249 */
2383         case LDLM_CANCEL:
2384                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2385                 CDEBUG(D_INODE, "cancel\n");
2386                 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
2387                     CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
2388                     CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
2389                         RETURN(0);
2390                 rc = ldlm_handle_cancel(req);
2391                 if (rc)
2392                         break;
2393                 RETURN(0);
2394         default:
2395                 CERROR("invalid opcode %d\n",
2396                        lustre_msg_get_opc(req->rq_reqmsg));
2397                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2398                 ldlm_callback_reply(req, -EINVAL);
2399         }
2400
2401         RETURN(0);
2402 }
2403
2404 static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
2405                                         struct ldlm_lock *lock)
2406 {
2407         struct ldlm_request *dlm_req;
2408         struct lustre_handle lockh;
2409         int rc = 0;
2410         int i;
2411         ENTRY;
2412
2413         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2414         if (dlm_req == NULL)
2415                 RETURN(0);
2416
2417         ldlm_lock2handle(lock, &lockh);
2418         for (i = 0; i < dlm_req->lock_count; i++) {
2419                 if (lustre_handle_equal(&dlm_req->lock_handle[i],
2420                                         &lockh)) {
2421                         DEBUG_REQ(D_RPCTRACE, req,
2422                                   "Prio raised by lock %#llx.", lockh.cookie);
2423
2424                         rc = 1;
2425                         break;
2426                 }
2427         }
2428
2429         RETURN(rc);
2430
2431 }
2432
2433 static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
2434 {
2435         struct ldlm_request *dlm_req;
2436         int rc = 0;
2437         int i;
2438         ENTRY;
2439
2440         /* no prolong in recovery */
2441         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
2442                 RETURN(0);
2443
2444         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2445         if (dlm_req == NULL)
2446                 RETURN(-EFAULT);
2447
2448         for (i = 0; i < dlm_req->lock_count; i++) {
2449                 struct ldlm_lock *lock;
2450
2451                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
2452                 if (lock == NULL)
2453                         continue;
2454
2455                 rc = ldlm_is_ast_sent(lock) ? 1 : 0;
2456                 if (rc)
2457                         LDLM_DEBUG(lock, "hpreq cancel lock");
2458                 LDLM_LOCK_PUT(lock);
2459
2460                 if (rc)
2461                         break;
2462         }
2463
2464         RETURN(rc);
2465 }
2466
2467 static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
2468         .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
2469         .hpreq_check      = ldlm_cancel_hpreq_check,
2470         .hpreq_fini       = NULL,
2471 };
2472
2473 static int ldlm_hpreq_handler(struct ptlrpc_request *req)
2474 {
2475         ENTRY;
2476
2477         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2478
2479         if (req->rq_export == NULL)
2480                 RETURN(0);
2481
2482         if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
2483                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2484                 req->rq_ops = &ldlm_cancel_hpreq_ops;
2485         }
2486         RETURN(0);
2487 }
2488
2489 static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2490                                struct hlist_node *hnode, void *data)
2491
2492 {
2493         struct list_head         *rpc_list = data;
2494         struct ldlm_lock   *lock = cfs_hash_object(hs, hnode);
2495
2496         lock_res_and_lock(lock);
2497
2498         if (lock->l_req_mode != lock->l_granted_mode) {
2499                 unlock_res_and_lock(lock);
2500                 return 0;
2501         }
2502
2503         LASSERT(lock->l_resource);
2504         if (lock->l_resource->lr_type != LDLM_IBITS &&
2505             lock->l_resource->lr_type != LDLM_PLAIN) {
2506                 unlock_res_and_lock(lock);
2507                 return 0;
2508         }
2509
2510         if (ldlm_is_ast_sent(lock)) {
2511                 unlock_res_and_lock(lock);
2512                 return 0;
2513         }
2514
2515         LASSERT(lock->l_blocking_ast);
2516         LASSERT(!lock->l_blocking_lock);
2517
2518         ldlm_set_ast_sent(lock);
2519         if (lock->l_export && lock->l_export->exp_lock_hash) {
2520                 /* NB: it's safe to call cfs_hash_del() even lock isn't
2521                  * in exp_lock_hash. */
2522                 /* In the function below, .hs_keycmp resolves to
2523                  * ldlm_export_lock_keycmp() */
2524                 /* coverity[overrun-buffer-val] */
2525                 cfs_hash_del(lock->l_export->exp_lock_hash,
2526                              &lock->l_remote_handle, &lock->l_exp_hash);
2527         }
2528
2529         list_add_tail(&lock->l_rk_ast, rpc_list);
2530         LDLM_LOCK_GET(lock);
2531
2532         unlock_res_and_lock(lock);
2533         return 0;
2534 }
2535
2536 void ldlm_revoke_export_locks(struct obd_export *exp)
2537 {
2538         struct list_head  rpc_list;
2539         ENTRY;
2540
2541         INIT_LIST_HEAD(&rpc_list);
2542         cfs_hash_for_each_nolock(exp->exp_lock_hash,
2543                                  ldlm_revoke_lock_cb, &rpc_list, 0);
2544         ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
2545                           LDLM_WORK_REVOKE_AST);
2546
2547         EXIT;
2548 }
2549 EXPORT_SYMBOL(ldlm_revoke_export_locks);
2550 #endif /* HAVE_SERVER_SUPPORT */
2551
2552 static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
2553                             struct ldlm_bl_work_item **p_blwi,
2554                             struct obd_export **p_exp)
2555 {
2556         struct ldlm_bl_work_item *blwi = NULL;
2557         static unsigned int num_bl = 0;
2558         static unsigned int num_stale;
2559         int num_th = atomic_read(&blp->blp_num_threads);
2560
2561         *p_exp = obd_stale_export_get();
2562
2563         spin_lock(&blp->blp_lock);
2564         if (*p_exp != NULL) {
2565                 if (num_th == 1 || ++num_stale < num_th) {
2566                         spin_unlock(&blp->blp_lock);
2567                         return 1;
2568                 } else {
2569                         num_stale = 0;
2570                 }
2571         }
2572
2573         /* process a request from the blp_list at least every blp_num_threads */
2574         if (!list_empty(&blp->blp_list) &&
2575             (list_empty(&blp->blp_prio_list) || num_bl == 0))
2576                 blwi = list_entry(blp->blp_list.next,
2577                                   struct ldlm_bl_work_item, blwi_entry);
2578         else
2579                 if (!list_empty(&blp->blp_prio_list))
2580                         blwi = list_entry(blp->blp_prio_list.next,
2581                                           struct ldlm_bl_work_item,
2582                                           blwi_entry);
2583
2584         if (blwi) {
2585                 if (++num_bl >= num_th)
2586                         num_bl = 0;
2587                 list_del(&blwi->blwi_entry);
2588         }
2589         spin_unlock(&blp->blp_lock);
2590         *p_blwi = blwi;
2591
2592         if (*p_exp != NULL && *p_blwi != NULL) {
2593                 obd_stale_export_put(*p_exp);
2594                 *p_exp = NULL;
2595         }
2596
2597         return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
2598 }
2599
2600 /* This only contains temporary data until the thread starts */
2601 struct ldlm_bl_thread_data {
2602         struct ldlm_bl_pool     *bltd_blp;
2603         struct completion       bltd_comp;
2604         int                     bltd_num;
2605 };
2606
2607 static int ldlm_bl_thread_main(void *arg);
2608
2609 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
2610 {
2611         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
2612         struct task_struct *task;
2613
2614         init_completion(&bltd.bltd_comp);
2615
2616         bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
2617         if (bltd.bltd_num >= blp->blp_max_threads) {
2618                 atomic_dec(&blp->blp_num_threads);
2619                 return 0;
2620         }
2621
2622         LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
2623         if (check_busy &&
2624             atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
2625                 atomic_dec(&blp->blp_num_threads);
2626                 return 0;
2627         }
2628
2629         task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
2630                            bltd.bltd_num);
2631         if (IS_ERR(task)) {
2632                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
2633                        bltd.bltd_num, PTR_ERR(task));
2634                 atomic_dec(&blp->blp_num_threads);
2635                 return PTR_ERR(task);
2636         }
2637         wait_for_completion(&bltd.bltd_comp);
2638
2639         return 0;
2640 }
2641
2642 /* Not fatal if racy and have a few too many threads */
2643 static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
2644                                       struct ldlm_bl_work_item *blwi)
2645 {
2646         if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
2647                 return 0;
2648
2649         if (atomic_read(&blp->blp_busy_threads) <
2650             atomic_read(&blp->blp_num_threads))
2651                 return 0;
2652
2653         if (blwi != NULL && (blwi->blwi_ns == NULL ||
2654                              blwi->blwi_mem_pressure))
2655                 return 0;
2656
2657         return 1;
2658 }
2659
2660 static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
2661                                struct ldlm_bl_work_item *blwi)
2662 {
2663         ENTRY;
2664
2665         if (blwi->blwi_ns == NULL)
2666                 /* added by ldlm_cleanup() */
2667                 RETURN(LDLM_ITER_STOP);
2668
2669         if (blwi->blwi_mem_pressure)
2670                 memory_pressure_set();
2671
2672         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
2673
2674         if (blwi->blwi_count) {
2675                 int count;
2676                 /* The special case when we cancel locks in lru
2677                  * asynchronously, we pass the list of locks here.
2678                  * Thus locks are marked LDLM_FL_CANCELING, but NOT
2679                  * canceled locally yet. */
2680                 count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
2681                                                    blwi->blwi_count,
2682                                                    LCF_BL_AST);
2683                 ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
2684                                      blwi->blwi_flags);
2685         } else {
2686                 ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
2687                                         blwi->blwi_lock);
2688         }
2689         if (blwi->blwi_mem_pressure)
2690                 memory_pressure_clr();
2691
2692         if (blwi->blwi_flags & LCF_ASYNC)
2693                 OBD_FREE(blwi, sizeof(*blwi));
2694         else
2695                 complete(&blwi->blwi_comp);
2696
2697         RETURN(0);
2698 }
2699
2700 /**
2701  * Cancel stale locks on export. Cancel blocked locks first.
2702  * If the given export has blocked locks, the next in the list may have
2703  * them too, thus cancel not blocked locks only if the current export has
2704  * no blocked locks.
2705  **/
2706 static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
2707                                   struct obd_export *exp)
2708 {
2709         int num;
2710         ENTRY;
2711
2712         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
2713
2714         num = ldlm_export_cancel_blocked_locks(exp);
2715         if (num == 0)
2716                 ldlm_export_cancel_locks(exp);
2717
2718         obd_stale_export_put(exp);
2719
2720         RETURN(0);
2721 }
2722
2723
2724 /**
2725  * Main blocking requests processing thread.
2726  *
2727  * Callers put locks into its queue by calling ldlm_bl_to_thread.
2728  * This thread in the end ends up doing actual call to ->l_blocking_ast
2729  * for queued locks.
2730  */
2731 static int ldlm_bl_thread_main(void *arg)
2732 {
2733         struct ldlm_bl_pool *blp;
2734         struct ldlm_bl_thread_data *bltd = arg;
2735         ENTRY;
2736
2737         blp = bltd->bltd_blp;
2738
2739         complete(&bltd->bltd_comp);
2740         /* cannot use bltd after this, it is only on caller's stack */
2741
2742         while (1) {
2743                 struct l_wait_info lwi = { 0 };
2744                 struct ldlm_bl_work_item *blwi = NULL;
2745                 struct obd_export *exp = NULL;
2746                 int rc;
2747
2748                 rc = ldlm_bl_get_work(blp, &blwi, &exp);
2749
2750                 if (rc == 0)
2751                         l_wait_event_exclusive(blp->blp_waitq,
2752                                                ldlm_bl_get_work(blp, &blwi,
2753                                                                 &exp),
2754                                                &lwi);
2755                 atomic_inc(&blp->blp_busy_threads);
2756
2757                 if (ldlm_bl_thread_need_create(blp, blwi))
2758                         /* discard the return value, we tried */
2759                         ldlm_bl_thread_start(blp, true);
2760
2761                 if (exp)
2762                         rc = ldlm_bl_thread_exports(blp, exp);
2763                 else if (blwi)
2764                         rc = ldlm_bl_thread_blwi(blp, blwi);
2765
2766                 atomic_dec(&blp->blp_busy_threads);
2767
2768                 if (rc == LDLM_ITER_STOP)
2769                         break;
2770         }
2771
2772         atomic_dec(&blp->blp_num_threads);
2773         complete(&blp->blp_comp);
2774         RETURN(0);
2775 }
2776
2777
2778 static int ldlm_setup(void);
2779 static int ldlm_cleanup(void);
2780
2781 int ldlm_get_ref(void)
2782 {
2783         int rc = 0;
2784         ENTRY;
2785         mutex_lock(&ldlm_ref_mutex);
2786         if (++ldlm_refcount == 1) {
2787                 rc = ldlm_setup();
2788                 if (rc)
2789                         ldlm_refcount--;
2790         }
2791         mutex_unlock(&ldlm_ref_mutex);
2792
2793         RETURN(rc);
2794 }
2795
2796 void ldlm_put_ref(void)
2797 {
2798         ENTRY;
2799         mutex_lock(&ldlm_ref_mutex);
2800         if (ldlm_refcount == 1) {
2801                 int rc = ldlm_cleanup();
2802                 if (rc)
2803                         CERROR("ldlm_cleanup failed: %d\n", rc);
2804                 else
2805                         ldlm_refcount--;
2806         } else {
2807                 ldlm_refcount--;
2808         }
2809         mutex_unlock(&ldlm_ref_mutex);
2810
2811         EXIT;
2812 }
2813
2814 /*
2815  * Export handle<->lock hash operations.
2816  */
2817 static unsigned
2818 ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
2819 {
2820         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
2821 }
2822
2823 static void *
2824 ldlm_export_lock_key(struct hlist_node *hnode)
2825 {
2826         struct ldlm_lock *lock;
2827
2828         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2829         return &lock->l_remote_handle;
2830 }
2831
2832 static void
2833 ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
2834 {
2835         struct ldlm_lock     *lock;
2836
2837         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2838         lock->l_remote_handle = *(struct lustre_handle *)key;
2839 }
2840
2841 static int
2842 ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
2843 {
2844         return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
2845 }
2846
2847 static void *
2848 ldlm_export_lock_object(struct hlist_node *hnode)
2849 {
2850         return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2851 }
2852
2853 static void
2854 ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
2855 {
2856         struct ldlm_lock *lock;
2857
2858         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2859         LDLM_LOCK_GET(lock);
2860 }
2861
2862 static void
2863 ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
2864 {
2865         struct ldlm_lock *lock;
2866
2867         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
2868         LDLM_LOCK_RELEASE(lock);
2869 }
2870
2871 static struct cfs_hash_ops ldlm_export_lock_ops = {
2872         .hs_hash        = ldlm_export_lock_hash,
2873         .hs_key         = ldlm_export_lock_key,
2874         .hs_keycmp      = ldlm_export_lock_keycmp,
2875         .hs_keycpy      = ldlm_export_lock_keycpy,
2876         .hs_object      = ldlm_export_lock_object,
2877         .hs_get         = ldlm_export_lock_get,
2878         .hs_put         = ldlm_export_lock_put,
2879         .hs_put_locked  = ldlm_export_lock_put,
2880 };
2881
2882 int ldlm_init_export(struct obd_export *exp)
2883 {
2884         int rc;
2885         ENTRY;
2886
2887         exp->exp_lock_hash =
2888                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
2889                                 HASH_EXP_LOCK_CUR_BITS,
2890                                 HASH_EXP_LOCK_MAX_BITS,
2891                                 HASH_EXP_LOCK_BKT_BITS, 0,
2892                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
2893                                 &ldlm_export_lock_ops,
2894                                 CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
2895                                 CFS_HASH_NBLK_CHANGE);
2896
2897         if (!exp->exp_lock_hash)
2898                 RETURN(-ENOMEM);
2899
2900         rc = ldlm_init_flock_export(exp);
2901         if (rc)
2902                 GOTO(err, rc);
2903
2904         RETURN(0);
2905 err:
2906         ldlm_destroy_export(exp);
2907         RETURN(rc);
2908 }
2909 EXPORT_SYMBOL(ldlm_init_export);
2910
2911 void ldlm_destroy_export(struct obd_export *exp)
2912 {
2913         ENTRY;
2914         cfs_hash_putref(exp->exp_lock_hash);
2915         exp->exp_lock_hash = NULL;
2916
2917         ldlm_destroy_flock_export(exp);
2918         EXIT;
2919 }
2920 EXPORT_SYMBOL(ldlm_destroy_export);
2921
2922 static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
2923                                                       struct attribute *attr,
2924                                                       char *buf)
2925 {
2926         return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
2927 }
2928
2929 static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
2930                                                        struct attribute *attr,
2931                                                        const char *buffer,
2932                                                        size_t count)
2933 {
2934         int rc;
2935         unsigned long val;
2936
2937         rc = kstrtoul(buffer, 10, &val);
2938         if (rc)
2939                 return rc;
2940
2941         ldlm_cancel_unused_locks_before_replay = val;
2942
2943         return count;
2944 }
2945 LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
2946
2947 static struct attribute *ldlm_attrs[] = {
2948         &lustre_attr_cancel_unused_locks_before_replay.attr,
2949         NULL,
2950 };
2951
2952 static struct attribute_group ldlm_attr_group = {
2953         .attrs = ldlm_attrs,
2954 };
2955
2956 static int ldlm_setup(void)
2957 {
2958         static struct ptlrpc_service_conf       conf;
2959         struct ldlm_bl_pool                    *blp = NULL;
2960 #ifdef HAVE_SERVER_SUPPORT
2961         struct task_struct *task;
2962 #endif /* HAVE_SERVER_SUPPORT */
2963         int i;
2964         int rc = 0;
2965
2966         ENTRY;
2967
2968         if (ldlm_state != NULL)
2969                 RETURN(-EALREADY);
2970
2971         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
2972         if (ldlm_state == NULL)
2973                 RETURN(-ENOMEM);
2974
2975         ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
2976         if (!ldlm_kobj)
2977                 GOTO(out, -ENOMEM);
2978
2979         rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
2980         if (rc)
2981                 GOTO(out, rc);
2982
2983         ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
2984         if (!ldlm_ns_kset)
2985                 GOTO(out, -ENOMEM);
2986
2987         ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
2988         if (!ldlm_svc_kset)
2989                 GOTO(out, -ENOMEM);
2990
2991 #ifdef CONFIG_PROC_FS
2992         rc = ldlm_proc_setup();
2993         if (rc != 0)
2994                 GOTO(out, rc);
2995 #endif /* CONFIG_PROC_FS */
2996
2997         memset(&conf, 0, sizeof(conf));
2998         conf = (typeof(conf)) {
2999                 .psc_name               = "ldlm_cbd",
3000                 .psc_watchdog_factor    = 2,
3001                 .psc_buf                = {
3002                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
3003                         .bc_buf_size            = LDLM_BUFSIZE,
3004                         .bc_req_max_size        = LDLM_MAXREQSIZE,
3005                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
3006                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
3007                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
3008                 },
3009                 .psc_thr                = {
3010                         .tc_thr_name            = "ldlm_cb",
3011                         .tc_thr_factor          = LDLM_THR_FACTOR,
3012                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
3013                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
3014                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
3015                         .tc_nthrs_user          = ldlm_num_threads,
3016                         .tc_cpu_affinity        = 1,
3017                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
3018                 },
3019                 .psc_cpt                = {
3020                         .cc_pattern             = ldlm_cpts,
3021                 },
3022                 .psc_ops                = {
3023                         .so_req_handler         = ldlm_callback_handler,
3024                 },
3025         };
3026         ldlm_state->ldlm_cb_service = \
3027                         ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
3028         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
3029                 CERROR("failed to start service\n");
3030                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
3031                 ldlm_state->ldlm_cb_service = NULL;
3032                 GOTO(out, rc);
3033         }
3034
3035 #ifdef HAVE_SERVER_SUPPORT
3036         memset(&conf, 0, sizeof(conf));
3037         conf = (typeof(conf)) {
3038                 .psc_name               = "ldlm_canceld",
3039                 .psc_watchdog_factor    = 6,
3040                 .psc_buf                = {
3041                         .bc_nbufs               = LDLM_SERVER_NBUFS,
3042                         .bc_buf_size            = LDLM_BUFSIZE,
3043                         .bc_req_max_size        = LDLM_MAXREQSIZE,
3044                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
3045                         .bc_req_portal          = LDLM_CANCEL_REQUEST_PORTAL,
3046                         .bc_rep_portal          = LDLM_CANCEL_REPLY_PORTAL,
3047
3048                 },
3049                 .psc_thr                = {
3050                         .tc_thr_name            = "ldlm_cn",
3051                         .tc_thr_factor          = LDLM_THR_FACTOR,
3052                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
3053                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
3054                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
3055                         .tc_nthrs_user          = ldlm_num_threads,
3056                         .tc_cpu_affinity        = 1,
3057                         .tc_ctx_tags            = LCT_MD_THREAD | \
3058                                                   LCT_DT_THREAD | \
3059                                                   LCT_CL_THREAD,
3060                 },
3061                 .psc_cpt                = {
3062                         .cc_pattern             = ldlm_cpts,
3063                 },
3064                 .psc_ops                = {
3065                         .so_req_handler         = ldlm_cancel_handler,
3066                         .so_hpreq_handler       = ldlm_hpreq_handler,
3067                 },
3068         };
3069         ldlm_state->ldlm_cancel_service = \
3070                         ptlrpc_register_service(&conf, ldlm_svc_proc_dir);
3071         if (IS_ERR(ldlm_state->ldlm_cancel_service)) {
3072                 CERROR("failed to start service\n");
3073                 rc = PTR_ERR(ldlm_state->ldlm_cancel_service);
3074                 ldlm_state->ldlm_cancel_service = NULL;
3075                 GOTO(out, rc);
3076         }
3077 #endif /* HAVE_SERVER_SUPPORT */
3078
3079         OBD_ALLOC(blp, sizeof(*blp));
3080         if (blp == NULL)
3081                 GOTO(out, rc = -ENOMEM);
3082         ldlm_state->ldlm_bl_pool = blp;
3083
3084         spin_lock_init(&blp->blp_lock);
3085         INIT_LIST_HEAD(&blp->blp_list);
3086         INIT_LIST_HEAD(&blp->blp_prio_list);
3087         init_waitqueue_head(&blp->blp_waitq);
3088         atomic_set(&blp->blp_num_threads, 0);
3089         atomic_set(&blp->blp_busy_threads, 0);
3090
3091         if (ldlm_num_threads == 0) {
3092                 blp->blp_min_threads = LDLM_NTHRS_INIT;
3093                 blp->blp_max_threads = LDLM_NTHRS_MAX;
3094         } else {
3095                 blp->blp_min_threads = blp->blp_max_threads = \
3096                         min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
3097                                                          ldlm_num_threads));
3098         }
3099
3100         for (i = 0; i < blp->blp_min_threads; i++) {
3101                 rc = ldlm_bl_thread_start(blp, false);
3102                 if (rc < 0)
3103                         GOTO(out, rc);
3104         }
3105
3106 #ifdef HAVE_SERVER_SUPPORT
3107         INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
3108         expired_lock_thread.elt_state = ELT_STOPPED;
3109         init_waitqueue_head(&expired_lock_thread.elt_waitq);
3110
3111         INIT_LIST_HEAD(&waiting_locks_list);
3112         spin_lock_init(&waiting_locks_spinlock);
3113         setup_timer(&waiting_locks_timer, waiting_locks_callback, 0);
3114
3115         task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
3116         if (IS_ERR(task)) {
3117                 rc = PTR_ERR(task);
3118                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
3119                 GOTO(out, rc);
3120         }
3121
3122         wait_event(expired_lock_thread.elt_waitq,
3123                        expired_lock_thread.elt_state == ELT_READY);
3124 #endif /* HAVE_SERVER_SUPPORT */
3125
3126         rc = ldlm_pools_init();
3127         if (rc) {
3128                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
3129                 GOTO(out, rc);
3130         }
3131
3132         rc = ldlm_reclaim_setup();
3133         if (rc) {
3134                 CERROR("Failed to setup reclaim thread: rc = %d\n", rc);
3135                 GOTO(out, rc);
3136         }
3137         RETURN(0);
3138
3139  out:
3140         ldlm_cleanup();
3141         RETURN(rc);
3142 }
3143
3144 static int ldlm_cleanup(void)
3145 {
3146         ENTRY;
3147
3148         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
3149             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
3150                 CERROR("ldlm still has namespaces; clean these up first.\n");
3151                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
3152                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
3153                 RETURN(-EBUSY);
3154         }
3155
3156         ldlm_reclaim_cleanup();
3157         ldlm_pools_fini();
3158
3159         if (ldlm_state->ldlm_bl_pool != NULL) {
3160                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
3161
3162                 while (atomic_read(&blp->blp_num_threads) > 0) {
3163                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
3164
3165                         init_completion(&blp->blp_comp);
3166
3167                         spin_lock(&blp->blp_lock);
3168                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
3169                         wake_up(&blp->blp_waitq);
3170                         spin_unlock(&blp->blp_lock);
3171
3172                         wait_for_completion(&blp->blp_comp);
3173                 }
3174
3175                 OBD_FREE(blp, sizeof(*blp));
3176         }
3177
3178         if (ldlm_state->ldlm_cb_service != NULL)
3179                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
3180 #ifdef HAVE_SERVER_SUPPORT
3181         if (ldlm_state->ldlm_cancel_service != NULL)
3182                 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
3183 #endif
3184
3185         if (ldlm_ns_kset)
3186                 kset_unregister(ldlm_ns_kset);
3187         if (ldlm_svc_kset)
3188                 kset_unregister(ldlm_svc_kset);
3189         if (ldlm_kobj)
3190                 kobject_put(ldlm_kobj);
3191
3192         ldlm_proc_cleanup();
3193
3194 #ifdef HAVE_SERVER_SUPPORT
3195         if (expired_lock_thread.elt_state != ELT_STOPPED) {
3196                 expired_lock_thread.elt_state = ELT_TERMINATE;
3197                 wake_up(&expired_lock_thread.elt_waitq);