/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ldlm/ldlm_lockd.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include <linux/kthread.h>
#include <linux/list.h>
#include <libcfs/libcfs.h>
#include <lustre_errno.h>
#include <lustre_dlm.h>
#include <obd_class.h>
#include "ldlm_internal.h"

static int ldlm_num_threads;
module_param(ldlm_num_threads, int, 0444);
MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");

static char *ldlm_cpts;
module_param(ldlm_cpts, charp, 0444);
MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");

static DEFINE_MUTEX(ldlm_ref_mutex);
static int ldlm_refcount;

struct kobject *ldlm_kobj;
struct kset *ldlm_ns_kset;
struct kset *ldlm_svc_kset;

/* LDLM state */

static struct ldlm_state *ldlm_state;

/* Timeout for the initial callback (AST) reply (bz10399).
 * Because a 32-bit time value has to be sent over the wire,
 * return it as time_t instead of time64_t.
 */
static inline time_t ldlm_get_rq_timeout(void)
{
        /* Non-AT value */
        time_t timeout = min(ldlm_timeout, obd_timeout / 3);

        return timeout < 1 ? 1 : timeout;
}
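
/*
 * Worked example (illustrative numbers only, assuming the usual defaults
 * of obd_timeout = 100 and ldlm_timeout = 20): min(20, 100 / 3) = 20
 * seconds.  The result is clamped to at least 1 second.
 */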

struct ldlm_bl_pool {
        spinlock_t              blp_lock;

        /*
         * blp_prio_list is used for callbacks that should be handled
         * with priority.  It is used for LDLM_FL_DISCARD_DATA requests.
         * See bug 13843.
         */
        struct list_head        blp_prio_list;

        /*
         * blp_list is used for all other callbacks, which are likely
         * to take longer to process.
         */
        struct list_head        blp_list;

        wait_queue_head_t       blp_waitq;
        struct completion       blp_comp;
        atomic_t                blp_num_threads;
        atomic_t                blp_busy_threads;
        int                     blp_min_threads;
        int                     blp_max_threads;
};

struct ldlm_bl_work_item {
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
        struct list_head        blwi_head;
        int                     blwi_count;
        struct completion       blwi_comp;
        enum ldlm_cancel_flags  blwi_flags;
        int                     blwi_mem_pressure;
};
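
/*
 * Illustrative note (a summary of the queueing logic, not a verbatim
 * trace): a blocking/cancel request is packaged into a struct
 * ldlm_bl_work_item, queued on blp_list (or blp_prio_list for
 * LDLM_FL_DISCARD_DATA callbacks), and blp_waitq is woken so that one
 * of the blocking-callback service threads can dequeue and process it.
 */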

#ifdef HAVE_SERVER_SUPPORT

/**
 * Protects both waiting_locks_list and expired_lock_thread.
 */
static DEFINE_SPINLOCK(waiting_locks_spinlock); /* BH lock (timer) */

/**
 * List for contended locks.
 *
 * As soon as a lock is contended, it is placed on this list and the
 * expected response time is recorded in the lock. A special thread
 * walks the list looking for locks that should be released and
 * schedules client evictions for those that have not been released in
 * time.
 *
 * All access to it should be under waiting_locks_spinlock.
 */
static LIST_HEAD(waiting_locks_list);
static void waiting_locks_callback(TIMER_DATA_TYPE unused);
static CFS_DEFINE_TIMER(waiting_locks_timer, waiting_locks_callback, 0, 0);

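/*
 * Rough lifecycle of a contended server lock, as implemented below
 * (a descriptive summary, not a normative spec):
 *
 *   ldlm_add_waiting_lock()    - lock goes on waiting_locks_list with a
 *                                deadline in l_callback_timeout
 *   waiting_locks_callback()   - timer moves overdue locks to
 *                                expired_lock_list
 *   expired_lock_main()        - prolongs still-busy locks, evicts the
 *                                rest via class_fail_export()
 *   ldlm_del_waiting_lock()    - cancel arrived in time; lock removed
 */
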
enum elt_state {
        ELT_STOPPED,
        ELT_READY,
        ELT_TERMINATE,
};

static DECLARE_WAIT_QUEUE_HEAD(expired_lock_wait_queue);
static enum elt_state expired_lock_thread_state = ELT_STOPPED;
static int expired_lock_dump;
static LIST_HEAD(expired_lock_list);

static int ldlm_lock_busy(struct ldlm_lock *lock);
static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout);
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);

static inline int have_expired_locks(void)
{
        int need_to_run;

        ENTRY;
        spin_lock_bh(&waiting_locks_spinlock);
        need_to_run = !list_empty(&expired_lock_list);
        spin_unlock_bh(&waiting_locks_spinlock);

        RETURN(need_to_run);
}

/**
 * Check expired lock list for expired locks and time them out.
 */
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_list;
        struct l_wait_info lwi = { 0 };
        int do_dump;

        ENTRY;

        expired_lock_thread_state = ELT_READY;
        wake_up(&expired_lock_wait_queue);

        while (1) {
                l_wait_event(expired_lock_wait_queue,
                             have_expired_locks() ||
                             expired_lock_thread_state == ELT_TERMINATE,
                             &lwi);

                spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_dump) {
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* from waiting_locks_callback, but not in timer */
                        libcfs_debug_dumplog();

                        spin_lock_bh(&waiting_locks_spinlock);
                        expired_lock_dump = 0;
                }

                do_dump = 0;

                while (!list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;

                        lock = list_entry(expired->next, struct ldlm_lock,
                                          l_pending_chain);
                        if ((void *)lock < LP_POISON + PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
                                LBUG();
                        }
                        list_del_init(&lock->l_pending_chain);
                        if ((void *)lock->l_export <
                             LP_POISON + PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                                /* release extra ref grabbed by
                                 * ldlm_add_waiting_lock() or
                                 * ldlm_failed_ast() */
                                LDLM_LOCK_RELEASE(lock);
                                continue;
                        }

                        if (ldlm_is_destroyed(lock)) {
                                /* release the lock refcount taken when
                                 * waiting_locks_callback() found this lock */
                                LDLM_LOCK_RELEASE(lock);
                                continue;
                        }
                        export = class_export_lock_get(lock->l_export, lock);
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* Check if we need to prolong timeout */
                        if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
                            lock->l_callback_timeout != 0 && /* not AST error */
                            ldlm_lock_busy(lock)) {
                                LDLM_DEBUG(lock, "prolong the busy lock");
                                lock_res_and_lock(lock);
                                ldlm_add_waiting_lock(lock,
                                                ldlm_bl_timeout(lock) >> 1);
                                unlock_res_and_lock(lock);
                        } else {
                                spin_lock_bh(&export->exp_bl_list_lock);
                                list_del_init(&lock->l_exp_list);
                                spin_unlock_bh(&export->exp_bl_list_lock);

                                LDLM_ERROR(lock,
                                           "lock callback timer expired after %llds: evicting client at %s ",
                                           ktime_get_real_seconds() -
                                           lock->l_blast_sent,
                                           obd_export_nid2str(export));
                                ldlm_lock_to_ns(lock)->ns_timeouts++;
                                do_dump++;
                                class_fail_export(export);
                        }
                        class_export_lock_put(export, lock);
                        /* release extra ref grabbed by ldlm_add_waiting_lock()
                         * or ldlm_failed_ast() */
                        LDLM_LOCK_RELEASE(lock);

                        spin_lock_bh(&waiting_locks_spinlock);
                }
                spin_unlock_bh(&waiting_locks_spinlock);

                if (do_dump && obd_dump_on_eviction) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }

                if (expired_lock_thread_state == ELT_TERMINATE)
                        break;
        }

        expired_lock_thread_state = ELT_STOPPED;
        wake_up(&expired_lock_wait_queue);
        RETURN(0);
}

/**
 * Check whether there is a request in the export request list that
 * prevents the lock from being canceled.
 */
static int ldlm_lock_busy(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        int match = 0;
        ENTRY;

        if (lock->l_export == NULL)
                return 0;

        spin_lock(&lock->l_export->exp_rpc_lock);
        list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                rq_exp_list) {
                if (req->rq_ops->hpreq_lock_match) {
                        match = req->rq_ops->hpreq_lock_match(req, lock);
                        if (match)
                                break;
                }
        }
        spin_unlock(&lock->l_export->exp_rpc_lock);
        RETURN(match);
}
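
/*
 * Example scenario (hypothetical, for illustration): a client still has a
 * bulk write RPC in flight under an extent lock when the blocking AST
 * deadline passes.  If that request's hpreq_lock_match handler matches the
 * lock, ldlm_lock_busy() returns 1 and expired_lock_main() re-arms the
 * timeout instead of evicting the client.
 */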

/* This is called from within a timer interrupt and cannot schedule */
static void waiting_locks_callback(TIMER_DATA_TYPE unused)
{
        struct ldlm_lock        *lock;
        int                     need_dump = 0;

        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                if (lock->l_callback_timeout > ktime_get_seconds() ||
                    lock->l_req_mode == LCK_GROUP)
                        break;

                /* no need to take an extra ref on the lock, since it was
                 * in the waiting_locks_list and ldlm_add_waiting_lock()
                 * already grabbed a ref */
                list_del(&lock->l_pending_chain);
                list_add(&lock->l_pending_chain, &expired_lock_list);
                need_dump = 1;
        }

        if (!list_empty(&expired_lock_list)) {
                if (obd_dump_on_timeout && need_dump)
                        expired_lock_dump = __LINE__;

                wake_up(&expired_lock_wait_queue);
        }

        /*
         * Make sure the timer will fire again if we have any locks
         * left.
         */
        if (!list_empty(&waiting_locks_list)) {
                unsigned long timeout_jiffies;

                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
                timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);
                mod_timer(&waiting_locks_timer, timeout_jiffies);
        }
        spin_unlock_bh(&waiting_locks_spinlock);
}

/**
 * Add lock to the list of contended locks.
 *
 * Indicate that we're waiting for a client to call us back cancelling a given
 * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
 * timer to fire appropriately.  (We round up to the next second, to avoid
 * floods of timer firings during periods of high lock contention and traffic.)
 * As in ldlm_add_waiting_lock(), the caller must take a lock reference
 * when the lock has been added to the waiting list (1 is returned).
 *
 * Called with the namespace lock held.
 */
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds)
{
        unsigned long timeout_jiffies;
        time64_t timeout;

        if (!list_empty(&lock->l_pending_chain))
                return 0;

        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
                seconds = 1;

        timeout = ktime_get_seconds() + seconds;
        if (likely(timeout > lock->l_callback_timeout))
                lock->l_callback_timeout = timeout;

        timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);

        if (time_before(timeout_jiffies, waiting_locks_timer.expires) ||
            !timer_pending(&waiting_locks_timer))
                mod_timer(&waiting_locks_timer, timeout_jiffies);

        /* if the new lock has a shorter timeout than something earlier on
         * the list, we'll wait the longer amount of time; no big deal.
         */
        /* FIFO */
        list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
        return 1;
}
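
/*
 * Worked example (hypothetical numbers): with ktime_get_seconds() == 1000
 * and seconds == 30, l_callback_timeout becomes 1030.  The shared timer is
 * (re)armed for that deadline only if it is idle or currently set to fire
 * later; if an earlier lock already owns the timer, this lock simply waits
 * its turn in the FIFO.
 */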

static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
{
        spin_lock_bh(&lock->l_export->exp_bl_list_lock);
        if (list_empty(&lock->l_exp_list)) {
                if (lock->l_granted_mode != lock->l_req_mode)
                        list_add_tail(&lock->l_exp_list,
                                      &lock->l_export->exp_bl_list);
                else
                        list_add(&lock->l_exp_list,
                                 &lock->l_export->exp_bl_list);
        }
        spin_unlock_bh(&lock->l_export->exp_bl_list_lock);

        /* A blocked lock is added. Adjust its position in the stale list
         * if the export is in that list.  If the export is stale but not
         * in the list, it is being processed and will be placed at the
         * right position by obd_stale_export_put(). */
        if (!list_empty(&lock->l_export->exp_stale_list))
                obd_stale_export_adjust(lock->l_export);
}

static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
{
        int ret;

        /* NB: must be called with hold of lock_res_and_lock() */
        LASSERT(ldlm_is_res_locked(lock));
        LASSERT(!ldlm_is_cancel_on_block(lock));

        /* Do not put a cross-MDT lock in the waiting list, since we
         * will not evict it due to timeout for now */
        if (lock->l_export != NULL &&
            (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
                return 0;

        spin_lock_bh(&waiting_locks_spinlock);
        if (ldlm_is_cancel(lock)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                return 0;
        }

        if (ldlm_is_destroyed(lock)) {
                static time64_t next;

                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
                if (ktime_get_seconds() > next) {
                        next = ktime_get_seconds() + 14400;
                        libcfs_debug_dumpstack(NULL);
                }
                return 0;
        }

        ldlm_set_waited(lock);
        lock->l_blast_sent = ktime_get_real_seconds();
        ret = __ldlm_add_waiting_lock(lock, timeout);
        if (ret) {
                /* grab a ref on the lock if it has been added to the
                 * waiting list */
                LDLM_LOCK_GET(lock);
        }
        spin_unlock_bh(&waiting_locks_spinlock);

        if (ret)
                ldlm_add_blocked_lock(lock);

        LDLM_DEBUG(lock, "%sadding to wait list(timeout: %lld, AT: %s)",
                   ret == 0 ? "not re-" : "", timeout,
                   AT_OFF ? "off" : "on");
        return ret;
}

/**
 * Remove a lock from the pending list, likely because it had its cancellation
 * callback arrive without incident.  This adjusts the lock-timeout timer if
 * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
 * As in ldlm_del_waiting_lock(), the caller must release the lock
 * reference when the lock is removed from any list (1 is returned).
 *
 * Called with namespace lock held.
 */
static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        struct list_head *list_next;

        if (list_empty(&lock->l_pending_chain))
                return 0;

        list_next = lock->l_pending_chain.next;
        if (lock->l_pending_chain.prev == &waiting_locks_list) {
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
                        del_timer(&waiting_locks_timer);
                } else {
                        struct ldlm_lock *next;

                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
                        mod_timer(&waiting_locks_timer,
                                  cfs_time_seconds(next->l_callback_timeout));
                }
        }
        list_del_init(&lock->l_pending_chain);

        return 1;
}

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        int ret;

        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock);
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);
        ret = __ldlm_del_waiting_lock(lock);
        ldlm_clear_waited(lock);
        spin_unlock_bh(&waiting_locks_spinlock);

        /* remove the lock from the export blocking list */
        spin_lock_bh(&lock->l_export->exp_bl_list_lock);
        list_del_init(&lock->l_exp_list);
        spin_unlock_bh(&lock->l_export->exp_bl_list_lock);

        if (ret) {
                /* release lock ref if it has indeed been removed
                 * from a list */
                LDLM_LOCK_RELEASE(lock);
        }

        LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
        return ret;
}

/**
 * Prolong the contended lock waiting time.
 *
 * Called with namespace lock held.
 */
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
{
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
                LDLM_DEBUG(lock, "client lock: no-op");
                return 0;
        }

        if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) {
                /* We don't have a "waiting locks list" on OSP. */
                LDLM_DEBUG(lock, "MDS-MDS lock: no-op");
                return 0;
        }

        spin_lock_bh(&waiting_locks_spinlock);

        if (list_empty(&lock->l_pending_chain)) {
                spin_unlock_bh(&waiting_locks_spinlock);
                LDLM_DEBUG(lock, "wasn't waiting");
                return 0;
        }

        /* we remove/add the lock to the waiting list, so there is no need
         * to release/take a lock reference */
        __ldlm_del_waiting_lock(lock);
        __ldlm_add_waiting_lock(lock, timeout);
        spin_unlock_bh(&waiting_locks_spinlock);

        LDLM_DEBUG(lock, "refreshed");
        return 1;
}
EXPORT_SYMBOL(ldlm_refresh_waiting_lock);

#else /* HAVE_SERVER_SUPPORT */

int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        RETURN(0);
}

int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
{
        RETURN(0);
}

#endif /* !HAVE_SERVER_SUPPORT */

#ifdef HAVE_SERVER_SUPPORT

/**
 * Calculate the per-export Blocking timeout (covering BL AST, data flush,
 * lock cancel, and their replies). Used for lock callback timeout and AST
 * re-send period.
 *
 * \param[in] lock        lock which is getting the blocking callback
 *
 * \retval            timeout in seconds to wait for the client reply
 */
time64_t ldlm_bl_timeout(struct ldlm_lock *lock)
{
        time64_t timeout;

        if (AT_OFF)
                return obd_timeout / 2;

        /* Since these are non-updating timeouts, we should be conservative.
         * Take more than the usual amount: 150%.
         * It would be nice to have some kind of "early reply" mechanism for
         * lock callbacks too... */
        timeout = at_get(&lock->l_export->exp_bl_lock_at);
        return max(timeout + (timeout >> 1), (time64_t)ldlm_enqueue_min);
}
EXPORT_SYMBOL(ldlm_bl_timeout);

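/*
 * Worked example (hypothetical numbers): with adaptive timeouts enabled
 * and an observed BL AST service estimate of 20 seconds, the result is
 * 20 + (20 >> 1) = 30 seconds, unless ldlm_enqueue_min is larger.  With
 * AT off it is simply obd_timeout / 2.
 */
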
/**
 * Perform lock cleanup if AST sending failed.
 */
static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                            const char *ast_type)
{
        LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due "
                           "to a lock %s callback time out: rc %d\n",
                           lock->l_export->exp_obd->obd_name,
                           obd_export_nid2str(lock->l_export), ast_type, rc);

        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();
        spin_lock_bh(&waiting_locks_spinlock);
        if (__ldlm_del_waiting_lock(lock) == 0)
                /* the lock was not in any list, grab an extra ref before adding
                 * the lock to the expired list */
                LDLM_LOCK_GET(lock);
        lock->l_callback_timeout = 0; /* differentiate it from expired locks */
        list_add(&lock->l_pending_chain, &expired_lock_list);
        wake_up(&expired_lock_wait_queue);
        spin_unlock_bh(&waiting_locks_spinlock);
}

/**
 * Perform lock cleanup if AST reply came with error.
 */
static int ldlm_handle_ast_error(const struct lu_env *env,
                                 struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
{
        struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;

        if (!req->rq_replied || (rc && rc != -EINVAL)) {
                if (ldlm_is_cancel(lock)) {
                        LDLM_DEBUG(lock,
                                   "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
                                   ast_type, req, req->rq_xid,
                                   libcfs_nid2str(peer.nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else if (rc == -ENODEV || rc == -ESHUTDOWN ||
                           (rc == -EIO &&
                            req->rq_import->imp_state == LUSTRE_IMP_CLOSED)) {
                        /* During umount the AST fails because it cannot be
                         * sent.  This shouldn't lead to client eviction.
                         * -ENODEV is returned by ptl_send_rpc() for a new
                         *  request in such an import.
                         * -ESHUTDOWN is returned by ptlrpc_import_delay_req()
                         *  if imp_invalid or obd_no_recov is set.
                         * ptlrpc_import_delay_req() likewise checks for
                         * LUSTRE_IMP_CLOSED and returns -EIO in that case.
                         * In all such cases the errors are ignored.
                         */
                        LDLM_DEBUG(lock, "%s AST can't be sent due to a server"
                                         " %s failure or umount process: rc = %d\n",
                                         ast_type,
                                         req->rq_import->imp_obd->obd_name, rc);
                } else {
                        LDLM_ERROR(lock,
                                   "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
                                   libcfs_nid2str(peer.nid),
                                   req->rq_replied ? "returned error from" :
                                   "failed to reply to",
                                   ast_type, req, req->rq_xid,
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
                                   rc);
                        ldlm_failed_ast(lock, rc, ast_type);
                }
                return rc;
        }

        if (rc == -EINVAL) {
                struct ldlm_resource *res = lock->l_resource;

                LDLM_DEBUG(lock,
                           "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
                           libcfs_nid2str(peer.nid),
                           req->rq_repmsg ?
                           lustre_msg_get_status(req->rq_repmsg) : -1,
                           ast_type, req, req->rq_xid);
                if (res) {
                        /* update lvbo to return proper attributes.
                         * see bug 23174 */
                        ldlm_resource_getref(res);
                        ldlm_lvbo_update(env, res, lock, NULL, 1);
                        ldlm_resource_putref(res);
                }
                ldlm_lock_cancel(lock);
                rc = -ERESTART;
        }

        return rc;
}

static int ldlm_cb_interpret(const struct lu_env *env,
                             struct ptlrpc_request *req, void *data, int rc)
{
        struct ldlm_cb_async_args *ca   = data;
        struct ldlm_lock          *lock = ca->ca_lock;
        struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
        ENTRY;

        LASSERT(lock != NULL);

        switch (arg->type) {
        case LDLM_GL_CALLBACK:
                /* Update the LVB from disk if the AST failed
                 * (this is a legal race)
                 *
                 * - Glimpse callback of local lock just returns
                 *   -ELDLM_NO_LOCK_DATA.
                 * - Glimpse callback of remote lock might return
                 *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
                 */
                if (unlikely(arg->gl_interpret_reply)) {
                        rc = arg->gl_interpret_reply(env, req, data, rc);
                } else if (rc == -ELDLM_NO_LOCK_DATA) {
                        LDLM_DEBUG(lock, "lost race - client has a lock but no "
                                   "inode");
                        ldlm_lvbo_update(env, lock->l_resource, lock, NULL, 1);
                } else if (rc != 0) {
                        rc = ldlm_handle_ast_error(env, lock, req,
                                                   rc, "glimpse");
                } else {
                        rc = ldlm_lvbo_update(env, lock->l_resource,
                                              lock, req, 1);
                }
                break;
        case LDLM_BL_CALLBACK:
                if (rc != 0)
                        rc = ldlm_handle_ast_error(env, lock, req,
                                                   rc, "blocking");
                break;
        case LDLM_CP_CALLBACK:
                if (rc != 0)
                        rc = ldlm_handle_ast_error(env, lock, req,
                                                   rc, "completion");
                break;
        default:
                LDLM_ERROR(lock, "invalid opcode for lock callback %d",
                           arg->type);
                LBUG();
        }

        /* release extra reference taken in ldlm_ast_fini() */
        LDLM_LOCK_RELEASE(lock);

        if (rc == -ERESTART)
                atomic_inc(&arg->restart);

        RETURN(0);
}

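/*
 * Descriptive note: ldlm_update_resend() below is installed as
 * req->rq_resend_cb by ldlm_server_blocking_ast() and
 * ldlm_server_completion_ast(), so that a resent AST also refreshes the
 * lock's entry on the waiting list.
 */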
static void ldlm_update_resend(struct ptlrpc_request *req, void *data)
{
        struct ldlm_cb_async_args *ca = data;
        struct ldlm_lock *lock = ca->ca_lock;

        ldlm_refresh_waiting_lock(lock, ldlm_bl_timeout(lock));
}

static inline int ldlm_ast_fini(struct ptlrpc_request *req,
                                struct ldlm_cb_set_arg *arg,
                                struct ldlm_lock *lock,
                                int instant_cancel)
{
        int rc = 0;
        ENTRY;

        if (unlikely(instant_cancel)) {
                rc = ptl_send_rpc(req, 1);
                ptlrpc_req_finished(req);
                if (rc == 0)
                        atomic_inc(&arg->restart);
        } else {
                LDLM_LOCK_GET(lock);
                ptlrpc_set_add_req(arg->set, req);
        }

        RETURN(rc);
}

/**
 * Check if there are requests in the export request list which prevent
 * the lock from being canceled, and make those requests high priority.
 */
static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
{
        struct ptlrpc_request *req;
        ENTRY;

        if (lock->l_export == NULL) {
                LDLM_DEBUG(lock, "client lock: no-op");
                RETURN_EXIT;
        }

        spin_lock(&lock->l_export->exp_rpc_lock);
        list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                            rq_exp_list) {
                /* Do not process requests that have not yet been added to
                 * the incoming queue, or have already been removed from it
                 * for processing.  We evaluate ptlrpc_nrs_req_can_move()
                 * without holding svcpt->scp_req_lock, and then redo the
                 * check with the lock held once we need a reliable result.
                 */
                if (ptlrpc_nrs_req_can_move(req) &&
                    req->rq_ops->hpreq_lock_match &&
                    req->rq_ops->hpreq_lock_match(req, lock))
                        ptlrpc_nrs_req_hp_move(req);
        }
        spin_unlock(&lock->l_export->exp_rpc_lock);
        EXIT;
}

/**
 * ->l_blocking_ast() method for server-side locks. This is invoked when a
 * newly enqueued server lock conflicts with the given one.
 *
 * Sends a blocking AST RPC to the client owning that lock and arms the
 * timeout timer to wait for the client's response.
 */
int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                             struct ldlm_lock_desc *desc,
                             void *data, int flag)
{
        struct ldlm_cb_async_args *ca;
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request    *body;
        struct ptlrpc_request  *req;
        int                     instant_cancel = 0;
        int                     rc = 0;
        ENTRY;

        if (flag == LDLM_CB_CANCELING)
                /* Don't need to do anything here. */
                RETURN(0);

        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
                LDLM_DEBUG(lock, "dropping BL AST");
                RETURN(0);
        }

        LASSERT(lock);
        LASSERT(data != NULL);
        if (lock->l_export->exp_obd->obd_recovering != 0)
                LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");

        ldlm_lock_reorder_req(lock);

        req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
                                        &RQF_LDLM_BL_CALLBACK,
                                        LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
        if (req == NULL)
                RETURN(-ENOMEM);

        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
        ca = ptlrpc_req_async_args(req);
        ca->ca_set_arg = arg;
        ca->ca_lock = lock;

        req->rq_interpret_reply = ldlm_cb_interpret;

        lock_res_and_lock(lock);
        if (ldlm_is_destroyed(lock)) {
                /* What's the point? */
                unlock_res_and_lock(lock);
                ptlrpc_req_finished(req);
                RETURN(0);
        }

        if (lock->l_granted_mode != lock->l_req_mode) {
                /* this blocking AST will be communicated as part of the
                 * completion AST instead */
                ldlm_add_blocked_lock(lock);
                ldlm_set_waited(lock);
                unlock_res_and_lock(lock);

                ptlrpc_req_finished(req);
                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
                RETURN(0);
        }

        if (ldlm_is_cancel_on_block(lock))
                instant_cancel = 1;

        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_desc = *desc;
        body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK);

        LDLM_DEBUG(lock, "server preparing blocking AST");

        ptlrpc_request_set_replen(req);
        ldlm_set_cbpending(lock);
        if (instant_cancel) {
                unlock_res_and_lock(lock);
                ldlm_lock_cancel(lock);

                req->rq_no_resend = 1;
        } else {
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
                ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
                unlock_res_and_lock(lock);

                /* Do not resend after lock callback timeout */
                req->rq_delay_limit = ldlm_bl_timeout(lock);
                req->rq_resend_cb = ldlm_update_resend;
        }

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_alloc_pack already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_BL_CALLBACK - LDLM_FIRST_OPC);

        rc = ldlm_ast_fini(req, arg, lock, instant_cancel);

        RETURN(rc);
}

/**
 * ->l_completion_ast callback for a remote lock in server namespace.
 *
 *  Sends an AST to the client notifying it of lock granting.  If the initial
 *  lock response has not been sent yet, instead of sending another RPC, just
 *  mark the lock as granted and the client will understand.
 */
int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct ldlm_cb_set_arg *arg = data;
        struct ldlm_request    *body;
        struct ptlrpc_request  *req;
        struct ldlm_cb_async_args *ca;
        int                     instant_cancel = 0;
        int                     rc = 0;
        int                     lvb_len;
        ENTRY;

        LASSERT(lock != NULL);
        LASSERT(data != NULL);

        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
                LDLM_DEBUG(lock, "dropping CP AST");
                RETURN(0);
        }

        req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
                                    &RQF_LDLM_CP_CALLBACK);
        if (req == NULL)
                RETURN(-ENOMEM);

        /* server namespace, doesn't need lock */
        lvb_len = ldlm_lvbo_size(lock);
        /* LU-3124 & LU-2187: do not return the layout in the completion AST,
         * because it may deadlock (LU-2187) or the client may not have enough
         * space for a large layout.  The layout will be returned to the
         * client with an extra RPC to fetch xattr.lov */
        if (ldlm_has_layout(lock))
                lvb_len = 0;

        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len);
        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
        ca = ptlrpc_req_async_args(req);
        ca->ca_set_arg = arg;
        ca->ca_lock = lock;

        req->rq_interpret_reply = ldlm_cb_interpret;
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);

        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_flags = ldlm_flags_to_wire(flags);
        ldlm_lock2desc(lock, &body->lock_desc);
        if (lvb_len > 0) {
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
                const struct lu_env *env = NULL;

                if (req->rq_svc_thread)
                        env = req->rq_svc_thread->t_env;

                lvb_len = ldlm_lvbo_fill(env, lock, lvb, lvb_len);
                if (lvb_len < 0) {
                        /* We still need to send the RPC to wake up the blocked
                         * enqueue thread on the client.
                         *
                         * For an old client there is no better way to report
                         * the failure: send a zero-sized LVB and the client
                         * will fail out with -EPROTO. */
                        req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
                                           RCL_CLIENT);
                        instant_cancel = 1;
                } else {
                        req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
                                           RCL_CLIENT);
                }
        }

        LDLM_DEBUG(lock, "server preparing completion AST");

        ptlrpc_request_set_replen(req);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_pack already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        /* We only send real blocking ASTs after the lock is granted */
        lock_res_and_lock(lock);
        if (ldlm_is_ast_sent(lock)) {
                body->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
                /* Copy AST flags like LDLM_FL_DISCARD_DATA. */
                body->lock_flags |= ldlm_flags_to_wire(lock->l_flags &
                                                       LDLM_FL_AST_MASK);

                /* We might get here before ldlm_handle_enqueue() sets the
                 * LDLM_FL_CANCEL_ON_BLOCK flag, in which case we will put
                 * this lock into the waiting list.  That is safe: similar
                 * code in ldlm_handle_enqueue() will still call
                 * ldlm_lock_cancel(), which not only cancels the lock but
                 * also removes it from the waiting list */
                if (ldlm_is_cancel_on_block(lock)) {
                        unlock_res_and_lock(lock);
                        ldlm_lock_cancel(lock);

                        instant_cancel = 1;
                        req->rq_no_resend = 1;

                        lock_res_and_lock(lock);
                } else {
                        /* start the lock-timeout clock */
                        ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
                        /* Do not resend after lock callback timeout */
                        req->rq_delay_limit = ldlm_bl_timeout(lock);
                        req->rq_resend_cb = ldlm_update_resend;
                }
        }
        unlock_res_and_lock(lock);

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_CP_CALLBACK - LDLM_FIRST_OPC);

        rc = ldlm_ast_fini(req, arg, lock, instant_cancel);

        RETURN(lvb_len < 0 ? lvb_len : rc);
}

/**
 * Server side ->l_glimpse_ast handler for client locks.
 *
 * Sends glimpse AST to the client and waits for reply. Then updates
 * lvbo with the result.
 */
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
        struct ldlm_cb_set_arg          *arg = data;
        struct ldlm_request             *body;
        struct ptlrpc_request           *req;
        struct ldlm_cb_async_args       *ca;
        int                              rc;
        struct req_format               *req_fmt;
        ENTRY;

        LASSERT(lock != NULL);

        if (arg->gl_desc != NULL)
                /* There is a glimpse descriptor to pack */
                req_fmt = &RQF_LDLM_GL_CALLBACK_DESC;
        else
                req_fmt = &RQF_LDLM_GL_CALLBACK;

        req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
                                        req_fmt, LUSTRE_DLM_VERSION,
                                        LDLM_GL_CALLBACK);

        if (req == NULL)
                RETURN(-ENOMEM);

        if (arg->gl_desc != NULL) {
                /* copy the GL descriptor */
                union ldlm_gl_desc      *desc;
                desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
                *desc = *arg->gl_desc;
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        body->lock_handle[0] = lock->l_remote_handle;
        ldlm_lock2desc(lock, &body->lock_desc);

        CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
        ca = ptlrpc_req_async_args(req);
        ca->ca_set_arg = arg;
        ca->ca_lock = lock;

        /* server namespace, doesn't need lock */
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             ldlm_lvbo_size(lock));
        ptlrpc_request_set_replen(req);

        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_alloc_pack already set timeout */
        if (AT_OFF)
                req->rq_timeout = ldlm_get_rq_timeout();

        req->rq_interpret_reply = ldlm_cb_interpret;

        if (lock->l_export && lock->l_export->exp_nid_stats &&
            lock->l_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_GL_CALLBACK - LDLM_FIRST_OPC);

        rc = ldlm_ast_fini(req, arg, lock, 0);

        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_server_glimpse_ast);

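/*
 * Illustrative note (typical use, not tied to one call site): a glimpse
 * AST is commonly used to ask the client holding a write lock for the
 * current LVB contents, e.g. an up-to-date file size, without forcing it
 * to drop the lock.
 */
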
int ldlm_glimpse_locks(struct ldlm_resource *res,
                       struct list_head *gl_work_list)
{
        int     rc;
        ENTRY;

        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
                               LDLM_WORK_GL_AST);
        if (rc == -ERESTART)
                ldlm_reprocess_all(res);

        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_glimpse_locks);

/* return LDLM lock associated with a lock callback request */
struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req)
{
        struct ldlm_cb_async_args       *ca;
        struct ldlm_lock                *lock;
        ENTRY;

        ca = ptlrpc_req_async_args(req);
        lock = ca->ca_lock;
        if (lock == NULL)
                RETURN(ERR_PTR(-EFAULT));

        RETURN(lock);
}
EXPORT_SYMBOL(ldlm_request_lock);

/**
 * Main server-side entry point into LDLM for enqueue. This is called by ptlrpc
 * service threads to carry out client lock enqueueing requests.
 */
int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                         struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req,
                         const struct ldlm_callback_suite *cbs)
{
        struct ldlm_reply *dlm_rep;
        __u64 flags;
        enum ldlm_error err = ELDLM_OK;
        struct ldlm_lock *lock = NULL;
        void *cookie = NULL;
        int rc = 0;
        struct ldlm_resource *res = NULL;
        const struct lu_env *env = req->rq_svc_thread->t_env;
        ENTRY;

        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");

        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
        flags = ldlm_flags_from_wire(dlm_req->lock_flags);

        LASSERT(req->rq_export);

        /* for intent enqueue the stat will be updated inside intent policy */
        if (ptlrpc_req2svc(req)->srv_stats != NULL &&
            !(dlm_req->lock_flags & LDLM_FL_HAS_INTENT))
                ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);

        if (req->rq_export && req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);

        if (unlikely(dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
                     dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE)) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
                          dlm_req->lock_desc.l_resource.lr_type);
                GOTO(out, rc = -EFAULT);
        }

        if (unlikely(dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
                     dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
                     dlm_req->lock_desc.l_req_mode &
                     (dlm_req->lock_desc.l_req_mode-1))) {
                DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
                          dlm_req->lock_desc.l_req_mode);
                GOTO(out, rc = -EFAULT);
        }

        if (unlikely((flags & LDLM_FL_REPLAY) ||
                     (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
                /* Find an existing lock in the per-export lock hash */
                /* In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp() */
                /* coverity[overrun-buffer-val] */
                lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
                                       (void *)&dlm_req->lock_handle[0]);
                if (lock != NULL) {
                        DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie %#llx",
                                  lock->l_handle.h_cookie);
                        flags |= LDLM_FL_RESENT;
                        GOTO(existing_lock, rc = 0);
                }
        } else {
                if (ldlm_reclaim_full()) {
                        DEBUG_REQ(D_DLMTRACE, req, "Too many granted locks, "
                                  "reject current enqueue request and let the "
                                  "client retry later.\n");
                        GOTO(out, rc = -EINPROGRESS);
                }
        }

        /* The lock's callback data might be set in the policy function */
        lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
                                dlm_req->lock_desc.l_resource.lr_type,
                                dlm_req->lock_desc.l_req_mode,
                                cbs, NULL, 0, LVB_T_NONE);
        if (IS_ERR(lock)) {
                rc = PTR_ERR(lock);
                lock = NULL;
                GOTO(out, rc);
        }

        lock->l_remote_handle = dlm_req->lock_handle[0];
        LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");

        /* Initialize resource lvb but not for a lock being replayed since
         * Client already got lvb sent in this case.
         * This must occur early since some policy methods assume resource
         * lvb is available (lr_lvb_data != NULL).
         */
        res = lock->l_resource;
        if (!(flags & LDLM_FL_REPLAY)) {
                /* non-replayed lock, delayed lvb init may need to be done */
                rc = ldlm_lvbo_init(env, res);
                if (rc < 0) {
                        LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc);
                        GOTO(out, rc);
                }
        }
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        /* Don't enqueue a lock onto the export if it has been disconnected
         * due to eviction (bug 3822) or server umount (bug 24324).
         * Cancel it now instead. */
        if (req->rq_export->exp_disconnected) {
                LDLM_ERROR(lock, "lock on disconnected export %p",
                           req->rq_export);
                GOTO(out, rc = -ENOTCONN);
        }

        lock->l_export = class_export_lock_get(req->rq_export, lock);
        if (lock->l_export->exp_lock_hash)
                cfs_hash_add(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle,
                             &lock->l_exp_hash);

        /* Inherit the enqueue flags before the operation, because we do not
         * keep the res lock on return and next operations (BL AST) may proceed
         * without them. */
        lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
                                              LDLM_FL_INHERIT_MASK);

        ldlm_convert_policy_to_local(req->rq_export,
                                     dlm_req->lock_desc.l_resource.lr_type,
                                     &dlm_req->lock_desc.l_policy_data,
                                     &lock->l_policy_data);
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;

existing_lock:
        if (flags & LDLM_FL_HAS_INTENT) {
                /* In this case, the reply buffer is allocated deep in
                 * local_lock_enqueue by the policy function. */
                cookie = req;
        } else {
                /* based on the assumption that the lvb size never changes
                 * during resource lifetime, otherwise this would need
                 * resource->lr_lock's protection */
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                     RCL_SERVER, ldlm_lvbo_size(lock));

                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);

                rc = req_capsule_server_pack(&req->rq_pill);
                if (rc)
                        GOTO(out, rc);
        }
1322
1323         err = ldlm_lock_enqueue(env, ns, &lock, cookie, &flags);
1324         if (err) {
1325                 if ((int)err < 0)
1326                         rc = (int)err;
1327                 GOTO(out, err);
1328         }
1329
1330         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1331
1332         ldlm_lock2desc(lock, &dlm_rep->lock_desc);
1333         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
1334
1335         if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
1336                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
1337
1338         /* We never send a blocking AST until the lock is granted, but
1339          * we can tell it right now */
1340         lock_res_and_lock(lock);
1341
1342         /* Now take into account flags to be inherited from original lock
1343            request both in reply to client and in our own lock flags. */
1344         dlm_rep->lock_flags = ldlm_flags_to_wire(flags);
1345         lock->l_flags |= flags & LDLM_FL_INHERIT_MASK;
1346
1347         /* Don't move a pending lock onto the export if it has already been
1348          * disconnected due to eviction (bug 5683) or server umount (bug 24324).
1349          * Cancel it now instead. */
1350         if (unlikely(req->rq_export->exp_disconnected ||
1351                      OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
1352                 LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
1353                 rc = -ENOTCONN;
1354         } else if (ldlm_is_ast_sent(lock)) {
1355                 /* fill lock desc for possible lock convert */
1356                 if (lock->l_blocking_lock &&
1357                     lock->l_resource->lr_type == LDLM_IBITS) {
1358                         struct ldlm_lock *bl_lock = lock->l_blocking_lock;
1359                         struct ldlm_lock_desc *rep_desc = &dlm_rep->lock_desc;
1360
1361                         LDLM_DEBUG(lock,
1362                                    "save blocking bits %llx in granted lock",
1363                                    bl_lock->l_policy_data.l_inodebits.bits);
1364                         /* If lock is blocked then save blocking ibits
1365                          * in returned lock policy for the possible lock
1366                          * convert on a client.
1367                          */
1368                         rep_desc->l_policy_data.l_inodebits.cancel_bits =
1369                                 bl_lock->l_policy_data.l_inodebits.bits;
1370                 }
1371                 dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
1372                 if (lock->l_granted_mode == lock->l_req_mode) {
1373                         /*
1374                          * Only cancel the lock if it was granted, because it
1375                          * would be destroyed immediately and would never be
1376                          * granted in the future, causing timeouts on the
1377                          * client.  A lock that is not granted will be
1378                          * cancelled immediately after sending the completion AST.
1379                          */
1380                         if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
1381                                 unlock_res_and_lock(lock);
1382                                 ldlm_lock_cancel(lock);
1383                                 lock_res_and_lock(lock);
1384                         } else {
1385                                 ldlm_add_waiting_lock(lock,
1386                                                       ldlm_bl_timeout(lock));
1387                         }
1388                 }
1389         }
1390         unlock_res_and_lock(lock);
1391
1392         EXIT;
1393  out:
1394         req->rq_status = rc ?: err; /* return either error - bug 11190 */
1395         if (!req->rq_packed_final) {
1396                 err = lustre_pack_reply(req, 1, NULL, NULL);
1397                 if (rc == 0)
1398                         rc = err;
1399         }
1400
1401         /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
1402          * ldlm_reprocess_all.  If this moves, revisit that code. -phil */
1403         if (lock != NULL) {
1404                 LDLM_DEBUG(lock, "server-side enqueue handler, sending reply "
1405                            "(err=%d, rc=%d)", err, rc);
1406
1407                 if (rc == 0) {
1408                         if (req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
1409                                                   RCL_SERVER) &&
1410                             ldlm_lvbo_size(lock) > 0) {
1411                                 void *buf;
1412                                 int buflen;
1413
1414                                 buf = req_capsule_server_get(&req->rq_pill,
1415                                                              &RMF_DLM_LVB);
1416                                 LASSERTF(buf != NULL, "req %p, lock %p\n",
1417                                          req, lock);
1418                                 buflen = req_capsule_get_size(&req->rq_pill,
1419                                                 &RMF_DLM_LVB, RCL_SERVER);
1420                                 /* non-replayed lock, delayed lvb init may
1421                                  * need to occur now */
1422                                 if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
1423                                         buflen = ldlm_lvbo_fill(env, lock, buf,
1424                                                                 buflen);
1425                                         if (buflen >= 0)
1426                                                 req_capsule_shrink(
1427                                                         &req->rq_pill,
1428                                                         &RMF_DLM_LVB,
1429                                                         buflen, RCL_SERVER);
1430                                         else
1431                                                 rc = buflen;
1432                                 } else if (flags & LDLM_FL_REPLAY) {
1433                                         /* no LVB resend upon replay */
1434                                         if (buflen > 0)
1435                                                 req_capsule_shrink(
1436                                                         &req->rq_pill,
1437                                                         &RMF_DLM_LVB,
1438                                                         0, RCL_SERVER);
1439                                         else
1440                                                 rc = buflen;
1441                                 } else {
1442                                         rc = buflen;
1443                                 }
1444                         }
1445                 }
1446
1447                 if (rc != 0 && !(flags & LDLM_FL_RESENT)) {
1448                         if (lock->l_export) {
1449                                 ldlm_lock_cancel(lock);
1450                         } else {
1451                                 lock_res_and_lock(lock);
1452                                 ldlm_resource_unlink_lock(lock);
1453                                 ldlm_lock_destroy_nolock(lock);
1454                                 unlock_res_and_lock(lock);
1455
1456                         }
1457                 }
1458
1459                 if (!err && !ldlm_is_cbpending(lock) &&
1460                     dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
1461                         ldlm_reprocess_all(lock->l_resource);
1462
1463                 LDLM_LOCK_RELEASE(lock);
1464         }
1465
1466         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
1467                           lock, rc);
1468
1469         return rc;
1470 }
1471
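/*
 * Editor's note: a minimal sketch (not part of the original file; the
 * helper name is hypothetical) of the reply-LVB protocol used by the
 * enqueue handler above: reserve worst-case space before packing the
 * reply, fill the buffer once the lock state is known, then shrink the
 * capsule to the bytes actually written.
 */
#if 0   /* illustrative only */
static int ldlm_lvb_reply_sketch(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 struct ldlm_lock *lock)
{
        void *buf;
        int buflen;
        int rc;

        /* 1. reserve space before the reply buffers are packed */
        req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                             ldlm_lvbo_size(lock));
        rc = req_capsule_server_pack(&req->rq_pill);
        if (rc)
                return rc;

        /* 2. fill the reserved buffer ... */
        buf = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
        buflen = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
                                      RCL_SERVER);
        buflen = ldlm_lvbo_fill(env, lock, buf, buflen);
        if (buflen < 0)
                return buflen;

        /* 3. ... and return only the bytes actually used */
        req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, buflen, RCL_SERVER);
        return 0;
}
#endif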
1472 /**
1473  * Main LDLM entry point for server code to process lock conversion requests.
1474  */
1475 int ldlm_handle_convert0(struct ptlrpc_request *req,
1476                          const struct ldlm_request *dlm_req)
1477 {
1478         struct obd_export *exp = req->rq_export;
1479         struct ldlm_reply *dlm_rep;
1480         struct ldlm_lock *lock;
1481         int rc;
1482
1483         ENTRY;
1484
1485         if (exp && exp->exp_nid_stats && exp->exp_nid_stats->nid_ldlm_stats)
1486                 lprocfs_counter_incr(exp->exp_nid_stats->nid_ldlm_stats,
1487                                      LDLM_CONVERT - LDLM_FIRST_OPC);
1488
1489         rc = req_capsule_server_pack(&req->rq_pill);
1490         if (rc)
1491                 RETURN(rc);
1492
1493         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1494         dlm_rep->lock_flags = dlm_req->lock_flags;
1495
1496         lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
1497         if (lock) {
1498                 __u64 bits;
1499                 __u64 new;
1500
1501                 bits = lock->l_policy_data.l_inodebits.bits;
1502                 new = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
1503                 LDLM_DEBUG(lock, "server-side convert handler START");
1504
1505                 if (ldlm_is_cancel(lock)) {
1506                         LDLM_ERROR(lock, "convert on canceled lock!");
1507                         rc = ELDLM_NO_LOCK_DATA;
1508                 } else if (dlm_req->lock_desc.l_req_mode !=
1509                            lock->l_granted_mode) {
1510                         LDLM_ERROR(lock, "lock mode differs!");
1511                         rc = ELDLM_NO_LOCK_DATA;
1512                 } else if (bits == new) {
1513                         /* This can be a valid situation if CONVERT RPCs
1514                          * are re-ordered.  Just finish silently. */
1515                         LDLM_DEBUG(lock, "lock is converted already!");
1516                         rc = ELDLM_OK;
1517                 } else {
1518                         lock_res_and_lock(lock);
1519                         if (ldlm_is_waited(lock))
1520                                 ldlm_del_waiting_lock(lock);
1521
1522                         ldlm_clear_cbpending(lock);
1523                         lock->l_policy_data.l_inodebits.cancel_bits = 0;
1524                         ldlm_inodebits_drop(lock, bits & ~new);
1525                         /* if lock is in a bl_ast list, remove it from the list
1526                          * here before reprocessing.
1527                          */
1528                         if (!list_empty(&lock->l_bl_ast)) {
1529                                 ldlm_discard_bl_lock(lock);
1530                         } else {
1531                                 /* in this case the lock was already taken
1532                                  * from the bl_ast list by ldlm_work_bl_ast_lock()
1533                                  * and only some remaining states must be cleared.
1534                                  */
1535                                 ldlm_clear_ast_sent(lock);
1536                                 lock->l_bl_ast_run = 0;
1537                                 ldlm_clear_blocking_lock(lock);
1538                         }
1539                         unlock_res_and_lock(lock);
1540
1541                         ldlm_reprocess_all(lock->l_resource);
1542                         rc = ELDLM_OK;
1543                 }
1544
1545                 if (rc == ELDLM_OK) {
1546                         dlm_rep->lock_handle = lock->l_remote_handle;
1547                         ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
1548                                         &dlm_rep->lock_desc.l_policy_data);
1549                 }
1550
1551                 LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
1552                            rc);
1553                 LDLM_LOCK_PUT(lock);
1554         } else {
1555                 rc = ELDLM_NO_LOCK_DATA;
1556                 LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
1557                                   rc);
1558         }
1559
1560         req->rq_status = rc;
1561
1562         RETURN(0);
1563 }
1564
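/*
 * Editor's note: a hedged illustration of the convert arithmetic above.
 * The request carries the inodebits the client wants to keep ("new");
 * the handler drops everything else from the granted lock.  The
 * MDS_INODELOCK_* names below are assumed from the MDT ibits namespace.
 */
#if 0   /* illustrative only */
        __u64 bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; /* granted */
        __u64 new = MDS_INODELOCK_LOOKUP;           /* bits client keeps */

        /* drops MDS_INODELOCK_UPDATE only; LOOKUP stays granted */
        ldlm_inodebits_drop(lock, bits & ~new);
#endif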
1565 /**
1566  * Cancel all the locks whose handles are packed into ldlm_request
1567  *
1568  * Called by server code expecting such combined cancel activity
1569  * requests.
1570  */
1571 int ldlm_request_cancel(struct ptlrpc_request *req,
1572                         const struct ldlm_request *dlm_req,
1573                         int first, enum lustre_at_flags flags)
1574 {
1575         const struct lu_env *env = req->rq_svc_thread->t_env;
1576         struct ldlm_resource *res, *pres = NULL;
1577         struct ldlm_lock *lock;
1578         int i, count, done = 0;
1579         ENTRY;
1580
1581         count = dlm_req->lock_count ? dlm_req->lock_count : 1;
1582         if (first >= count)
1583                 RETURN(0);
1584
1585         if (count == 1 && dlm_req->lock_handle[0].cookie == 0)
1586                 RETURN(0);
1587
1588         /* There is no lock on the server at replay time;
1589          * skip lock cancelling to make replay tests pass. */
1590         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1591                 RETURN(0);
1592
1593         LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, "
1594                           "starting at %d", count, first);
1595
1596         for (i = first; i < count; i++) {
1597                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
1598                 if (!lock) {
1599                         LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
1600                                           "lock (cookie %llu)",
1601                                           dlm_req->lock_handle[i].cookie);
1602                         continue;
1603                 }
1604
1605                 res = lock->l_resource;
1606                 done++;
1607
1608                 /* This code is an optimization to only attempt lock
1609                  * granting on the resource (which could be CPU-expensive)
1610                  * after we are done cancelling locks in that resource. */
1611                 if (res != pres) {
1612                         if (pres != NULL) {
1613                                 ldlm_reprocess_all(pres);
1614                                 LDLM_RESOURCE_DELREF(pres);
1615                                 ldlm_resource_putref(pres);
1616                         }
1617                         if (res != NULL) {
1618                                 ldlm_resource_getref(res);
1619                                 LDLM_RESOURCE_ADDREF(res);
1620
1621                                 if (!ldlm_is_discard_data(lock))
1622                                         ldlm_lvbo_update(env, res, lock,
1623                                                          NULL, 1);
1624                         }
1625                         pres = res;
1626                 }
1627
1628                 if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock) &&
1629                     lock->l_blast_sent != 0) {
1630                         time64_t delay = ktime_get_real_seconds() -
1631                                          lock->l_blast_sent;
1632                         LDLM_DEBUG(lock, "server cancels blocked lock after %llds",
1633                                    (s64)delay);
1634                         at_measured(&lock->l_export->exp_bl_lock_at, delay);
1635                 }
1636                 ldlm_lock_cancel(lock);
1637                 LDLM_LOCK_PUT(lock);
1638         }
1639         if (pres != NULL) {
1640                 ldlm_reprocess_all(pres);
1641                 LDLM_RESOURCE_DELREF(pres);
1642                 ldlm_resource_putref(pres);
1643         }
1644         LDLM_DEBUG_NOLOCK("server-side cancel handler END");
1645         RETURN(done);
1646 }
1647 EXPORT_SYMBOL(ldlm_request_cancel);
1648
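/*
 * Editor's note: hedged usage sketch.  Besides LDLM_CANCEL RPCs, enqueue
 * requests can piggy-back cancel handles behind their own handle (early
 * lock cancel); such callers pass a non-zero "first" so slot 0 is skipped.
 * LDLM_ENQUEUE_CANCEL_OFF is assumed here to name that offset.
 */
#if 0   /* illustrative only */
        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
#endif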
1649 /**
1650  * Main LDLM entry point for server code to cancel locks.
1651  *
1652  * Typically gets called from service handler on LDLM_CANCEL opc.
1653  */
1654 int ldlm_handle_cancel(struct ptlrpc_request *req)
1655 {
1656         struct ldlm_request *dlm_req;
1657         int rc;
1658         ENTRY;
1659
1660         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1661         if (dlm_req == NULL) {
1662                 CDEBUG(D_INFO, "bad request buffer for cancel\n");
1663                 RETURN(-EFAULT);
1664         }
1665
1666         if (req->rq_export && req->rq_export->exp_nid_stats &&
1667             req->rq_export->exp_nid_stats->nid_ldlm_stats)
1668                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1669                                      LDLM_CANCEL - LDLM_FIRST_OPC);
1670
1671         rc = req_capsule_server_pack(&req->rq_pill);
1672         if (rc)
1673                 RETURN(rc);
1674
1675         if (!ldlm_request_cancel(req, dlm_req, 0, LATF_STATS))
1676                 req->rq_status = LUSTRE_ESTALE;
1677
1678         RETURN(ptlrpc_reply(req));
1679 }
1680 #endif /* HAVE_SERVER_SUPPORT */
1681
1682 /**
1683  * Callback handler for receiving incoming blocking ASTs.
1684  *
1685  * This can only happen on client side.
1686  */
1687 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
1688                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
1689 {
1690         int do_ast;
1691         ENTRY;
1692
1693         LDLM_DEBUG(lock, "client blocking AST callback handler");
1694
1695         lock_res_and_lock(lock);
1696
1697         /* set bits to cancel for this lock for possible lock convert */
1698         if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) {
1699                 /* Lock description contains policy of blocking lock,
1700                  * and its cancel_bits is used to pass conflicting bits.
1701                  * NOTE: ld can be NULL, or non-NULL but zeroed, if it was
1702                  * passed from ldlm_bl_thread_blwi(); the check below uses the
1703                  * bits in ld to make sure it is a valid description.
1704                  */
1705                 if (ld && ld->l_policy_data.l_inodebits.bits)
1706                         lock->l_policy_data.l_inodebits.cancel_bits =
1707                                 ld->l_policy_data.l_inodebits.cancel_bits;
1708                 /* if there is no valid ld and lock is cbpending already
1709                  * then cancel_bits should be kept, otherwise it is zeroed.
1710                  */
1711                 else if (!ldlm_is_cbpending(lock))
1712                         lock->l_policy_data.l_inodebits.cancel_bits = 0;
1713         }
1714         ldlm_set_cbpending(lock);
1715
1716         if (ldlm_is_cancel_on_block(lock))
1717                 ldlm_set_cancel(lock);
1718
1719         do_ast = (!lock->l_readers && !lock->l_writers);
1720         unlock_res_and_lock(lock);
1721
1722         if (do_ast) {
1723                 CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
1724                        lock, lock->l_blocking_ast);
1725                 if (lock->l_blocking_ast != NULL)
1726                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
1727                                              LDLM_CB_BLOCKING);
1728         } else {
1729                 CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
1730                        lock);
1731         }
1732
1733         LDLM_DEBUG(lock, "client blocking callback handler END");
1734         LDLM_LOCK_RELEASE(lock);
1735         EXIT;
1736 }
1737
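/*
 * Editor's note: hedged sketch of the deferred path above.  If the lock is
 * still in use, the handler only sets LDLM_FL_CBPENDING; the blocking AST
 * then fires from the last reference drop on the client ("lockh" and
 * "mode" below are assumed caller context):
 */
#if 0   /* illustrative only */
        /* last user reference: the CBPENDING flag set by the handler
         * above now triggers the deferred blocking callback */
        ldlm_lock_decref(&lockh, mode);
#endif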
1738 /**
1739  * Callback handler for receiving incoming completion ASTs.
1740  *
1741  * This can only happen on the client side.
1742  */
1743 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
1744                                     struct ldlm_namespace *ns,
1745                                     struct ldlm_request *dlm_req,
1746                                     struct ldlm_lock *lock)
1747 {
1748         struct list_head ast_list;
1749         int lvb_len;
1750         int rc = 0;
1751         ENTRY;
1752
1753         LDLM_DEBUG(lock, "client completion callback handler START");
1754
1755         INIT_LIST_HEAD(&ast_list);
1756         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
1757                 long to = cfs_time_seconds(1);
1758
1759                 while (to > 0) {
1760                         set_current_state(TASK_INTERRUPTIBLE);
1761                         schedule_timeout(to);
1762                         if (lock->l_granted_mode == lock->l_req_mode ||
1763                             ldlm_is_destroyed(lock))
1764                                 break;
1765                 }
1766         }
1767
1768         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
1769         if (lvb_len < 0) {
1770                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
1771                 GOTO(out, rc = lvb_len);
1772         } else if (lvb_len > 0) {
1773                 if (lock->l_lvb_len > 0) {
1774                         /* for extent lock, lvb contains ost_lvb{}. */
1775                         LASSERT(lock->l_lvb_data != NULL);
1776
1777                         if (unlikely(lock->l_lvb_len < lvb_len)) {
1778                                 LDLM_ERROR(lock, "Replied LVB is larger than "
1779                                            "expected, expected = %d, "
1780                                            "replied = %d",
1781                                            lock->l_lvb_len, lvb_len);
1782                                 GOTO(out, rc = -EINVAL);
1783                         }
1784                 }
1785         }
1786
1787         lock_res_and_lock(lock);
1788         if (ldlm_is_destroyed(lock) ||
1789             lock->l_granted_mode == lock->l_req_mode) {
1790                 /* bug 11300: the lock has already been granted */
1791                 unlock_res_and_lock(lock);
1792                 LDLM_DEBUG(lock, "Double grant race happened");
1793                 GOTO(out, rc = 0);
1794         }
1795
1796         /* If we receive the completion AST before the actual enqueue returned,
1797          * then we might need to switch lock modes, resources, or extents. */
1798         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
1799                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
1800                 LDLM_DEBUG(lock, "completion AST, new lock mode");
1801         }
1802
1803         if (lock->l_resource->lr_type != LDLM_PLAIN) {
1804                 ldlm_convert_policy_to_local(req->rq_export,
1805                                           dlm_req->lock_desc.l_resource.lr_type,
1806                                           &dlm_req->lock_desc.l_policy_data,
1807                                           &lock->l_policy_data);
1808                 LDLM_DEBUG(lock, "completion AST, new policy data");
1809         }
1810
1811         ldlm_resource_unlink_lock(lock);
1812         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
1813                    &lock->l_resource->lr_name,
1814                    sizeof(lock->l_resource->lr_name)) != 0) {
1815                 unlock_res_and_lock(lock);
1816                 rc = ldlm_lock_change_resource(ns, lock,
1817                                 &dlm_req->lock_desc.l_resource.lr_name);
1818                 if (rc < 0) {
1819                         LDLM_ERROR(lock, "Failed to allocate resource");
1820                         GOTO(out, rc);
1821                 }
1822                 LDLM_DEBUG(lock, "completion AST, new resource");
1823                 CERROR("change resource!\n");
1824                 lock_res_and_lock(lock);
1825         }
1826
1827         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
1828                 /* BL_AST locks are not needed in LRU.
1829                  * Let ldlm_cancel_lru() be fast. */
1830                 ldlm_lock_remove_from_lru(lock);
1831                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
1832                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
1833         }
1834
1835         if (lock->l_lvb_len > 0) {
1836                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
1837                                    lock->l_lvb_data, lvb_len);
1838                 if (rc < 0) {
1839                         unlock_res_and_lock(lock);
1840                         GOTO(out, rc);
1841                 }
1842         }
1843
1844         ldlm_grant_lock(lock, &ast_list);
1845         unlock_res_and_lock(lock);
1846
1847         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
1848
1849         /* Let the enqueue path call osc_lock_upcall() and initialize
1850          * l_ast_data */
1851         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
1852
1853         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
1854
1855         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
1856                           lock);
1857         GOTO(out, rc);
1858
1859 out:
1860         if (rc < 0) {
1861                 lock_res_and_lock(lock);
1862                 ldlm_set_failed(lock);
1863                 unlock_res_and_lock(lock);
1864                 wake_up(&lock->l_waitq);
1865         }
1866         LDLM_LOCK_RELEASE(lock);
1867 }
1868
1869 /**
1870  * Callback handler for receiving incoming glimpse ASTs.
1871  *
1872  * This can only happen on the client side.  After handling the glimpse AST
1873  * we also consider dropping the lock here if it is unused locally for a
1874  * long time.
1875  */
1876 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
1877                                     struct ldlm_namespace *ns,
1878                                     struct ldlm_request *dlm_req,
1879                                     struct ldlm_lock *lock)
1880 {
1881         int rc = -ENOSYS;
1882         ENTRY;
1883
1884         LDLM_DEBUG(lock, "client glimpse AST callback handler");
1885
1886         if (lock->l_glimpse_ast != NULL)
1887                 rc = lock->l_glimpse_ast(lock, req);
1888
1889         if (req->rq_repmsg != NULL) {
1890                 ptlrpc_reply(req);
1891         } else {
1892                 req->rq_status = rc;
1893                 ptlrpc_error(req);
1894         }
1895
1896         lock_res_and_lock(lock);
1897         if (lock->l_granted_mode == LCK_PW &&
1898             !lock->l_readers && !lock->l_writers &&
1899             ktime_after(ktime_get(),
1900                         ktime_add(lock->l_last_used,
1901                                   ktime_set(ns->ns_dirty_age_limit, 0)))) {
1902                 unlock_res_and_lock(lock);
1903                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
1904                         ldlm_handle_bl_callback(ns, NULL, lock);
1905
1906                 EXIT;
1907                 return;
1908         }
1909         unlock_res_and_lock(lock);
1910         LDLM_LOCK_RELEASE(lock);
1911         EXIT;
1912 }
1913
1914 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
1915 {
1916         if (req->rq_no_reply)
1917                 return 0;
1918
1919         req->rq_status = rc;
1920         if (!req->rq_packed_final) {
1921                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1922                 if (rc)
1923                         return rc;
1924         }
1925         return ptlrpc_reply(req);
1926 }
1927
1928 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
1929                                enum ldlm_cancel_flags cancel_flags)
1930 {
1931         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1932         ENTRY;
1933
1934         spin_lock(&blp->blp_lock);
1935         if (blwi->blwi_lock &&
1936             ldlm_is_discard_data(blwi->blwi_lock)) {
1937                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
1938                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
1939         } else {
1940                 /* other blocking callbacks are added to the regular list */
1941                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
1942         }
1943         spin_unlock(&blp->blp_lock);
1944
1945         wake_up(&blp->blp_waitq);
1946
1947         /* cannot check blwi->blwi_flags as blwi could already be freed
1948          * in LCF_ASYNC mode */
1949         if (!(cancel_flags & LCF_ASYNC))
1950                 wait_for_completion(&blwi->blwi_comp);
1951
1952         RETURN(0);
1953 }
1954
1955 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
1956                              struct ldlm_namespace *ns,
1957                              struct ldlm_lock_desc *ld,
1958                              struct list_head *cancels, int count,
1959                              struct ldlm_lock *lock,
1960                              enum ldlm_cancel_flags cancel_flags)
1961 {
1962         init_completion(&blwi->blwi_comp);
1963         INIT_LIST_HEAD(&blwi->blwi_head);
1964
1965         if (memory_pressure_get())
1966                 blwi->blwi_mem_pressure = 1;
1967
1968         blwi->blwi_ns = ns;
1969         blwi->blwi_flags = cancel_flags;
1970         if (ld != NULL)
1971                 blwi->blwi_ld = *ld;
1972         if (count) {
1973                 list_add(&blwi->blwi_head, cancels);
1974                 list_del_init(cancels);
1975                 blwi->blwi_count = count;
1976         } else {
1977                 blwi->blwi_lock = lock;
1978         }
1979 }
1980
1981 /**
1982  * Queues a list of locks \a cancels containing \a count locks
1983  * for later processing by a blocking thread.  If \a count is zero,
1984  * then the lock referenced as \a lock is queued instead.
1985  *
1986  * The blocking thread will then call the lock's ->l_blocking_ast callback.
1987  * If list addition fails, an error is returned and the caller is expected
1988  * to call ->l_blocking_ast itself.
1989  */
1990 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
1991                              struct ldlm_lock_desc *ld,
1992                              struct ldlm_lock *lock,
1993                              struct list_head *cancels, int count,
1994                              enum ldlm_cancel_flags cancel_flags)
1995 {
1996         ENTRY;
1997
1998         if (cancels && count == 0)
1999                 RETURN(0);
2000
2001         if (cancel_flags & LCF_ASYNC) {
2002                 struct ldlm_bl_work_item *blwi;
2003
2004                 OBD_ALLOC(blwi, sizeof(*blwi));
2005                 if (blwi == NULL)
2006                         RETURN(-ENOMEM);
2007                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
2008
2009                 RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
2010         } else {
2011                 /* if it is a synchronous call, do minimal memory allocation,
2012                  * as it could be triggered by the kernel shrinker
2013                  */
2014                 struct ldlm_bl_work_item blwi;
2015
2016                 memset(&blwi, 0, sizeof(blwi));
2017                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
2018                 RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
2019         }
2020 }
2021
2022
2023 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
2024                            struct ldlm_lock *lock)
2025 {
2026         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
2027 }
2028
2029 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
2030                            struct list_head *cancels, int count,
2031                            enum ldlm_cancel_flags cancel_flags)
2032 {
2033         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
2034 }
2035
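/*
 * Editor's note: hedged usage sketch.  A caller that has gathered a list
 * of locks to cancel (e.g. from the LRU) hands the whole list to the
 * blocking threads; with LCF_ASYNC it returns immediately and the work
 * item is freed by the thread itself:
 */
#if 0   /* illustrative only */
        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, LCF_ASYNC);
#endif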
2036 int ldlm_bl_thread_wakeup(void)
2037 {
2038         wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
2039         return 0;
2040 }
2041
2042 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC)! */
2043 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
2044 {
2045         struct obd_device *obd = req->rq_export->exp_obd;
2046         char *key;
2047         void *val;
2048         int keylen, vallen;
2049         int rc = -ENOSYS;
2050         ENTRY;
2051
2052         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
2053
2054         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2055
2056         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2057         if (key == NULL) {
2058                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
2059                 RETURN(-EFAULT);
2060         }
2061         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
2062                                       RCL_CLIENT);
2063         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
2064         if (val == NULL) {
2065                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
2066                 RETURN(-EFAULT);
2067         }
2068         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
2069                                       RCL_CLIENT);
2070
2071         /* We are responsible for swabbing contents of val */
2072
2073         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
2074                 /* Pass it on to mdc (the "export" in this case) */
2075                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
2076                                         req->rq_export,
2077                                         sizeof(KEY_HSM_COPYTOOL_SEND),
2078                                         KEY_HSM_COPYTOOL_SEND,
2079                                         vallen, val, NULL);
2080         else
2081                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
2082
2083         return rc;
2084 }
2085
2086 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
2087                                         const char *msg, int rc,
2088                                         const struct lustre_handle *handle)
2089 {
2090         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
2091                   "%s: [nid %s] [rc %d] [lock %#llx]",
2092                   msg, libcfs_id2str(req->rq_peer), rc,
2093                   handle ? handle->cookie : 0);
2094         if (req->rq_no_reply)
2095                 CWARN("No reply was sent, possibly causing bug 21636.\n");
2096         else if (rc)
2097                 CWARN("Send reply failed, possibly causing bug 21636.\n");
2098 }
2099
2100 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2101 static int ldlm_callback_handler(struct ptlrpc_request *req)
2102 {
2103         struct ldlm_namespace *ns;
2104         struct ldlm_request *dlm_req;
2105         struct ldlm_lock *lock;
2106         int rc;
2107         ENTRY;
2108
2109         /* Requests arrive in sender's byte order.  The ptlrpc service
2110          * handler has already checked and, if necessary, byte-swapped the
2111          * incoming request message body, but I am responsible for the
2112          * message buffers. */
2113
2114         /* do nothing for sec context finalize */
2115         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
2116                 RETURN(0);
2117
2118         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2119
2120         if (req->rq_export == NULL) {
2121                 rc = ldlm_callback_reply(req, -ENOTCONN);
2122                 ldlm_callback_errmsg(req, "Operate on unconnected server",
2123                                      rc, NULL);
2124                 RETURN(0);
2125         }
2126
2127         LASSERT(req->rq_export != NULL);
2128         LASSERT(req->rq_export->exp_obd != NULL);
2129
2130         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2131         case LDLM_BL_CALLBACK:
2132                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
2133                         if (cfs_fail_err)
2134                                 ldlm_callback_reply(req, -(int)cfs_fail_err);
2135                         RETURN(0);
2136                 }
2137                 break;
2138         case LDLM_CP_CALLBACK:
2139                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
2140                         RETURN(0);
2141                 break;
2142         case LDLM_GL_CALLBACK:
2143                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
2144                         RETURN(0);
2145                 break;
2146         case LDLM_SET_INFO:
2147                 rc = ldlm_handle_setinfo(req);
2148                 ldlm_callback_reply(req, rc);
2149                 RETURN(0);
2150         default:
2151                 CERROR("unknown opcode %u\n",
2152                        lustre_msg_get_opc(req->rq_reqmsg));
2153                 ldlm_callback_reply(req, -EPROTO);
2154                 RETURN(0);
2155         }
2156
2157         ns = req->rq_export->exp_obd->obd_namespace;
2158         LASSERT(ns != NULL);
2159
2160         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2161
2162         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2163         if (dlm_req == NULL) {
2164                 rc = ldlm_callback_reply(req, -EPROTO);
2165                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
2166                                      NULL);
2167                 RETURN(0);
2168         }
2169
2170         /* Force a known safe race, send a cancel to the server for a lock
2171          * which the server has already started a blocking callback on. */
2172         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
2173             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
2174                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
2175                 if (rc < 0)
2176                         CERROR("ldlm_cli_cancel: %d\n", rc);
2177         }
2178
2179         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
2180         if (!lock) {
2181                 CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock "
2182                        "disappeared\n", dlm_req->lock_handle[0].cookie);
2183                 rc = ldlm_callback_reply(req, -EINVAL);
2184                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
2185                                      &dlm_req->lock_handle[0]);
2186                 RETURN(0);
2187         }
2188
2189         if (ldlm_is_fail_loc(lock) &&
2190             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
2191                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
2192
2193         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
2194         lock_res_and_lock(lock);
2195         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
2196                                               LDLM_FL_AST_MASK);
2197         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
2198                 /* If somebody cancels the lock and the cache is already
2199                  * dropped, or the lock failed before the cp_ast was received
2200                  * on the client, we can tell the server we have no lock.
2201                  * Otherwise, we should send the cancel after dropping the cache. */
2202                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
2203                      ldlm_is_failed(lock)) {
2204                         LDLM_DEBUG(lock, "callback on lock %llx - lock disappeared",
2205                                    dlm_req->lock_handle[0].cookie);
2206                         unlock_res_and_lock(lock);
2207                         LDLM_LOCK_RELEASE(lock);
2208                         rc = ldlm_callback_reply(req, -EINVAL);
2209                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
2210                                              &dlm_req->lock_handle[0]);
2211                         RETURN(0);
2212                 }
2213                 /* BL_AST locks are not needed in LRU.
2214                  * Let ldlm_cancel_lru() be fast. */
2215                 ldlm_lock_remove_from_lru(lock);
2216                 ldlm_set_bl_ast(lock);
2217         }
2218         unlock_res_and_lock(lock);
2219
2220         /* We want the ost thread to get this reply so that it can respond
2221          * to ost requests (write cache writeback) that might be triggered
2222          * in the callback.
2223          *
2224          * But we'd also like to be able to indicate in the reply that we're
2225          * cancelling right now, because it's unused, or have an intent result
2226          * in the reply, so we might have to push the responsibility for sending
2227          * the reply down into the AST handlers, alas. */
2228
2229         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2230         case LDLM_BL_CALLBACK:
2231                 CDEBUG(D_INODE, "blocking ast\n");
2232                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
2233                 if (!ldlm_is_cancel_on_block(lock)) {
2234                         rc = ldlm_callback_reply(req, 0);
2235                         if (req->rq_no_reply || rc)
2236                                 ldlm_callback_errmsg(req, "Normal process", rc,
2237                                                      &dlm_req->lock_handle[0]);
2238                 }
2239                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
2240                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
2241                 break;
2242         case LDLM_CP_CALLBACK:
2243                 CDEBUG(D_INODE, "completion ast\n");
2244                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
2245                 ldlm_callback_reply(req, 0);
2246                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
2247                 break;
2248         case LDLM_GL_CALLBACK:
2249                 CDEBUG(D_INODE, "glimpse ast\n");
2250                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
2251                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
2252                 break;
2253         default:
2254                 LBUG();                         /* checked above */
2255         }
2256
2257         RETURN(0);
2258 }
2259
2260 #ifdef HAVE_SERVER_SUPPORT
2261 /**
2262  * Main handler for canceld thread.
2263  *
2264  * Separated into its own thread to avoid deadlocks.
2265  */
2266 static int ldlm_cancel_handler(struct ptlrpc_request *req)
2267 {
2268         int rc;
2269
2270         ENTRY;
2271
2272         /* Requests arrive in sender's byte order.  The ptlrpc service
2273          * handler has already checked and, if necessary, byte-swapped the
2274          * incoming request message body, but I am responsible for the
2275          * message buffers. */
2276
2277         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2278
2279         if (req->rq_export == NULL) {
2280                 struct ldlm_request *dlm_req;
2281
2282                 CERROR("%s from %s arrived at %lu with bad export cookie "
2283                        "%llu\n",
2284                        ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
2285                        libcfs_nid2str(req->rq_peer.nid),
2286                        req->rq_arrival_time.tv_sec,
2287                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
2288
2289                 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
2290                         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2291                         dlm_req = req_capsule_client_get(&req->rq_pill,
2292                                                          &RMF_DLM_REQ);
2293                         if (dlm_req != NULL)
2294                                 ldlm_lock_dump_handle(D_ERROR,
2295                                                       &dlm_req->lock_handle[0]);
2296                 }
2297                 ldlm_callback_reply(req, -ENOTCONN);
2298                 RETURN(0);
2299         }
2300
2301         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2302         /* XXX FIXME move this back to mds/handler.c, bug 249 */
2303         case LDLM_CANCEL:
2304                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2305                 CDEBUG(D_INODE, "cancel\n");
2306                 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
2307                     CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
2308                     CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
2309                         RETURN(0);
2310                 rc = ldlm_handle_cancel(req);
2311                 break;
2312         case LDLM_CONVERT:
2313         {
2314                 struct ldlm_request *dlm_req;
2315
2316                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2317                 CDEBUG(D_INODE, "convert\n");
2318
2319                 dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2320                 if (dlm_req == NULL) {
2321                         CDEBUG(D_INFO, "bad request buffer for convert\n");
2322                         rc = ldlm_callback_reply(req, -EPROTO);
2323                 } else {
2324                         req->rq_status = ldlm_handle_convert0(req, dlm_req);
2325                         rc = ptlrpc_reply(req);
2326                 }
2327                 break;
2328         }
2329         default:
2330                 CERROR("invalid opcode %d\n",
2331                        lustre_msg_get_opc(req->rq_reqmsg));
2332                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2333                 rc = ldlm_callback_reply(req, -EINVAL);
2334         }
2335
2336         RETURN(rc);
2337 }
2338
2339 static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
2340                                         struct ldlm_lock *lock)
2341 {
2342         struct ldlm_request *dlm_req;
2343         struct lustre_handle lockh;
2344         int rc = 0;
2345         int i;
2346
2347         ENTRY;
2348
2349         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2350         if (dlm_req == NULL)
2351                 RETURN(0);
2352
2353         ldlm_lock2handle(lock, &lockh);
2354         for (i = 0; i < dlm_req->lock_count; i++) {
2355                 if (lustre_handle_equal(&dlm_req->lock_handle[i],
2356                                         &lockh)) {
2357                         DEBUG_REQ(D_RPCTRACE, req,
2358                                   "Prio raised by lock %#llx.", lockh.cookie);
2359                         rc = 1;
2360                         break;
2361                 }
2362         }
2363
2364         RETURN(rc);
2365 }
2366
2367 static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
2368 {
2369         struct ldlm_request *dlm_req;
2370         int rc = 0;
2371         int i;
2372
2373         ENTRY;
2374
2375         /* no prolong in recovery */
2376         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
2377                 RETURN(0);
2378
2379         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2380         if (dlm_req == NULL)
2381                 RETURN(-EFAULT);
2382
2383         for (i = 0; i < dlm_req->lock_count; i++) {
2384                 struct ldlm_lock *lock;
2385
2386                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
2387                 if (lock == NULL)
2388                         continue;
2389
2390                 rc = ldlm_is_ast_sent(lock) ? 1 : 0;
2391                 if (rc)
2392                         LDLM_DEBUG(lock, "hpreq cancel/convert lock");
2393                 LDLM_LOCK_PUT(lock);
2394
2395                 if (rc)
2396                         break;
2397         }
2398
2399         RETURN(rc);
2400 }
2401
2402 static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
2403         .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
2404         .hpreq_check      = ldlm_cancel_hpreq_check,
2405         .hpreq_fini       = NULL,
2406 };
2407
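/*
 * Editor's note (assumption about the ptlrpc side): the service consults
 * these ops for incoming requests -- hpreq_check marks a cancel/convert
 * high-priority when one of its locks already has an AST sent, and
 * hpreq_lock_match lets an in-flight request holding a matching lock
 * raise the cancel's priority -- so such RPCs are served from the
 * high-priority queue ahead of normal traffic.
 */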
2408 static int ldlm_hpreq_handler(struct ptlrpc_request *req)
2409 {
2410         ENTRY;
2411
2412         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2413
2414         if (req->rq_export == NULL)
2415                 RETURN(0);
2416
2417         if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
2418                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2419                 req->rq_ops = &ldlm_cancel_hpreq_ops;
2420         } else if (LDLM_CONVERT == lustre_msg_get_opc(req->rq_reqmsg)) {
2421                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2422                 req->rq_ops = &ldlm_cancel_hpreq_ops;
2423         }
2424         RETURN(0);
2425 }
2426
2427 static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2428                                struct hlist_node *hnode, void *data)
2429
2430 {
2431         struct list_head         *rpc_list = data;
2432         struct ldlm_lock   *lock = cfs_hash_object(hs, hnode);
2433
2434         lock_res_and_lock(lock);
2435
2436         if (lock->l_req_mode != lock->l_granted_mode) {
2437                 unlock_res_and_lock(lock);
2438                 return 0;
2439         }
2440
2441         LASSERT(lock->l_resource);
2442         if (lock->l_resource->lr_type != LDLM_IBITS &&
2443             lock->l_resource->lr_type != LDLM_PLAIN) {
2444                 unlock_res_and_lock(lock);
2445                 return 0;
2446         }
2447
2448         if (ldlm_is_ast_sent(lock)) {
2449                 unlock_res_and_lock(lock);
2450                 return 0;
2451         }
2452
2453         LASSERT(lock->l_blocking_ast);
2454         LASSERT(!lock->l_blocking_lock);
2455
2456         ldlm_set_ast_sent(lock);
2457         if (lock->l_export && lock->l_export->exp_lock_hash) {
2458                 /* NB: it's safe to call cfs_hash_del() even if the lock
2459                  * isn't in exp_lock_hash. */
2460                 /* In the function below, .hs_keycmp resolves to
2461                  * ldlm_export_lock_keycmp() */
2462                 /* coverity[overrun-buffer-val] */
2463                 cfs_hash_del(lock->l_export->exp_lock_hash,
2464                              &lock->l_remote_handle, &lock->l_exp_hash);
2465         }
2466
2467         list_add_tail(&lock->l_rk_ast, rpc_list);
2468         LDLM_LOCK_GET(lock);
2469
2470         unlock_res_and_lock(lock);
2471         return 0;
2472 }
2473
2474 void ldlm_revoke_export_locks(struct obd_export *exp)
2475 {
2476         struct list_head  rpc_list;
2477         ENTRY;
2478
2479         INIT_LIST_HEAD(&rpc_list);
2480         cfs_hash_for_each_nolock(exp->exp_lock_hash,
2481                                  ldlm_revoke_lock_cb, &rpc_list, 0);
2482         ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
2483                           LDLM_WORK_REVOKE_AST);
2484
2485         EXIT;
2486 }
2487 EXPORT_SYMBOL(ldlm_revoke_export_locks);
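/*
 * Editor's note: hedged usage sketch.  A single call sweeps the export's
 * lock hash, marks every granted IBITS/PLAIN lock with AST_SENT and sends
 * the revoke ASTs in one batch:
 */
#if 0   /* illustrative only */
        ldlm_revoke_export_locks(req->rq_export);
#endif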
2488 #endif /* HAVE_SERVER_SUPPORT */
2489
2490 static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
2491                             struct ldlm_bl_work_item **p_blwi,
2492                             struct obd_export **p_exp)
2493 {
2494         struct ldlm_bl_work_item *blwi = NULL;
2495         static unsigned int num_bl = 0;
2496         static unsigned int num_stale;
2497         int num_th = atomic_read(&blp->blp_num_threads);
2498
2499         *p_exp = obd_stale_export_get();
2500
2501         spin_lock(&blp->blp_lock);
2502         if (*p_exp != NULL) {
2503                 if (num_th == 1 || ++num_stale < num_th) {
2504                         spin_unlock(&blp->blp_lock);
2505                         return 1;
2506                 } else {
2507                         num_stale = 0;
2508                 }
2509         }
2510
2511         /* process a request from the blp_list at least once every blp_num_threads requests */
2512         if (!list_empty(&blp->blp_list) &&
2513             (list_empty(&blp->blp_prio_list) || num_bl == 0))
2514                 blwi = list_entry(blp->blp_list.next,
2515                                   struct ldlm_bl_work_item, blwi_entry);
2516         else
2517                 if (!list_empty(&blp->blp_prio_list))
2518                         blwi = list_entry(blp->blp_prio_list.next,
2519                                           struct ldlm_bl_work_item,
2520                                           blwi_entry);
2521
2522         if (blwi) {
2523                 if (++num_bl >= num_th)
2524                         num_bl = 0;
2525                 list_del(&blwi->blwi_entry);
2526         }
2527         spin_unlock(&blp->blp_lock);
2528         *p_blwi = blwi;
2529
2530         if (*p_exp != NULL && *p_blwi != NULL) {
2531                 obd_stale_export_put(*p_exp);
2532                 *p_exp = NULL;
2533         }
2534
2535         return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
2536 }
2537
2538 /* This only contains temporary data until the thread starts */
2539 struct ldlm_bl_thread_data {
2540         struct ldlm_bl_pool     *bltd_blp;
2541         struct completion       bltd_comp;
2542         int                     bltd_num;
2543 };
2544
2545 static int ldlm_bl_thread_main(void *arg);
2546
2547 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
2548 {
2549         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
2550         struct task_struct *task;
2551
2552         init_completion(&bltd.bltd_comp);
2553
2554         bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
2555         if (bltd.bltd_num >= blp->blp_max_threads) {
2556                 atomic_dec(&blp->blp_num_threads);
2557                 return 0;
2558         }
2559
2560         LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
2561         if (check_busy &&
2562             atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
2563                 atomic_dec(&blp->blp_num_threads);
2564                 return 0;
2565         }
2566
2567         task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
2568                            bltd.bltd_num);
2569         if (IS_ERR(task)) {
2570                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
2571                        bltd.bltd_num, PTR_ERR(task));
2572                 atomic_dec(&blp->blp_num_threads);
2573                 return PTR_ERR(task);
2574         }
2575         wait_for_completion(&bltd.bltd_comp);
2576
2577         return 0;
2578 }
2579
2580 /* Not fatal if racy; we may end up with a few too many threads */
2581 static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
2582                                       struct ldlm_bl_work_item *blwi)
2583 {
2584         if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
2585                 return 0;
2586
2587         if (atomic_read(&blp->blp_busy_threads) <
2588             atomic_read(&blp->blp_num_threads))
2589                 return 0;
2590
2591         if (blwi != NULL && (blwi->blwi_ns == NULL ||
2592                              blwi->blwi_mem_pressure))
2593                 return 0;
2594
2595         return 1;
2596 }
2597
2598 static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
2599                                struct ldlm_bl_work_item *blwi)
2600 {
2601         ENTRY;
2602
2603         if (blwi->blwi_ns == NULL)
2604                 /* added by ldlm_cleanup() */
2605                 RETURN(LDLM_ITER_STOP);
2606
2607         if (blwi->blwi_mem_pressure)
2608                 memory_pressure_set();
2609
2610         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
2611
2612         if (blwi->blwi_count) {
2613                 int count;
2614                 /* This is the special case when we cancel lru locks
2615                  * asynchronously: the list of locks is passed in here,
2616                  * so the locks are marked LDLM_FL_CANCELING but NOT
2617                  * cancelled locally yet. */
2618                 count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
2619                                                    blwi->blwi_count,
2620                                                    LCF_BL_AST);
2621                 ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
2622                                      blwi->blwi_flags);
2623         } else {
2624                 ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
2625                                         blwi->blwi_lock);
2626         }
2627         if (blwi->blwi_mem_pressure)
2628                 memory_pressure_clr();
2629
2630         if (blwi->blwi_flags & LCF_ASYNC)
2631                 OBD_FREE(blwi, sizeof(*blwi));
2632         else
2633                 complete(&blwi->blwi_comp);
2634
2635         RETURN(0);
2636 }
2637
2638 /**
2639  * Cancel stale locks on export. Cancel blocked locks first.
2640  * If the given export has blocked locks, the next export in the list may
2641  * have them too, so cancel non-blocked locks only if the current export
2642  * has no blocked locks.
2643  **/
2644 static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
2645                                   struct obd_export *exp)
2646 {
2647         int num;
2648         ENTRY;
2649
2650         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
2651
2652         num = ldlm_export_cancel_blocked_locks(exp);
2653         if (num == 0)
2654                 ldlm_export_cancel_locks(exp);
2655
2656         obd_stale_export_put(exp);
2657
2658         RETURN(0);
2659 }
2660
2661
/**
 * Main blocking requests processing thread.
 *
 * Callers put locks into its queue by calling ldlm_bl_to_thread.
 * This thread eventually makes the actual call to ->l_blocking_ast
 * for each queued lock.
 */
static int ldlm_bl_thread_main(void *arg)
{
        struct ldlm_bl_pool *blp;
        struct ldlm_bl_thread_data *bltd = arg;
        ENTRY;

        blp = bltd->bltd_blp;

        complete(&bltd->bltd_comp);
        /* cannot use bltd after this, it is only on caller's stack */

        while (1) {
                struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;
                struct obd_export *exp = NULL;
                int rc;

                rc = ldlm_bl_get_work(blp, &blwi, &exp);

                if (rc == 0)
                        l_wait_event_exclusive(blp->blp_waitq,
                                               ldlm_bl_get_work(blp, &blwi,
                                                                &exp),
                                               &lwi);
                atomic_inc(&blp->blp_busy_threads);

                if (ldlm_bl_thread_need_create(blp, blwi))
                        /* discard the return value, we tried */
                        ldlm_bl_thread_start(blp, true);

                if (exp)
                        rc = ldlm_bl_thread_exports(blp, exp);
                else if (blwi)
                        rc = ldlm_bl_thread_blwi(blp, blwi);

                atomic_dec(&blp->blp_busy_threads);

                if (rc == LDLM_ITER_STOP)
                        break;

                /* If there are many namespaces, we will never sleep
                 * waiting for work, so we must call cond_resched() to
                 * avoid monopolizing the CPU. */
                cond_resched();
        }

        atomic_dec(&blp->blp_num_threads);
        complete(&blp->blp_comp);
        RETURN(0);
}
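
/*
 * Added note (illustrative skeleton, not original code): the loop above
 * follows the usual kernel work-consumer pattern: opportunistic poll,
 * exclusive sleep when idle, busy accounting around the work itself.
 * Stripped to its skeleton, with hypothetical helper names:
 *
 *      while (1) {
 *              if (!get_work(&item))
 *                      wait_for_work_exclusive();   // one waiter woken
 *              mark_busy();
 *              process(item);
 *              mark_idle();
 *              if (shutting_down())
 *                      break;
 *              cond_resched();
 *      }
 *
 * where get_work()/process() stand in for ldlm_bl_get_work() and the
 * ldlm_bl_thread_{exports,blwi}() calls.
 */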
static int ldlm_setup(void);
static int ldlm_cleanup(void);

int ldlm_get_ref(void)
{
        int rc = 0;
        ENTRY;
        mutex_lock(&ldlm_ref_mutex);
        if (++ldlm_refcount == 1) {
                rc = ldlm_setup();
                if (rc)
                        ldlm_refcount--;
        }
        mutex_unlock(&ldlm_ref_mutex);

        RETURN(rc);
}

void ldlm_put_ref(void)
{
        ENTRY;
        mutex_lock(&ldlm_ref_mutex);
        if (ldlm_refcount == 1) {
                int rc = ldlm_cleanup();
                if (rc)
                        CERROR("ldlm_cleanup failed: %d\n", rc);
                else
                        ldlm_refcount--;
        } else {
                ldlm_refcount--;
        }
        mutex_unlock(&ldlm_ref_mutex);

        EXIT;
}
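
/*
 * Added note (illustrative, not original code): ldlm_get_ref() and
 * ldlm_put_ref() implement a mutex-protected first-user-initializes,
 * last-user-tears-down singleton.  The general shape:
 *
 *      mutex_lock(&m);
 *      if (++refs == 1 && setup() != 0)
 *              refs--;                 // setup failed, stay uninitialized
 *      mutex_unlock(&m);
 *
 * One subtlety above: if ldlm_cleanup() fails (e.g. -EBUSY because
 * namespaces still exist), the last reference is deliberately kept so
 * a later ldlm_put_ref() can retry the teardown.
 */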

/*
 * Export handle<->lock hash operations.
 */
static unsigned
ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}

static void *
ldlm_export_lock_key(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        return &lock->l_remote_handle;
}

static void
ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        lock->l_remote_handle = *(struct lustre_handle *)key;
}

static int
ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
{
        return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
}

static void *
ldlm_export_lock_object(struct hlist_node *hnode)
{
        return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
}

static void
ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        LDLM_LOCK_GET(lock);
}

static void
ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
        LDLM_LOCK_RELEASE(lock);
}

static struct cfs_hash_ops ldlm_export_lock_ops = {
        .hs_hash        = ldlm_export_lock_hash,
        .hs_key         = ldlm_export_lock_key,
        .hs_keycmp      = ldlm_export_lock_keycmp,
        .hs_keycpy      = ldlm_export_lock_keycpy,
        .hs_object      = ldlm_export_lock_object,
        .hs_get         = ldlm_export_lock_get,
        .hs_put         = ldlm_export_lock_put,
        .hs_put_locked  = ldlm_export_lock_put,
};
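
/*
 * Added note (illustrative, not original code): this ops table teaches
 * cfs_hash to treat an ldlm_lock as a hash item keyed by its remote
 * lustre_handle: hs_hash/hs_keycmp locate buckets and items, hs_object
 * maps a linkage back to its lock, and hs_get/hs_put pin items across
 * lookups.  A typical lookup through such a table would look like:
 *
 *      struct lustre_handle *h = ...;
 *      struct ldlm_lock *lock;
 *
 *      lock = cfs_hash_lookup(exp->exp_lock_hash, h);
 *      if (lock != NULL) {
 *              ...                     // referenced via hs_get
 *              LDLM_LOCK_RELEASE(lock);
 *      }
 *
 * (a sketch only; the actual callers live elsewhere in the LDLM code).
 */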

int ldlm_init_export(struct obd_export *exp)
{
        int rc;
        ENTRY;

        exp->exp_lock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_lock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
                                CFS_HASH_NBLK_CHANGE);

        if (!exp->exp_lock_hash)
                RETURN(-ENOMEM);

        rc = ldlm_init_flock_export(exp);
        if (rc)
                GOTO(err, rc);

        RETURN(0);
err:
        ldlm_destroy_export(exp);
        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_init_export);

void ldlm_destroy_export(struct obd_export *exp)
{
        ENTRY;
        cfs_hash_putref(exp->exp_lock_hash);
        exp->exp_lock_hash = NULL;

        ldlm_destroy_flock_export(exp);
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_export);

static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
                                                      struct attribute *attr,
                                                      char *buf)
{
        return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
}

static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
                                                       struct attribute *attr,
                                                       const char *buffer,
                                                       size_t count)
{
        int rc;
        unsigned long val;

        rc = kstrtoul(buffer, 10, &val);
        if (rc)
                return rc;

        ldlm_cancel_unused_locks_before_replay = val;

        return count;
}
LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
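
/*
 * Added usage note (illustrative): LUSTRE_RW_ATTR() binds the _show/
 * _store pair above to a sysfs attribute named after the variable, so
 * once ldlm_setup() has created the "ldlm" kobject the tunable can be
 * read and written at runtime, e.g. (sysfs path assumed for a typical
 * build):
 *
 *      # cat /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay
 *      1
 *      # echo 0 > /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay
 *
 * kstrtoul() rejects non-numeric input, so an invalid write fails with
 * the parse error instead of silently changing the setting.
 */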

static struct attribute *ldlm_attrs[] = {
        &lustre_attr_cancel_unused_locks_before_replay.attr,
        NULL,
};

static struct attribute_group ldlm_attr_group = {
        .attrs = ldlm_attrs,
};

static int ldlm_setup(void)
{
        static struct ptlrpc_service_conf       conf;
        struct ldlm_bl_pool                    *blp = NULL;
#ifdef HAVE_SERVER_SUPPORT
        struct task_struct *task;
#endif /* HAVE_SERVER_SUPPORT */
        int i;
        int rc = 0;

        ENTRY;

        if (ldlm_state != NULL)
                RETURN(-EALREADY);

        OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
        if (ldlm_state == NULL)
                RETURN(-ENOMEM);

        ldlm_kobj = kobject_create_and_add("ldlm", &lustre_kset->kobj);
        if (!ldlm_kobj)
                GOTO(out, rc = -ENOMEM);

        rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
        if (rc)
                GOTO(out, rc);

        ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
        if (!ldlm_ns_kset)
                GOTO(out, rc = -ENOMEM);

        ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
        if (!ldlm_svc_kset)
                GOTO(out, rc = -ENOMEM);

        rc = ldlm_debugfs_setup();
        if (rc != 0)
                GOTO(out, rc);

        memset(&conf, 0, sizeof(conf));
        conf = (typeof(conf)) {
                .psc_name               = "ldlm_cbd",
                .psc_watchdog_factor    = 2,
                .psc_buf                = {
                        .bc_nbufs               = LDLM_CLIENT_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,
                        .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
                        .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
                },
                .psc_thr                = {
                        .tc_thr_name            = "ldlm_cb",
                        .tc_thr_factor          = LDLM_THR_FACTOR,
                        .tc_nthrs_init          = LDLM_NTHRS_INIT,
                        .tc_nthrs_base          = LDLM_NTHRS_BASE,
                        .tc_nthrs_max           = LDLM_NTHRS_MAX,
                        .tc_nthrs_user          = ldlm_num_threads,
                        .tc_cpu_affinity        = 1,
                        .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
                },
                .psc_cpt                = {
                        .cc_pattern             = ldlm_cpts,
                },
                .psc_ops                = {
                        .so_req_handler         = ldlm_callback_handler,
                },
        };
        ldlm_state->ldlm_cb_service =
                        ptlrpc_register_service(&conf, ldlm_svc_kset,
                                                ldlm_svc_debugfs_dir);
        if (IS_ERR(ldlm_state->ldlm_cb_service)) {
                CERROR("failed to start ldlm_cbd service\n");
                rc = PTR_ERR(ldlm_state->ldlm_cb_service);
                ldlm_state->ldlm_cb_service = NULL;
                GOTO(out, rc);
        }

#ifdef HAVE_SERVER_SUPPORT
        memset(&conf, 0, sizeof(conf));
        conf = (typeof(conf)) {
                .psc_name               = "ldlm_canceld",
                .psc_watchdog_factor    = 6,
                .psc_buf                = {
                        .bc_nbufs               = LDLM_SERVER_NBUFS,
                        .bc_buf_size            = LDLM_BUFSIZE,
                        .bc_req_max_size        = LDLM_MAXREQSIZE,
                        .bc_rep_max_size        = LDLM_MAXREPSIZE,
                        .bc_req_portal          = LDLM_CANCEL_REQUEST_PORTAL,
                        .bc_rep_portal          = LDLM_CANCEL_REPLY_PORTAL,
                },
                .psc_thr                = {
                        .tc_thr_name            = "ldlm_cn",
                        .tc_thr_factor          = LDLM_THR_FACTOR,
                        .tc_nthrs_init          = LDLM_NTHRS_INIT,
                        .tc_nthrs_base          = LDLM_NTHRS_BASE,
                        .tc_nthrs_max           = LDLM_NTHRS_MAX,
                        .tc_nthrs_user          = ldlm_num_threads,
                        .tc_cpu_affinity        = 1,
                        .tc_ctx_tags            = LCT_MD_THREAD |
                                                  LCT_DT_THREAD |
                                                  LCT_CL_THREAD,
                },
                .psc_cpt                = {
                        .cc_pattern             = ldlm_cpts,
                },
                .psc_ops                = {
                        .so_req_handler         = ldlm_cancel_handler,
                        .so_hpreq_handler       = ldlm_hpreq_handler,
                },
        };
        ldlm_state->ldlm_cancel_service =
                        ptlrpc_register_service(&conf, ldlm_svc_kset,
                                                ldlm_svc_debugfs_dir);
        if (IS_ERR(ldlm_state->ldlm_cancel_service)) {
                CERROR("failed to start ldlm_canceld service\n");
                rc = PTR_ERR(ldlm_state->ldlm_cancel_service);
                ldlm_state->ldlm_cancel_service = NULL;
                GOTO(out, rc);
        }
#endif /* HAVE_SERVER_SUPPORT */

        OBD_ALLOC(blp, sizeof(*blp));
        if (blp == NULL)
                GOTO(out, rc = -ENOMEM);
        ldlm_state->ldlm_bl_pool = blp;

        spin_lock_init(&blp->blp_lock);
        INIT_LIST_HEAD(&blp->blp_list);
        INIT_LIST_HEAD(&blp->blp_prio_list);
        init_waitqueue_head(&blp->blp_waitq);
        atomic_set(&blp->blp_num_threads, 0);
        atomic_set(&blp->blp_busy_threads, 0);

        if (ldlm_num_threads == 0) {
                blp->blp_min_threads = LDLM_NTHRS_INIT;
                blp->blp_max_threads = LDLM_NTHRS_MAX;
        } else {
                blp->blp_min_threads = blp->blp_max_threads =
                        min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
                                                         ldlm_num_threads));
        }

        for (i = 0; i < blp->blp_min_threads; i++) {
                rc = ldlm_bl_thread_start(blp, false);
                if (rc < 0)
                        GOTO(out, rc);
        }

#ifdef HAVE_SERVER_SUPPORT
        task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                GOTO(out, rc);
        }

        wait_event(expired_lock_wait_queue,
                   expired_lock_thread_state == ELT_READY);
#endif /* HAVE_SERVER_SUPPORT */

        rc = ldlm_pools_init();
        if (rc) {
                CERROR("Failed to initialize LDLM pools: %d\n", rc);
                GOTO(out, rc);
        }

        rc = ldlm_reclaim_setup();
        if (rc) {
                CERROR("Failed to setup reclaim thread: rc = %d\n", rc);
                GOTO(out, rc);
        }
        RETURN(0);

 out:
        ldlm_cleanup();
        RETURN(rc);
}

static int ldlm_cleanup(void)
{
        ENTRY;

        if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
            !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
                CERROR("ldlm still has namespaces; clean these up first.\n");
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
                ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
                RETURN(-EBUSY);
        }

        ldlm_reclaim_cleanup();
        ldlm_pools_fini();

        if (ldlm_state->ldlm_bl_pool != NULL) {
                struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;

                while (atomic_read(&blp->blp_num_threads) > 0) {
                        struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };

                        init_completion(&blp->blp_comp);

                        spin_lock(&blp->blp_lock);
                        list_add_tail(&blwi.blwi_entry, &blp->blp_list);
                        wake_up(&blp->blp_waitq);
                        spin_unlock(&blp->blp_lock);

                        wait_for_completion(&blp->blp_comp);
                }

                OBD_FREE(blp, sizeof(*blp));
        }
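
        /*
         * Added note (illustrative, not original code): the loop above
         * shuts the pool down with a "poison pill" per thread.  A work
         * item with blwi_ns == NULL makes ldlm_bl_thread_blwi() return
         * LDLM_ITER_STOP, which breaks that thread out of its main
         * loop.  One pill is queued at a time, then we wait on
         * blp_comp, which the dying thread completes after decrementing
         * blp_num_threads; this repeats until the count reaches zero.
         */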

        if (ldlm_state->ldlm_cb_service != NULL)
                ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
#ifdef HAVE_SERVER_SUPPORT
        if (ldlm_state->ldlm_cancel_service != NULL)
                ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
#endif

        if (ldlm_ns_kset)
                kset_unregister(ldlm_ns_kset);
        if (ldlm_svc_kset)
                kset_unregister(ldlm_svc_kset);
        if (ldlm_kobj) {
                sysfs_remove_group(ldlm_kobj, &ldlm_attr_group);
                kobject_put(ldlm_kobj);
        }

        ldlm_debugfs_cleanup();

#ifdef HAVE_SERVER_SUPPORT
        if (expired_lock_thread_state != ELT_STOPPED) {
                expired_lock_thread_state = ELT_TERMINATE;
                wake_up(&expired_lock_wait_queue);
                wait_event(expired_lock_wait_queue,
                           expired_lock_thread_state == ELT_STOPPED);
        }
#endif

        OBD_FREE(ldlm_state, sizeof(*ldlm_state));
        ldlm_state = NULL;

        RETURN(0);
}

int ldlm_init(void)
{
        ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
                                               SLAB_HWCACHE_ALIGN, NULL);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;

        ldlm_lock_slab = kmem_cache_create("ldlm_locks",
                              sizeof(struct ldlm_lock), 0,
                              SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU, NULL);
        if (ldlm_lock_slab == NULL)
                goto out_resource;

        ldlm_interval_slab = kmem_cache_create("interval_node",
                                        sizeof(struct ldlm_interval),
                                        0, SLAB_HWCACHE_ALIGN, NULL);
        if (ldlm_interval_slab == NULL)
                goto out_lock;

        ldlm_interval_tree_slab = kmem_cache_create("interval_tree",
                        sizeof(struct ldlm_interval_tree) * LCK_MODE_NUM,
                        0, SLAB_HWCACHE_ALIGN, NULL);
        if (ldlm_interval_tree_slab == NULL)
                goto out_interval;

#ifdef HAVE_SERVER_SUPPORT
        ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
                                        sizeof(struct ldlm_glimpse_work),
                                        0, 0, NULL);
        if (ldlm_glimpse_work_kmem == NULL)
                goto out_interval_tree;
#endif

#if LUSTRE_TRACKS_LOCK_EXP_REFS
        class_export_dump_hook = ldlm_dump_export_locks;
#endif
        return 0;
#ifdef HAVE_SERVER_SUPPORT
out_interval_tree:
        kmem_cache_destroy(ldlm_interval_tree_slab);
#endif
out_interval:
        kmem_cache_destroy(ldlm_interval_slab);
out_lock:
        kmem_cache_destroy(ldlm_lock_slab);
out_resource:
        kmem_cache_destroy(ldlm_resource_slab);

        return -ENOMEM;
}
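
/*
 * Added note (illustrative, not original code): ldlm_init() uses the
 * standard kernel "goto unwind" idiom -- each failed allocation jumps
 * to a label that frees, in reverse order, everything allocated before
 * it.  The template, with hypothetical names:
 *
 *      a = alloc_a();
 *      if (a == NULL)
 *              return -ENOMEM;
 *      b = alloc_b();
 *      if (b == NULL)
 *              goto out_a;
 *      return 0;
 *
 * out_a:
 *      free_a(a);
 *      return -ENOMEM;
 *
 * Later failure cases fall through the earlier labels, so one chain
 * covers every partial-failure state.
 */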

void ldlm_exit(void)
{
        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
        kmem_cache_destroy(ldlm_resource_slab);
        /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
         * synchronize_rcu() to wait until a grace period has elapsed
         * and ldlm_lock_free() has had a chance to run. */
        synchronize_rcu();
        kmem_cache_destroy(ldlm_lock_slab);
        kmem_cache_destroy(ldlm_interval_slab);
        kmem_cache_destroy(ldlm_interval_tree_slab);
#ifdef HAVE_SERVER_SUPPORT
        kmem_cache_destroy(ldlm_glimpse_work_kmem);
#endif
}