Whamcloud - gitweb
LU-13783 procfs: fix improper prop_ops fields
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ldlm/ldlm_lockd.c
32  *
33  * Author: Peter Braam <braam@clusterfs.com>
34  * Author: Phil Schwan <phil@clusterfs.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_LDLM
38
39 #include <linux/kthread.h>
40 #include <linux/list.h>
41 #include <libcfs/libcfs.h>
42 #include <libcfs/linux/linux-mem.h>
43 #include <lustre_errno.h>
44 #include <lustre_dlm.h>
45 #include <obd_class.h>
46 #include "ldlm_internal.h"
47
/*
 * Module parameters. All three are registered with permission bits 0444.
 */
static int ldlm_num_threads;
module_param(ldlm_num_threads, int, 0444);
MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");

/* Defaults to 1 (binding enabled). */
static unsigned int ldlm_cpu_bind = 1;
module_param(ldlm_cpu_bind, uint, 0444);
MODULE_PARM_DESC(ldlm_cpu_bind,
                 "bind DLM service threads to particular CPU partitions");

static char *ldlm_cpts;
module_param(ldlm_cpts, charp, 0444);
MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");

/*
 * NOTE(review): presumably ldlm_ref_mutex serializes updates of
 * ldlm_refcount; neither is used in this part of the file - confirm.
 */
static DEFINE_MUTEX(ldlm_ref_mutex);
static int ldlm_refcount;

/* Non-static kobject/kset handles; their setup is not in this part of the file. */
struct kobject *ldlm_kobj;
struct kset *ldlm_ns_kset;
struct kset *ldlm_svc_kset;

/* LDLM state */

static struct ldlm_state *ldlm_state;
71
72 /*
73  * timeout for initial callback (AST) reply (bz10399)
74  * Due to having to send a 32 bit time value over the
75  * wire return it as timeout_t instead of time64_t
76  */
77 static inline timeout_t ldlm_get_rq_timeout(void)
78 {
79         /* Non-AT value */
80         timeout_t timeout = min(ldlm_timeout, obd_timeout / 3);
81
82         return timeout < 1 ? 1 : timeout;
83 }
84
/*
 * State of the callback-processing pool: two work lists, a wait queue,
 * a completion and thread counters/limits.
 *
 * NOTE(review): the code using this structure is outside this part of the
 * file; field notes that do not come from the original comments are
 * assumptions to confirm.
 */
struct ldlm_bl_pool {
        /* presumably guards blp_prio_list and blp_list - TODO confirm */
        spinlock_t blp_lock;

        /*
         * blp_prio_list is used for callbacks that should be handled
         * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
         * see b=13843
         */
        struct list_head blp_prio_list;

        /*
         * blp_list is used for all other callbacks which are likely
         * to take longer to process.
         */
        struct list_head blp_list;

        /* presumably where idle pool threads sleep - TODO confirm */
        wait_queue_head_t blp_waitq;
        struct completion blp_comp;
        /* thread accounting; the exact semantics are defined by the users */
        atomic_t blp_num_threads;
        atomic_t blp_busy_threads;
        /* lower/upper bounds on the number of pool threads (by name) */
        int blp_min_threads;
        int blp_max_threads;
};
108
/*
 * One unit of work for the callback pool.
 *
 * NOTE(review): producers and consumers of this structure are outside this
 * part of the file; the per-field notes are inferred from names and types
 * only and must be confirmed against the users.
 */
struct ldlm_bl_work_item {
        /* list linkage; presumably onto blp_list/blp_prio_list - confirm */
        struct list_head        blwi_entry;
        struct ldlm_namespace   *blwi_ns;
        struct ldlm_lock_desc   blwi_ld;
        struct ldlm_lock        *blwi_lock;
        /* presumably a list of blwi_count locks to process - confirm */
        struct list_head        blwi_head;
        int                     blwi_count;
        struct completion       blwi_comp;
        enum ldlm_cancel_flags  blwi_flags;
        int                     blwi_mem_pressure;
};
120
121 #ifdef HAVE_SERVER_SUPPORT
122
/**
 * Protects waiting_locks_list, expired_lock_list and expired_lock_dump.
 * It is also taken from the timer callback, hence the _bh lock variants.
 */
static DEFINE_SPINLOCK(waiting_locks_spinlock); /* BH lock (timer) */

/**
 * List for contended locks.
 *
 * As soon as a lock is contended, it gets placed on this list and
 * expected time to get a response is filled in the lock. The timer
 * callback moves locks whose deadline has passed to expired_lock_list,
 * and a dedicated thread (expired_lock_main()) walks that list and
 * schedules client evictions for those that have not been released in
 * time.
 *
 * All access to it should be under waiting_locks_spinlock.
 */
static LIST_HEAD(waiting_locks_list);
static void waiting_locks_callback(TIMER_DATA_TYPE unused);
static CFS_DEFINE_TIMER(waiting_locks_timer, waiting_locks_callback, 0, 0);

/*
 * State of the expired-lock thread:
 * ELT_STOPPED   - initial value, and set again by the thread when it exits;
 * ELT_READY     - set by the thread once it has started;
 * ELT_TERMINATE - checked by the thread as its request to exit.
 */
enum elt_state {
        ELT_STOPPED,
        ELT_READY,
        ELT_TERMINATE,
};

/* The expired-lock thread sleeps here; state changes are announced here. */
static DECLARE_WAIT_QUEUE_HEAD(expired_lock_wait_queue);
static enum elt_state expired_lock_thread_state = ELT_STOPPED;
/* Non-zero when the thread should dump the debug log before processing. */
static int expired_lock_dump;
/* Locks whose callback deadline passed or whose AST failed. */
static LIST_HEAD(expired_lock_list);

static int ldlm_lock_busy(struct ldlm_lock *lock);
static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout);
static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout);
157
158 static inline int have_expired_locks(void)
159 {
160         int need_to_run;
161
162         ENTRY;
163         spin_lock_bh(&waiting_locks_spinlock);
164         need_to_run = !list_empty(&expired_lock_list);
165         spin_unlock_bh(&waiting_locks_spinlock);
166
167         RETURN(need_to_run);
168 }
169
/**
 * Check expired lock list for expired locks and time them out.
 *
 * Thread body. It marks itself ELT_READY, then repeatedly sleeps on
 * expired_lock_wait_queue until expired_lock_list is non-empty or
 * termination is requested, and drains the list. For each lock it either
 * re-adds a still-busy lock to the waiting list with half of its blocking
 * timeout, or fails (evicts) the lock's export. The loop ends once
 * expired_lock_thread_state is ELT_TERMINATE; the state is then set to
 * ELT_STOPPED and waiters are woken.
 *
 * \param[in] arg       unused
 *
 * \retval 0            always
 */
static int expired_lock_main(void *arg)
{
        struct list_head *expired = &expired_lock_list;
        int do_dump;

        ENTRY;

        /* announce that the thread is up to anyone waiting on the queue */
        expired_lock_thread_state = ELT_READY;
        wake_up(&expired_lock_wait_queue);

        while (1) {
                wait_event_idle(expired_lock_wait_queue,
                                have_expired_locks() ||
                                expired_lock_thread_state == ELT_TERMINATE);

                spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_dump) {
                        /* drop the spinlock around the log dump */
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* from waiting_locks_callback, but not in timer */
                        libcfs_debug_dumplog();

                        spin_lock_bh(&waiting_locks_spinlock);
                        expired_lock_dump = 0;
                }

                /* counts evictions performed in this pass */
                do_dump = 0;

                /* the spinlock is held at the top of every iteration */
                while (!list_empty(expired)) {
                        struct obd_export *export;
                        struct ldlm_lock *lock;

                        lock = list_entry(expired->next, struct ldlm_lock,
                                          l_pending_chain);
                        /*
                         * a pointer inside [LP_POISON, LP_POISON + PAGE_SIZE)
                         * is treated as a lock that was already freed
                         */
                        if ((void *)lock < LP_POISON + PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
                                CERROR("free lock on elt list %p\n", lock);
                                LBUG();
                        }
                        list_del_init(&lock->l_pending_chain);
                        /* same poison-range test for the lock's export */
                        if ((void *)lock->l_export <
                             LP_POISON + PAGE_SIZE &&
                            (void *)lock->l_export >= LP_POISON) {
                                CERROR("lock with free export on elt list %p\n",
                                       lock->l_export);
                                lock->l_export = NULL;
                                LDLM_ERROR(lock, "free export");
                                /*
                                 * release extra ref grabbed by
                                 * ldlm_add_waiting_lock() or
                                 * ldlm_failed_ast()
                                 */
                                LDLM_LOCK_RELEASE(lock);
                                continue;
                        }

                        if (ldlm_is_destroyed(lock)) {
                                /*
                                 * release the lock reference that was held
                                 * while the lock sat on the list where
                                 * waiting_locks_callback() found it
                                 */
                                LDLM_LOCK_RELEASE(lock);
                                continue;
                        }
                        /*
                         * taken before the spinlock is dropped and put back
                         * after the lock has been handled below
                         */
                        export = class_export_lock_get(lock->l_export, lock);
                        spin_unlock_bh(&waiting_locks_spinlock);

                        /* Check if we need to prolong timeout */
                        if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
                            lock->l_callback_timestamp != 0 && /* not AST error */
                            ldlm_lock_busy(lock)) {
                                LDLM_DEBUG(lock, "prolong the busy lock");
                                /* re-queue with half of the blocking timeout */
                                lock_res_and_lock(lock);
                                ldlm_add_waiting_lock(lock,
                                                ldlm_bl_timeout(lock) >> 1);
                                unlock_res_and_lock(lock);
                        } else {
                                /* take the lock off the export's blocked list */
                                spin_lock_bh(&export->exp_bl_list_lock);
                                list_del_init(&lock->l_exp_list);
                                spin_unlock_bh(&export->exp_bl_list_lock);

                                LDLM_ERROR(lock,
                                           "lock callback timer expired after %llds: evicting client at %s ",
                                           ktime_get_seconds() -
                                           lock->l_blast_sent,
                                           obd_export_nid2str(export));
                                ldlm_lock_to_ns(lock)->ns_timeouts++;
                                do_dump++;
                                class_fail_export(export);
                        }
                        class_export_lock_put(export, lock);
                        /*
                         * release extra ref grabbed by ldlm_add_waiting_lock()
                         * or ldlm_failed_ast()
                         */
                        LDLM_LOCK_RELEASE(lock);

                        /* re-take the spinlock for the next list check */
                        spin_lock_bh(&waiting_locks_spinlock);
                }
                spin_unlock_bh(&waiting_locks_spinlock);

                if (do_dump && obd_dump_on_eviction) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }

                if (expired_lock_thread_state == ELT_TERMINATE)
                        break;
        }

        /* tell whoever requested termination that the thread is done */
        expired_lock_thread_state = ELT_STOPPED;
        wake_up(&expired_lock_wait_queue);
        RETURN(0);
}
288
289 /**
290  * Check if there is a request in the export request list
291  * which prevents the lock canceling.
292  */
293 static int ldlm_lock_busy(struct ldlm_lock *lock)
294 {
295         struct ptlrpc_request *req;
296         int match = 0;
297
298         ENTRY;
299
300         if (lock->l_export == NULL)
301                 return 0;
302
303         spin_lock(&lock->l_export->exp_rpc_lock);
304         list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
305                                 rq_exp_list) {
306                 if (req->rq_ops->hpreq_lock_match) {
307                         match = req->rq_ops->hpreq_lock_match(req, lock);
308                         if (match)
309                                 break;
310                 }
311         }
312         spin_unlock(&lock->l_export->exp_rpc_lock);
313         RETURN(match);
314 }
315
316 /* This is called from within a timer interrupt and cannot schedule */
317 static void waiting_locks_callback(TIMER_DATA_TYPE unused)
318 {
319         struct ldlm_lock *lock;
320         int need_dump = 0;
321
322         spin_lock_bh(&waiting_locks_spinlock);
323         while (!list_empty(&waiting_locks_list)) {
324                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
325                                   l_pending_chain);
326                 if (lock->l_callback_timestamp > ktime_get_seconds() ||
327                     lock->l_req_mode == LCK_GROUP)
328                         break;
329
330                 /*
331                  * no needs to take an extra ref on the lock since it was in
332                  * the waiting_locks_list and ldlm_add_waiting_lock()
333                  * already grabbed a ref
334                  */
335                 list_move(&lock->l_pending_chain, &expired_lock_list);
336                 need_dump = 1;
337         }
338
339         if (!list_empty(&expired_lock_list)) {
340                 if (obd_dump_on_timeout && need_dump)
341                         expired_lock_dump = __LINE__;
342
343                 wake_up(&expired_lock_wait_queue);
344         }
345
346         /*
347          * Make sure the timer will fire again if we have any locks
348          * left.
349          */
350         if (!list_empty(&waiting_locks_list)) {
351                 time64_t now = ktime_get_seconds();
352                 timeout_t delta = 0;
353
354                 lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
355                                   l_pending_chain);
356                 if (lock->l_callback_timestamp - now > 0)
357                         delta = lock->l_callback_timestamp - now;
358                 mod_timer(&waiting_locks_timer,
359                           jiffies + cfs_time_seconds(delta));
360         }
361         spin_unlock_bh(&waiting_locks_spinlock);
362 }
363
364 /**
365  * Add lock to the list of contended locks.
366  *
367  * Indicate that we're waiting for a client to call us back cancelling a given
368  * lock.  We add it to the pending-callback chain, and schedule the lock-timeout
369  * timer to fire appropriately.  (We round up to the next second, to avoid
370  * floods of timer firings during periods of high lock contention and traffic).
371  * As done by ldlm_add_waiting_lock(), the caller must grab a lock reference
372  * if it has been added to the waiting list (1 is returned).
373  *
374  * Called with the namespace lock held.
375  */
376 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay)
377 {
378         unsigned long timeout_jiffies = jiffies;
379         time64_t deadline;
380         timeout_t timeout;
381
382         lock->l_blast_sent = ktime_get_seconds();
383         if (!list_empty(&lock->l_pending_chain))
384                 return 0;
385
386         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
387             OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
388                 delay = 1;
389
390         deadline = lock->l_blast_sent + delay;
391         if (likely(deadline > lock->l_callback_timestamp))
392                 lock->l_callback_timestamp = deadline;
393
394         timeout = clamp_t(timeout_t,
395                           lock->l_callback_timestamp - lock->l_blast_sent,
396                           0, delay);
397         timeout_jiffies += cfs_time_seconds(timeout);
398
399         if (time_before(timeout_jiffies, waiting_locks_timer.expires) ||
400             !timer_pending(&waiting_locks_timer))
401                 mod_timer(&waiting_locks_timer, timeout_jiffies);
402
403         /*
404          * if the new lock has a shorter timeout than something earlier on
405          * the list, we'll wait the longer amount of time; no big deal.
406          */
407         /* FIFO */
408         list_add_tail(&lock->l_pending_chain, &waiting_locks_list);
409         return 1;
410 }
411
412 static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
413 {
414         spin_lock_bh(&lock->l_export->exp_bl_list_lock);
415         if (list_empty(&lock->l_exp_list)) {
416                 if (!ldlm_is_granted(lock))
417                         list_add_tail(&lock->l_exp_list,
418                                       &lock->l_export->exp_bl_list);
419                 else
420                         list_add(&lock->l_exp_list,
421                                  &lock->l_export->exp_bl_list);
422         }
423         spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
424
425         /*
426          * A blocked lock is added. Adjust the position in
427          * the stale list if the export is in the list.
428          * If export is stale and not in the list - it is being
429          * processed and will be placed on the right position
430          * on obd_stale_export_put().
431          */
432         if (!list_empty(&lock->l_export->exp_stale_list))
433                 obd_stale_export_adjust(lock->l_export);
434 }
435
436 static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
437 {
438         int ret;
439
440         /* NB: must be called with hold of lock_res_and_lock() */
441         LASSERT(ldlm_is_res_locked(lock));
442         LASSERT(!ldlm_is_cancel_on_block(lock));
443
444         /*
445          * Do not put cross-MDT lock in the waiting list, since we
446          * will not evict it due to timeout for now
447          */
448         if (lock->l_export != NULL &&
449             (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
450                 return 0;
451
452         spin_lock_bh(&waiting_locks_spinlock);
453         if (ldlm_is_cancel(lock)) {
454                 spin_unlock_bh(&waiting_locks_spinlock);
455                 return 0;
456         }
457
458         if (ldlm_is_destroyed(lock)) {
459                 static time64_t next;
460
461                 spin_unlock_bh(&waiting_locks_spinlock);
462                 LDLM_ERROR(lock, "not waiting on destroyed lock (b=5653)");
463                 if (ktime_get_seconds() > next) {
464                         next = ktime_get_seconds() + 14400;
465                         libcfs_debug_dumpstack(NULL);
466                 }
467                 return 0;
468         }
469
470         ldlm_set_waited(lock);
471         ret = __ldlm_add_waiting_lock(lock, timeout);
472         if (ret) {
473                 /*
474                  * grab ref on the lock if it has been added to the
475                  * waiting list
476                  */
477                 LDLM_LOCK_GET(lock);
478         }
479         spin_unlock_bh(&waiting_locks_spinlock);
480
481         if (ret)
482                 ldlm_add_blocked_lock(lock);
483
484         LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
485                    ret == 0 ? "not re-" : "", timeout,
486                    AT_OFF ? "off" : "on");
487         return ret;
488 }
489
490 /**
491  * Remove a lock from the pending list, likely because it had its cancellation
492  * callback arrive without incident.  This adjusts the lock-timeout timer if
493  * needed.  Returns 0 if the lock wasn't pending after all, 1 if it was.
494  * As done by ldlm_del_waiting_lock(), the caller must release the lock
495  * reference when the lock is removed from any list (1 is returned).
496  *
497  * Called with namespace lock held.
498  */
499 static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
500 {
501         struct list_head *list_next;
502
503         if (list_empty(&lock->l_pending_chain))
504                 return 0;
505
506         list_next = lock->l_pending_chain.next;
507         if (lock->l_pending_chain.prev == &waiting_locks_list) {
508                 /* Removing the head of the list, adjust timer. */
509                 if (list_next == &waiting_locks_list) {
510                         /* No more, just cancel. */
511                         del_timer(&waiting_locks_timer);
512                 } else {
513                         time64_t now = ktime_get_seconds();
514                         struct ldlm_lock *next;
515                         timeout_t delta = 0;
516
517                         next = list_entry(list_next, struct ldlm_lock,
518                                           l_pending_chain);
519                         if (next->l_callback_timestamp - now > 0)
520                                 delta = lock->l_callback_timestamp - now;
521
522                         mod_timer(&waiting_locks_timer,
523                                   jiffies + cfs_time_seconds(delta));
524                 }
525         }
526         list_del_init(&lock->l_pending_chain);
527
528         return 1;
529 }
530
531 int ldlm_del_waiting_lock(struct ldlm_lock *lock)
532 {
533         int ret;
534
535         if (lock->l_export == NULL) {
536                 /* We don't have a "waiting locks list" on clients. */
537                 CDEBUG(D_DLMTRACE, "Client lock %p : no-op\n", lock);
538                 return 0;
539         }
540
541         spin_lock_bh(&waiting_locks_spinlock);
542         ret = __ldlm_del_waiting_lock(lock);
543         ldlm_clear_waited(lock);
544         spin_unlock_bh(&waiting_locks_spinlock);
545
546         /* remove the lock out of export blocking list */
547         spin_lock_bh(&lock->l_export->exp_bl_list_lock);
548         list_del_init(&lock->l_exp_list);
549         spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
550
551         if (ret) {
552                 /*
553                  * release lock ref if it has indeed been removed
554                  * from a list
555                  */
556                 LDLM_LOCK_RELEASE(lock);
557         }
558
559         LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
560         return ret;
561 }
562
563 /**
564  * Prolong the contended lock waiting time.
565  *
566  * Called with namespace lock held.
567  */
568 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
569 {
570         if (lock->l_export == NULL) {
571                 /* We don't have a "waiting locks list" on clients. */
572                 LDLM_DEBUG(lock, "client lock: no-op");
573                 return 0;
574         }
575
576         if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS) {
577                 /* We don't have a "waiting locks list" on OSP. */
578                 LDLM_DEBUG(lock, "MDS-MDS lock: no-op");
579                 return 0;
580         }
581
582         spin_lock_bh(&waiting_locks_spinlock);
583
584         if (list_empty(&lock->l_pending_chain)) {
585                 spin_unlock_bh(&waiting_locks_spinlock);
586                 LDLM_DEBUG(lock, "wasn't waiting");
587                 return 0;
588         }
589
590         /*
591          * we remove/add the lock to the waiting list, so no needs to
592          * release/take a lock reference
593          */
594         __ldlm_del_waiting_lock(lock);
595         __ldlm_add_waiting_lock(lock, timeout);
596         spin_unlock_bh(&waiting_locks_spinlock);
597
598         LDLM_DEBUG(lock, "refreshed to %ds", timeout);
599         return 1;
600 }
601 EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
602
603 #else /* HAVE_SERVER_SUPPORT */
604
/*
 * Client-only build: there is no waiting-locks list, so there is never
 * anything to remove.
 *
 * \param[in] lock      ignored
 *
 * \retval 0            always
 */
int ldlm_del_waiting_lock(struct ldlm_lock *lock)
{
        /* plain return: there is no ENTRY for a RETURN() to pair with */
        return 0;
}
609
610 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
611 {
612         RETURN(0);
613 }
614
615 #endif /* !HAVE_SERVER_SUPPORT */
616
617 #ifdef HAVE_SERVER_SUPPORT
618
619 /**
620  * Calculate the per-export Blocking timeout (covering BL AST, data flush,
621  * lock cancel, and their replies). Used for lock callback timeout and AST
622  * re-send period.
623  *
624  * \param[in] lock        lock which is getting the blocking callback
625  *
626  * \retval            timeout in seconds to wait for the client reply
627  */
628 timeout_t ldlm_bl_timeout(struct ldlm_lock *lock)
629 {
630         timeout_t timeout;
631
632         if (AT_OFF)
633                 return obd_timeout / 2;
634
635         /*
636          * Since these are non-updating timeouts, we should be conservative.
637          * Take more than usually, 150%
638          * It would be nice to have some kind of "early reply" mechanism for
639          * lock callbacks too...
640          */
641         timeout = at_get(&lock->l_export->exp_bl_lock_at);
642         return max_t(timeout_t, timeout + (timeout >> 1),
643                      (timeout_t)ldlm_enqueue_min);
644 }
645 EXPORT_SYMBOL(ldlm_bl_timeout);
646
647 /**
648  * Perform lock cleanup if AST sending failed.
649  */
650 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
651                             const char *ast_type)
652 {
653         LCONSOLE_ERROR_MSG(0x138,
654                            "%s: A client on nid %s was evicted due to a lock %s callback time out: rc %d\n",
655                            lock->l_export->exp_obd->obd_name,
656                            obd_export_nid2str(lock->l_export), ast_type, rc);
657
658         if (obd_dump_on_timeout)
659                 libcfs_debug_dumplog();
660         spin_lock_bh(&waiting_locks_spinlock);
661         if (__ldlm_del_waiting_lock(lock) == 0)
662                 /*
663                  * the lock was not in any list, grab an extra ref before adding
664                  * the lock to the expired list
665                  */
666                 LDLM_LOCK_GET(lock);
667         /* differentiate it from expired locks */
668         lock->l_callback_timestamp = 0;
669         list_add(&lock->l_pending_chain, &expired_lock_list);
670         wake_up(&expired_lock_wait_queue);
671         spin_unlock_bh(&waiting_locks_spinlock);
672 }
673
674 /**
675  * Perform lock cleanup if AST reply came with error.
676  */
677 static int ldlm_handle_ast_error(struct ldlm_lock *lock,
678                                  struct ptlrpc_request *req, int rc,
679                                  const char *ast_type)
680 {
681         struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
682
683         if (!req->rq_replied || (rc && rc != -EINVAL)) {
684                 if (ldlm_is_cancel(lock)) {
685                         LDLM_DEBUG(lock,
686                                    "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
687                                    ast_type, req, req->rq_xid,
688                                    libcfs_nid2str(peer.nid));
689                         ldlm_lock_cancel(lock);
690                         rc = -ERESTART;
691                 } else if (rc == -ENODEV || rc == -ESHUTDOWN ||
692                            (rc == -EIO &&
693                             req->rq_import->imp_state == LUSTRE_IMP_CLOSED)) {
694                         /*
695                          * Upon umount process the AST fails because cannot be
696                          * sent. This shouldn't lead to the client eviction.
697                          * -ENODEV error is returned by ptl_send_rpc() for
698                          *  new request in such import.
699                          * -SHUTDOWN is returned by ptlrpc_import_delay_req()
700                          *  if imp_invalid is set or obd_no_recov.
701                          * Meanwhile there is also check for LUSTRE_IMP_CLOSED
702                          * in ptlrpc_import_delay_req() as well with -EIO code.
703                          * In all such cases errors are ignored.
704                          */
705                         LDLM_DEBUG(lock,
706                                    "%s AST can't be sent due to a server %s failure or umount process: rc = %d\n",
707                                     ast_type,
708                                      req->rq_import->imp_obd->obd_name, rc);
709                 } else {
710                         LDLM_ERROR(lock,
711                                    "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
712                                    libcfs_nid2str(peer.nid),
713                                    req->rq_replied ? "returned error from" :
714                                    "failed to reply to",
715                                    ast_type, req, req->rq_xid,
716                                    (req->rq_repmsg != NULL) ?
717                                    lustre_msg_get_status(req->rq_repmsg) : 0,
718                                    rc);
719                         ldlm_failed_ast(lock, rc, ast_type);
720                 }
721                 return rc;
722         }
723
724         if (rc == -EINVAL) {
725                 struct ldlm_resource *res = lock->l_resource;
726
727                 LDLM_DEBUG(lock,
728                            "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
729                            libcfs_nid2str(peer.nid),
730                            req->rq_repmsg ?
731                            lustre_msg_get_status(req->rq_repmsg) : -1,
732                            ast_type, req, req->rq_xid);
733                 if (res) {
734                         /*
735                          * update lvbo to return proper attributes.
736                          * see b=23174
737                          */
738                         ldlm_resource_getref(res);
739                         ldlm_lvbo_update(res, lock, NULL, 1);
740                         ldlm_resource_putref(res);
741                 }
742                 ldlm_lock_cancel(lock);
743                 rc = -ERESTART;
744         }
745
746         return rc;
747 }
748
749 static int ldlm_cb_interpret(const struct lu_env *env,
750                              struct ptlrpc_request *req, void *args, int rc)
751 {
752         struct ldlm_cb_async_args *ca = args;
753         struct ldlm_lock *lock = ca->ca_lock;
754         struct ldlm_cb_set_arg *arg  = ca->ca_set_arg;
755
756         ENTRY;
757
758         LASSERT(lock != NULL);
759
760         switch (arg->type) {
761         case LDLM_GL_CALLBACK:
762                 /*
763                  * Update the LVB from disk if the AST failed
764                  * (this is a legal race)
765                  *
766                  * - Glimpse callback of local lock just returns
767                  *   -ELDLM_NO_LOCK_DATA.
768                  * - Glimpse callback of remote lock might return
769                  *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
770                  */
771                 if (unlikely(arg->gl_interpret_reply)) {
772                         rc = arg->gl_interpret_reply(NULL, req, args, rc);
773                 } else if (rc == -ELDLM_NO_LOCK_DATA) {
774                         LDLM_DEBUG(lock,
775                                    "lost race - client has a lock but no inode");
776                         ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
777                 } else if (rc != 0) {
778                         rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
779                 } else {
780                         rc = ldlm_lvbo_update(lock->l_resource,
781                                               lock, req, 1);
782                 }
783                 break;
784         case LDLM_BL_CALLBACK:
785                 if (rc != 0)
786                         rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
787                 break;
788         case LDLM_CP_CALLBACK:
789                 if (rc != 0)
790                         rc = ldlm_handle_ast_error(lock, req, rc, "completion");
791                 break;
792         default:
793                 LDLM_ERROR(lock, "invalid opcode for lock callback %d",
794                            arg->type);
795                 LBUG();
796         }
797
798         /* release extra reference taken in ldlm_ast_fini() */
799         LDLM_LOCK_RELEASE(lock);
800
801         if (rc == -ERESTART)
802                 atomic_inc(&arg->restart);
803
804         RETURN(0);
805 }
806
807 static void ldlm_update_resend(struct ptlrpc_request *req, void *data)
808 {
809         struct ldlm_cb_async_args *ca = data;
810         struct ldlm_lock *lock = ca->ca_lock;
811
812         ldlm_refresh_waiting_lock(lock, ldlm_bl_timeout(lock));
813 }
814
815 static inline int ldlm_ast_fini(struct ptlrpc_request *req,
816                                 struct ldlm_cb_set_arg *arg,
817                                 struct ldlm_lock *lock,
818                                 int instant_cancel)
819 {
820         int rc = 0;
821
822         ENTRY;
823
824         if (unlikely(instant_cancel)) {
825                 rc = ptl_send_rpc(req, 1);
826                 ptlrpc_req_finished(req);
827                 if (rc == 0)
828                         atomic_inc(&arg->restart);
829         } else {
830                 LDLM_LOCK_GET(lock);
831                 ptlrpc_set_add_req(arg->set, req);
832         }
833
834         RETURN(rc);
835 }
836
837 /**
838  * Check if there are requests in the export request list which prevent
839  * the lock canceling and make these requests high priority ones.
840  */
841 static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
842 {
843         struct ptlrpc_request *req;
844
845         ENTRY;
846
847         if (lock->l_export == NULL) {
848                 LDLM_DEBUG(lock, "client lock: no-op");
849                 RETURN_EXIT;
850         }
851
852         spin_lock(&lock->l_export->exp_rpc_lock);
853         list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
854                             rq_exp_list) {
855                 /*
856                  * Do not process requests that were not yet added to there
857                  * incoming queue or were already removed from there for
858                  * processing. We evaluate ptlrpc_nrs_req_can_move() without
859                  * holding svcpt->scp_req_lock, and then redo the check with
860                  * the lock held once we need to obtain a reliable result.
861                  */
862                 if (ptlrpc_nrs_req_can_move(req) &&
863                     req->rq_ops->hpreq_lock_match &&
864                     req->rq_ops->hpreq_lock_match(req, lock))
865                         ptlrpc_nrs_req_hp_move(req);
866         }
867         spin_unlock(&lock->l_export->exp_rpc_lock);
868         EXIT;
869 }
870
871 /**
872  * ->l_blocking_ast() method for server-side locks. This is invoked when newly
873  * enqueued server lock conflicts with given one.
874  *
875  * Sends blocking AST RPC to the client owning that lock; arms timeout timer
876  * to wait for client response.
877  */
878 int ldlm_server_blocking_ast(struct ldlm_lock *lock,
879                              struct ldlm_lock_desc *desc,
880                              void *data, int flag)
881 {
882         struct ldlm_cb_async_args *ca;
883         struct ldlm_cb_set_arg *arg = data;
884         struct ldlm_request *body;
885         struct ptlrpc_request  *req;
886         int instant_cancel = 0;
887         int rc = 0;
888
889         ENTRY;
890
891         if (flag == LDLM_CB_CANCELING)
892                 /* Don't need to do anything here. */
893                 RETURN(0);
894
895         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
896                 LDLM_DEBUG(lock, "dropping BL AST");
897                 RETURN(0);
898         }
899
900         LASSERT(lock);
901         LASSERT(data != NULL);
902         if (lock->l_export->exp_obd->obd_recovering != 0)
903                 LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
904
905         ldlm_lock_reorder_req(lock);
906
907         req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
908                                         &RQF_LDLM_BL_CALLBACK,
909                                         LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK);
910         if (req == NULL)
911                 RETURN(-ENOMEM);
912
913         ca = ptlrpc_req_async_args(ca, req);
914         ca->ca_set_arg = arg;
915         ca->ca_lock = lock;
916
917         req->rq_interpret_reply = ldlm_cb_interpret;
918
919         lock_res_and_lock(lock);
920         if (ldlm_is_destroyed(lock)) {
921                 /* What's the point? */
922                 unlock_res_and_lock(lock);
923                 ptlrpc_req_finished(req);
924                 RETURN(0);
925         }
926
927         if (!ldlm_is_granted(lock)) {
928                 /*
929                  * this blocking AST will be communicated as part of the
930                  * completion AST instead
931                  */
932                 ldlm_add_blocked_lock(lock);
933                 ldlm_set_waited(lock);
934                 unlock_res_and_lock(lock);
935
936                 ptlrpc_req_finished(req);
937                 LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
938                 RETURN(0);
939         }
940
941         if (ldlm_is_cancel_on_block(lock))
942                 instant_cancel = 1;
943
944         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
945         body->lock_handle[0] = lock->l_remote_handle;
946         body->lock_handle[1].cookie = lock->l_handle.h_cookie;
947         body->lock_desc = *desc;
948         body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK);
949
950         LDLM_DEBUG(lock, "server preparing blocking AST");
951
952         ptlrpc_request_set_replen(req);
953         ldlm_set_cbpending(lock);
954         if (instant_cancel) {
955                 unlock_res_and_lock(lock);
956                 ldlm_lock_cancel(lock);
957
958                 req->rq_no_resend = 1;
959         } else {
960                 LASSERT(ldlm_is_granted(lock));
961                 ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
962                 unlock_res_and_lock(lock);
963
964                 /* Do not resend after lock callback timeout */
965                 req->rq_delay_limit = ldlm_bl_timeout(lock);
966                 req->rq_resend_cb = ldlm_update_resend;
967         }
968
969         req->rq_send_state = LUSTRE_IMP_FULL;
970         /* ptlrpc_request_alloc_pack already set timeout */
971         if (AT_OFF)
972                 req->rq_timeout = ldlm_get_rq_timeout();
973
974         if (lock->l_export && lock->l_export->exp_nid_stats &&
975             lock->l_export->exp_nid_stats->nid_ldlm_stats)
976                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
977                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
978
979         rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
980
981         RETURN(rc);
982 }
983
984 /**
985  * ->l_completion_ast callback for a remote lock in server namespace.
986  *
987  *  Sends AST to the client notifying it of lock granting.  If initial
988  *  lock response was not sent yet, instead of sending another RPC, just
989  *  mark the lock as granted and client will understand
990  */
991 int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
992 {
993         struct ldlm_cb_set_arg *arg = data;
994         struct ldlm_request *body;
995         struct ptlrpc_request *req;
996         struct ldlm_cb_async_args *ca;
997         int instant_cancel = 0;
998         int rc = 0;
999         int lvb_len;
1000
1001         ENTRY;
1002
1003         LASSERT(lock != NULL);
1004         LASSERT(data != NULL);
1005
1006         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
1007                 LDLM_DEBUG(lock, "dropping CP AST");
1008                 RETURN(0);
1009         }
1010
1011         req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
1012                                    &RQF_LDLM_CP_CALLBACK);
1013         if (req == NULL)
1014                 RETURN(-ENOMEM);
1015
1016         /* server namespace, doesn't need lock */
1017         lvb_len = ldlm_lvbo_size(lock);
1018         /*
1019          * LU-3124 & LU-2187: to not return layout in completion AST because
1020          * it may deadlock for LU-2187, or client may not have enough space
1021          * for large layout. The layout will be returned to client with an
1022          * extra RPC to fetch xattr.lov
1023          */
1024         if (ldlm_has_layout(lock))
1025                 lvb_len = 0;
1026
1027         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len);
1028         rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK);
1029         if (rc) {
1030                 ptlrpc_request_free(req);
1031                 RETURN(rc);
1032         }
1033
1034         ca = ptlrpc_req_async_args(ca, req);
1035         ca->ca_set_arg = arg;
1036         ca->ca_lock = lock;
1037
1038         req->rq_interpret_reply = ldlm_cb_interpret;
1039         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1040
1041         body->lock_handle[0] = lock->l_remote_handle;
1042         body->lock_handle[1].cookie = lock->l_handle.h_cookie;
1043         body->lock_flags = ldlm_flags_to_wire(flags);
1044         ldlm_lock2desc(lock, &body->lock_desc);
1045         if (lvb_len > 0) {
1046                 void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
1047                 lvb_len = ldlm_lvbo_fill(lock, lvb, &lvb_len);
1048                 if (lvb_len < 0) {
1049                         /*
1050                          * We still need to send the RPC to wake up the blocked
1051                          * enqueue thread on the client.
1052                          *
1053                          * Consider old client, there is no better way to notify
1054                          * the failure, just zero-sized the LVB, then the client
1055                          * will fail out as "-EPROTO".
1056                          */
1057                         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, 0,
1058                                            RCL_CLIENT);
1059                         instant_cancel = 1;
1060                 } else {
1061                         req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len,
1062                                            RCL_CLIENT);
1063                 }
1064         }
1065
1066         LDLM_DEBUG(lock, "server preparing completion AST");
1067
1068         ptlrpc_request_set_replen(req);
1069
1070         req->rq_send_state = LUSTRE_IMP_FULL;
1071         /* ptlrpc_request_pack already set timeout */
1072         if (AT_OFF)
1073                 req->rq_timeout = ldlm_get_rq_timeout();
1074
1075         /* We only send real blocking ASTs after the lock is granted */
1076         lock_res_and_lock(lock);
1077         if (ldlm_is_ast_sent(lock)) {
1078                 body->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
1079                 /* Copy AST flags like LDLM_FL_DISCARD_DATA. */
1080                 body->lock_flags |= ldlm_flags_to_wire(lock->l_flags &
1081                                                        LDLM_FL_AST_MASK);
1082
1083                 /*
1084                  * We might get here prior to ldlm_handle_enqueue setting
1085                  * LDLM_FL_CANCEL_ON_BLOCK flag. Then we will put this lock
1086                  * into waiting list, but this is safe and similar code in
1087                  * ldlm_handle_enqueue will call ldlm_lock_cancel() still,
1088                  * that would not only cancel the lock, but will also remove
1089                  * it from waiting list
1090                  */
1091                 if (ldlm_is_cancel_on_block(lock)) {
1092                         unlock_res_and_lock(lock);
1093                         ldlm_lock_cancel(lock);
1094
1095                         instant_cancel = 1;
1096                         req->rq_no_resend = 1;
1097
1098                         lock_res_and_lock(lock);
1099                 } else {
1100                         /* start the lock-timeout clock */
1101                         ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
1102                         /* Do not resend after lock callback timeout */
1103                         req->rq_delay_limit = ldlm_bl_timeout(lock);
1104                         req->rq_resend_cb = ldlm_update_resend;
1105                 }
1106         }
1107         unlock_res_and_lock(lock);
1108
1109         if (lock->l_export && lock->l_export->exp_nid_stats &&
1110             lock->l_export->exp_nid_stats->nid_ldlm_stats)
1111                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
1112                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
1113
1114         rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
1115
1116         RETURN(lvb_len < 0 ? lvb_len : rc);
1117 }
1118
1119 /**
1120  * Server side ->l_glimpse_ast handler for client locks.
1121  *
1122  * Sends glimpse AST to the client and waits for reply. Then updates
1123  * lvbo with the result.
1124  */
1125 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
1126 {
1127         struct ldlm_cb_set_arg *arg = data;
1128         struct ldlm_request *body;
1129         struct ptlrpc_request *req;
1130         struct ldlm_cb_async_args *ca;
1131         int rc;
1132         struct req_format *req_fmt;
1133
1134         ENTRY;
1135
1136         LASSERT(lock != NULL);
1137
1138         if (arg->gl_desc != NULL)
1139                 /* There is a glimpse descriptor to pack */
1140                 req_fmt = &RQF_LDLM_GL_CALLBACK_DESC;
1141         else
1142                 req_fmt = &RQF_LDLM_GL_CALLBACK;
1143
1144         req = ptlrpc_request_alloc_pack(lock->l_export->exp_imp_reverse,
1145                                         req_fmt, LUSTRE_DLM_VERSION,
1146                                         LDLM_GL_CALLBACK);
1147
1148         if (req == NULL)
1149                 RETURN(-ENOMEM);
1150
1151         if (arg->gl_desc != NULL) {
1152                 /* copy the GL descriptor */
1153                 union ldlm_gl_desc      *desc;
1154
1155                 desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
1156                 *desc = *arg->gl_desc;
1157         }
1158
1159         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1160         body->lock_handle[0] = lock->l_remote_handle;
1161         ldlm_lock2desc(lock, &body->lock_desc);
1162
1163         ca = ptlrpc_req_async_args(ca, req);
1164         ca->ca_set_arg = arg;
1165         ca->ca_lock = lock;
1166
1167         /* server namespace, doesn't need lock */
1168         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
1169                              ldlm_lvbo_size(lock));
1170         ptlrpc_request_set_replen(req);
1171
1172         req->rq_send_state = LUSTRE_IMP_FULL;
1173         /* ptlrpc_request_alloc_pack already set timeout */
1174         if (AT_OFF)
1175                 req->rq_timeout = ldlm_get_rq_timeout();
1176
1177         req->rq_interpret_reply = ldlm_cb_interpret;
1178
1179         if (lock->l_export && lock->l_export->exp_nid_stats) {
1180                 struct nid_stat *nid_stats = lock->l_export->exp_nid_stats;
1181
1182                 lprocfs_counter_incr(nid_stats->nid_ldlm_stats,
1183                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
1184         }
1185
1186         rc = ldlm_ast_fini(req, arg, lock, 0);
1187
1188         RETURN(rc);
1189 }
1190 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
1191
1192 int ldlm_glimpse_locks(struct ldlm_resource *res,
1193                        struct list_head *gl_work_list)
1194 {
1195         int rc;
1196
1197         ENTRY;
1198
1199         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
1200                                LDLM_WORK_GL_AST);
1201         if (rc == -ERESTART)
1202                 ldlm_reprocess_all(res, NULL);
1203
1204         RETURN(rc);
1205 }
1206 EXPORT_SYMBOL(ldlm_glimpse_locks);
1207
1208 /* return LDLM lock associated with a lock callback request */
1209 struct ldlm_lock *ldlm_request_lock(struct ptlrpc_request *req)
1210 {
1211         struct ldlm_cb_async_args *ca;
1212         struct ldlm_lock *lock;
1213
1214         ENTRY;
1215
1216         ca = ptlrpc_req_async_args(ca, req);
1217         lock = ca->ca_lock;
1218         if (lock == NULL)
1219                 RETURN(ERR_PTR(-EFAULT));
1220
1221         RETURN(lock);
1222 }
1223 EXPORT_SYMBOL(ldlm_request_lock);
1224
1225 /**
1226  * Main server-side entry point into LDLM for enqueue. This is called by ptlrpc
1227  * service threads to carry out client lock enqueueing requests.
      *
      * \param ns       namespace in which the lock is created and enqueued
      * \param req      incoming enqueue request; the reply is packed into it
      *                 and its rq_status is set before returning
      * \param dlm_req  request body: wire flags, lock descriptor and the
      *                 client's lock handle
      * \param cbs      callback suite handed to ldlm_lock_create()
      *
      * \retval 0         the request was handled; note that a positive
      *                   ldlm_error from ldlm_lock_enqueue() leaves the return
      *                   value at 0 and is reported through req->rq_status only
      * \retval negative  validation, allocation, packing, enqueue or LVB
      *                   failure
1228  */
1229 int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
1230                          struct ptlrpc_request *req,
1231                          const struct ldlm_request *dlm_req,
1232                          const struct ldlm_callback_suite *cbs)
1233 {
1234         struct ldlm_reply *dlm_rep;
1235         __u64 flags;
1236         enum ldlm_error err = ELDLM_OK;
1237         struct ldlm_lock *lock = NULL;
1238         void *cookie = NULL;
1239         int rc = 0;
1240         struct ldlm_resource *res = NULL;
1241         const struct lu_env *env = req->rq_svc_thread->t_env;
1242
1243         ENTRY;
1244
1245         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
1246
             /*
              * NOTE(review): presumably processes cancel handles piggy-backed
              * on the enqueue request, starting at LDLM_ENQUEUE_CANCEL_OFF -
              * confirm in ldlm_request_cancel().
              */
1247         ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
1248         flags = ldlm_flags_from_wire(dlm_req->lock_flags);
1249
1250         LASSERT(req->rq_export);
1251
1252         /* for intent enqueue the stat will be updated inside intent policy */
1253         if (ptlrpc_req2svc(req)->srv_stats != NULL &&
1254             !(dlm_req->lock_flags & LDLM_FL_HAS_INTENT))
1255                 ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
1256
1257         if (req->rq_export && req->rq_export->exp_nid_stats &&
1258             req->rq_export->exp_nid_stats->nid_ldlm_stats)
1259                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1260                                      LDLM_ENQUEUE - LDLM_FIRST_OPC);
1261
             /* resource type must lie in [LDLM_MIN_TYPE, LDLM_MAX_TYPE) */
1262         if (unlikely(dlm_req->lock_desc.l_resource.lr_type < LDLM_MIN_TYPE ||
1263                      dlm_req->lock_desc.l_resource.lr_type >= LDLM_MAX_TYPE)) {
1264                 DEBUG_REQ(D_ERROR, req, "invalid lock request type %d",
1265                           dlm_req->lock_desc.l_resource.lr_type);
1266                 GOTO(out, rc = -EFAULT);
1267         }
1268
             /*
              * requested mode must lie strictly between LCK_MINMODE and
              * LCK_MAXMODE and have exactly one bit set: x & (x - 1) is
              * non-zero whenever more than one bit is set
              */
1269         if (unlikely(dlm_req->lock_desc.l_req_mode <= LCK_MINMODE ||
1270                      dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE ||
1271                      dlm_req->lock_desc.l_req_mode &
1272                      (dlm_req->lock_desc.l_req_mode-1))) {
1273                 DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d",
1274                           dlm_req->lock_desc.l_req_mode);
1275                 GOTO(out, rc = -EFAULT);
1276         }
1277
             /*
              * Replayed or resent request: the lock may already exist on this
              * export. If it is found, creation is skipped and processing
              * continues at existing_lock with LDLM_FL_RESENT set; if not, a
              * new lock is created below. Only requests that are neither
              * replayed nor resent are subject to the ldlm_reclaim_full()
              * check.
              */
1278         if (unlikely((flags & LDLM_FL_REPLAY) ||
1279                      (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))) {
1280                 /* Find an existing lock in the per-export lock hash */
1281                 /*
1282                  * In the function below, .hs_keycmp resolves to
1283                  * ldlm_export_lock_keycmp()
1284                  */
1285                 /* coverity[overrun-buffer-val] */
1286                 lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
1287                                        (void *)&dlm_req->lock_handle[0]);
1288                 if (lock != NULL) {
1289                         DEBUG_REQ(D_DLMTRACE, req,
1290                                   "found existing lock cookie %#llx",
1291                                   lock->l_handle.h_cookie);
1292                         flags |= LDLM_FL_RESENT;
1293                         GOTO(existing_lock, rc = 0);
1294                 }
1295         } else {
1296                 if (ldlm_reclaim_full()) {
1297                         DEBUG_REQ(D_DLMTRACE, req,
1298                                   "Too many granted locks, reject current enqueue request and let the client retry later");
1299                         GOTO(out, rc = -EINPROGRESS);
1300                 }
1301         }
1302
1303         /* The lock's callback data might be set in the policy function */
1304         lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
1305                                 dlm_req->lock_desc.l_resource.lr_type,
1306                                 dlm_req->lock_desc.l_req_mode,
1307                                 cbs, NULL, 0, LVB_T_NONE);
1308         if (IS_ERR(lock)) {
1309                 rc = PTR_ERR(lock);
                     /* keep the out: path from touching an error pointer */
1310                 lock = NULL;
1311                 GOTO(out, rc);
1312         }
1313
1314         lock->l_remote_handle = dlm_req->lock_handle[0];
1315         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
1316
1317         /*
1318          * Initialize resource lvb but not for a lock being replayed since
1319          * Client already got lvb sent in this case.
1320          * This must occur early since some policy methods assume resource
1321          * lvb is available (lr_lvb_data != NULL).
1322          */
1323         res = lock->l_resource;
1324         if (!(flags & LDLM_FL_REPLAY)) {
1325                 /* non-replayed lock, delayed lvb init may need to be done */
1326                 rc = ldlm_lvbo_init(res);
1327                 if (rc < 0) {
1328                         LDLM_DEBUG(lock, "delayed lvb init failed (rc %d)", rc);
1329                         GOTO(out, rc);
1330                 }
1331         }
1332
1333         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
1334         /*
1335          * Don't enqueue a lock onto the export if it has been disconnected
1336          * due to eviction (b=3822) or server umount (b=24324).
1337          * Cancel it now instead.
1338          */
1339         if (req->rq_export->exp_disconnected) {
1340                 LDLM_ERROR(lock, "lock on disconnected export %p",
1341                            req->rq_export);
1342                 GOTO(out, rc = -ENOTCONN);
1343         }
1344
             /* attach the lock to the export and index it by remote handle */
1345         lock->l_export = class_export_lock_get(req->rq_export, lock);
1346         if (lock->l_export->exp_lock_hash)
1347                 cfs_hash_add(lock->l_export->exp_lock_hash,
1348                              &lock->l_remote_handle,
1349                              &lock->l_exp_hash);
1350
1351         /*
1352          * Inherit the enqueue flags before the operation, because we do not
1353          * keep the res lock on return and next operations (BL AST) may proceed
1354          * without them.
1355          */
1356         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
1357                                               LDLM_FL_INHERIT_MASK);
1358
1359         ldlm_convert_policy_to_local(req->rq_export,
1360                                      dlm_req->lock_desc.l_resource.lr_type,
1361                                      &dlm_req->lock_desc.l_policy_data,
1362                                      &lock->l_policy_data);
1363         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
1364                 lock->l_req_extent = lock->l_policy_data.l_extent;
1365         } else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) {
1366                 lock->l_policy_data.l_inodebits.try_bits =
1367                         dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
1368                 lock->l_policy_data.l_inodebits.li_gid =
1369                         dlm_req->lock_desc.l_policy_data.l_inodebits.li_gid;
1370         }
1371
     /*
      * Reached both for a newly created lock and for a lock found in the
      * export hash above (resend/replay).
      */
1372 existing_lock:
1373         cookie = req;
             /*
              * Non-intent enqueue: size the reply LVB and pack the reply here.
              * For intent requests this block is skipped (presumably the
              * intent policy packs the reply - confirm).
              */
1374         if (!(flags & LDLM_FL_HAS_INTENT)) {
1375                 /* based on the assumption that lvb size never changes during
1376                  * resource life time otherwise it need resource->lr_lock's
1377                  * protection */
1378                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
1379                                      RCL_SERVER, ldlm_lvbo_size(lock));
1380
1381                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
1382                         GOTO(out, rc = -ENOMEM);
1383
1384                 rc = req_capsule_server_pack(&req->rq_pill);
1385                 if (rc)
1386                         GOTO(out, rc);
1387         }
1388
1389         err = ldlm_lock_enqueue(env, ns, &lock, cookie, &flags);
             /*
              * A negative err is also propagated through rc; a positive err
              * leaves rc untouched and reaches the client via rq_status only
              * (see out:).
              */
1390         if (err) {
1391                 if ((int)err < 0)
1392                         rc = (int)err;
1393                 GOTO(out, err);
1394         }
1395
1396         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1397
1398         ldlm_lock2desc(lock, &dlm_rep->lock_desc);
1399         ldlm_lock2handle(lock, &dlm_rep->lock_handle);
1400
1401         if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
1402                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
1403
1404         /*
1405          * We never send a blocking AST until the lock is granted, but
1406          * we can tell it right now
1407          */
1408         lock_res_and_lock(lock);
1409
1410         /*
1411          * Now take into account flags to be inherited from original lock
1412          * request both in reply to client and in our own lock flags.
1413          */
1414         dlm_rep->lock_flags = ldlm_flags_to_wire(flags);
1415         lock->l_flags |= flags & LDLM_FL_INHERIT_MASK;
1416
1417         /*
1418          * Don't move a pending lock onto the export if it has already been
1419          * disconnected due to eviction (b=5683) or server umount (b=24324).
1420          * Cancel it now instead.
1421          */
1422         if (unlikely(req->rq_export->exp_disconnected ||
1423                      OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
1424                 LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
1425                 rc = -ENOTCONN;
1426         } else if (ldlm_is_ast_sent(lock)) {
1427                 /* fill lock desc for possible lock convert */
1428                 if (lock->l_blocking_lock &&
1429                     lock->l_resource->lr_type == LDLM_IBITS) {
1430                         struct ldlm_lock *bl_lock = lock->l_blocking_lock;
1431                         struct ldlm_lock_desc *rep_desc = &dlm_rep->lock_desc;
1432
1433                         LDLM_DEBUG(lock,
1434                                    "save blocking bits %llx in granted lock",
1435                                    bl_lock->l_policy_data.l_inodebits.bits);
1436                         /*
1437                          * If lock is blocked then save blocking ibits
1438                          * in returned lock policy for the possible lock
1439                          * convert on a client.
1440                          */
1441                         rep_desc->l_policy_data.l_inodebits.cancel_bits =
1442                                 bl_lock->l_policy_data.l_inodebits.bits;
1443                 }
1444                 dlm_rep->lock_flags |= ldlm_flags_to_wire(LDLM_FL_AST_SENT);
1445                 if (ldlm_is_granted(lock)) {
1446                         /*
1447                          * Only cancel lock if it was granted, because it would
1448                          * be destroyed immediately and would never be granted
1449                          * in the future, causing timeouts on client.  Not
1450                          * granted lock will be cancelled immediately after
1451                          * sending completion AST.
1452                          */
1453                         if (ldlm_is_cancel_on_block(lock)) {
                                     /* cancel runs without the resource lock held */
1454                                 unlock_res_and_lock(lock);
1455                                 ldlm_lock_cancel(lock);
1456                                 lock_res_and_lock(lock);
1457                         } else {
1458                                 ldlm_add_waiting_lock(lock,
1459                                                       ldlm_bl_timeout(lock));
1460                         }
1461                 }
1462         }
1463         unlock_res_and_lock(lock);
1464
1465         EXIT;
     /*
      * Common exit for success and failure: record the status, make sure a
      * reply is packed, fill the reply LVB, and dispose of a lock whose
      * enqueue failed.
      */
1466 out:
1467         req->rq_status = rc ?: err; /* return either error - b=11190 */
             /* no reply packed yet: pack a single-buffer reply for the status */
1468         if (!req->rq_packed_final) {
1469                 int rc1 = lustre_pack_reply(req, 1, NULL, NULL);
1470                 if (rc == 0)
1471                         rc = rc1;
1472         }
1473
1474         /*
1475          * The LOCK_CHANGED code in ldlm_lock_enqueue depends on this
1476          * ldlm_reprocess_all.  If this moves, revisit that code. -phil
1477          */
1478         if (lock != NULL) {
1479                 LDLM_DEBUG(lock,
1480                            "server-side enqueue handler, sending reply (err=%d, rc=%d)",
1481                            err, rc);
1482
1483                 if (rc == 0 &&
1484                     req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
1485                                           RCL_SERVER) &&
1486                     ldlm_lvbo_size(lock) > 0) {
1487                         void *buf;
1488                         int buflen;
1489
     /*
      * Re-entered after the reply buffer was grown following an -ERANGE from
      * ldlm_lvbo_fill(); buflen is passed by pointer, so it presumably holds
      * the required size at that point - confirm in ldlm_lvbo_fill().
      */
1490 retry:
1491                         buf = req_capsule_server_get(&req->rq_pill,
1492                                                      &RMF_DLM_LVB);
1493                         LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock);
1494                         buflen = req_capsule_get_size(&req->rq_pill,
1495                                         &RMF_DLM_LVB, RCL_SERVER);
1496                         /*
1497                          * non-replayed lock, delayed lvb init may
1498                          * need to occur now
1499                          */
1500                         if ((buflen > 0) && !(flags & LDLM_FL_REPLAY)) {
1501                                 int rc2;
1502
1503                                 rc2 = ldlm_lvbo_fill(lock, buf, &buflen);
1504                                 if (rc2 >= 0) {
                                             /* trim the reply LVB to what was filled */
1505                                         req_capsule_shrink(&req->rq_pill,
1506                                                            &RMF_DLM_LVB,
1507                                                            rc2, RCL_SERVER);
1508                                 } else if (rc2 == -ERANGE) {
1509                                         rc2 = req_capsule_server_grow(
1510                                                         &req->rq_pill,
1511                                                         &RMF_DLM_LVB, buflen);
1512                                         if (!rc2) {
1513                                                 goto retry;
1514                                         } else {
1515                                                 /*
1516                                                  * if we can't grow the buffer,
1517                                                  * it's ok to return empty lvb
1518                                                  * to client.
1519                                                  */
1520                                                 req_capsule_shrink(
1521                                                         &req->rq_pill,
1522                                                         &RMF_DLM_LVB, 0,
1523                                                         RCL_SERVER);
1524                                         }
1525                                 } else {
1526                                         rc = rc2;
1527                                 }
1528                         } else if (flags & LDLM_FL_REPLAY) {
1529                                 /* no LVB resend upon replay */
1530                                 if (buflen > 0)
1531                                         req_capsule_shrink(&req->rq_pill,
1532                                                            &RMF_DLM_LVB,
1533                                                            0, RCL_SERVER);
1534                                 else
1535                                         rc = buflen;
1536                         } else {
                                     /* not a replay and buflen <= 0: report it as rc */
1537                                 rc = buflen;
1538                         }
1539                 }
1540
                     /*
                      * The request failed and the lock was not found through
                      * a resend lookup: get rid of the lock (cancel it if it
                      * is attached to an export, otherwise unlink and destroy
                      * it) and reprocess the resource.
                      */
1541                 if (rc != 0 && !(flags & LDLM_FL_RESENT)) {
1542                         if (lock->l_export) {
1543                                 ldlm_lock_cancel(lock);
1544                         } else {
1545                                 lock_res_and_lock(lock);
1546                                 ldlm_resource_unlink_lock(lock);
1547                                 ldlm_lock_destroy_nolock(lock);
1548                                 unlock_res_and_lock(lock);
1549                         }
1550                         ldlm_reprocess_all(lock->l_resource, lock);
1551                 }
1552
                     /*
                      * No enqueue error, no callback pending and not a FLOCK
                      * request: reprocess the resource.
                      */
1553                 if (!err && !ldlm_is_cbpending(lock) &&
1554                     dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
1555                         ldlm_reprocess_all(lock->l_resource, lock);
1556
1557                 LDLM_LOCK_RELEASE(lock);
1558         }
1559
1560         LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",
1561                           lock, rc);
1562
1563         return rc;
1564 }
1565
1566 /*
1567  * Clear the blocking lock. A race is possible between ldlm_handle_convert0()
1568  * and ldlm_work_bl_ast_lock(), so this is done under lock with a check for NULL.
1569  */
1570 void ldlm_clear_blocking_lock(struct ldlm_lock *lock)
1571 {
1572         if (lock->l_blocking_lock) { /* skip when there is nothing to release */
1573                 LDLM_LOCK_RELEASE(lock->l_blocking_lock);
1574                 lock->l_blocking_lock = NULL; /* a later caller sees NULL and does not release twice */
1575         }
1576 }
1577
1578 /* A lock can be converted to new ibits or mode and should be considered
1579  * as a new lock. Clear all state related to previous blocking AST
1580  * processing so new conflicts will cause new blocking ASTs.
1581  *
1582  * This is used during lock convert below and lock downgrade to COS mode in
1583  * ldlm_lock_mode_downgrade().
1584  */
1585 void ldlm_clear_blocking_data(struct ldlm_lock *lock)
1586 {
1587         ldlm_clear_ast_sent(lock);
1588         lock->l_bl_ast_run = 0; /* reset together with the AST-sent state cleared above */
1589         ldlm_clear_blocking_lock(lock); /* releases and NULLs lock->l_blocking_lock if set */
1590 }
1591
1592 /**
1593  * Main LDLM entry point for server code to process lock conversion requests.
1594  */
1595 int ldlm_handle_convert0(struct ptlrpc_request *req, /* request being served; its reply buffer and rq_status are filled in here */
1596                          const struct ldlm_request *dlm_req) /* convert request: lock handle, flags and requested inodebits */
1597 {
1598         struct obd_export *exp = req->rq_export;
1599         struct ldlm_reply *dlm_rep;
1600         struct ldlm_lock *lock;
1601         __u64 bits; /* inodebits currently held by the lock */
1602         __u64 new_bits; /* inodebits requested by the client */
1603         int rc;
1604
1605         ENTRY;
1606
1607         if (exp && exp->exp_nid_stats && exp->exp_nid_stats->nid_ldlm_stats) /* stats are only counted when all three pointers are set */
1608                 lprocfs_counter_incr(exp->exp_nid_stats->nid_ldlm_stats,
1609                                      LDLM_CONVERT - LDLM_FIRST_OPC);
1610
1611         rc = req_capsule_server_pack(&req->rq_pill);
1612         if (rc)
1613                 RETURN(rc); /* the only path that returns non-zero */
1614
1615         dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1616         dlm_rep->lock_flags = dlm_req->lock_flags; /* echo the request flags back in the reply */
1617
1618         lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
1619         if (!lock) {
1620                 LDLM_DEBUG_NOLOCK("server lock is canceled already");
1621                 req->rq_status = ELDLM_NO_LOCK_DATA;
1622                 RETURN(0); /* the failure is reported through rq_status only */
1623         }
1624
1625         LDLM_DEBUG(lock, "server-side convert handler START");
1626
1627         lock_res_and_lock(lock);
1628         bits = lock->l_policy_data.l_inodebits.bits;
1629         new_bits = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
1630
1631         if (ldlm_is_cancel(lock)) {
1632                 LDLM_DEBUG(lock, "convert on canceled lock!");
1633                 unlock_res_and_lock(lock);
1634                 GOTO(out_put, rc = ELDLM_NO_LOCK_DATA);
1635         }
1636
1637         if (dlm_req->lock_desc.l_req_mode != lock->l_granted_mode) { /* the request must name the mode the lock is granted in */
1638                 LDLM_ERROR(lock, "lock mode differs!");
1639                 unlock_res_and_lock(lock);
1640                 GOTO(out_put, rc = -EPROTO);
1641         }
1642
1643         if (bits == new_bits) {
1644                 /*
1645                  * This can be a valid situation if CONVERT RPCs are
1646                  * re-ordered. Just finish silently.
1647                  */
1648                 LDLM_DEBUG(lock, "lock is converted already!");
1649                 unlock_res_and_lock(lock);
1650         } else {
1651                 if (ldlm_is_waited(lock))
1652                         ldlm_del_waiting_lock(lock);
1653
1654                 ldlm_clear_cbpending(lock);
1655                 lock->l_policy_data.l_inodebits.cancel_bits = 0;
1656                 ldlm_inodebits_drop(lock, bits & ~new_bits); /* argument is the set of held bits absent from the request */
1657
1658                 ldlm_clear_blocking_data(lock);
1659                 unlock_res_and_lock(lock);
1660
1661                 ldlm_reprocess_all(lock->l_resource, NULL); /* called after the resource lock is dropped */
1662         }
1663
1664         dlm_rep->lock_handle = lock->l_remote_handle;
1665         ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
1666                                         &dlm_rep->lock_desc.l_policy_data); /* reply carries the lock's resulting policy data */
1667         rc = ELDLM_OK;
1668         EXIT;
1669 out_put:
1670         LDLM_DEBUG(lock, "server-side convert handler END, rc = %d", rc);
1671         LDLM_LOCK_PUT(lock); /* NOTE(review): presumably pairs with the ldlm_handle2lock() lookup above - confirm */
1672         req->rq_status = rc;
1673         return 0; /* per-lock outcome travels in req->rq_status, not in the return value */
1674 }
1675
1676 /**
1677  * Cancel all the locks whose handles are packed into ldlm_request
1678  * starting at index \a first. Returns the number of handles that resolved to a lock and were cancelled; 0 when the request is rejected or skipped.
1679  * Called by server code expecting such combined cancel activity
1680  * requests.
1681  */
1682 int ldlm_request_cancel(struct ptlrpc_request *req,
1683                         const struct ldlm_request *dlm_req,
1684                         int first, enum lustre_at_flags flags) /* flags: LATF_STATS enables the blocking-AST delay measurement below */
1685 {
1686         struct ldlm_resource *res, *pres = NULL; /* pres: resource of the previously handled lock, held with a reference */
1687         struct ldlm_lock *lock;
1688         int i, count, done = 0;
1689         unsigned int size;
1690
1691         ENTRY;
1692
1693         size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT);
1694         if (size <= offsetof(struct ldlm_request, lock_handle) || /* buffer holds no handle at all ... */
1695             (size - offsetof(struct ldlm_request, lock_handle)) /
1696              sizeof(struct lustre_handle) < dlm_req->lock_count) /* ... or fewer handles than lock_count claims */
1697                 RETURN(0);
1698
1699         count = dlm_req->lock_count ? dlm_req->lock_count : 1; /* a lock_count of 0 is treated as one handle */
1700         if (first >= count)
1701                 RETURN(0);
1702
1703         if (count == 1 && dlm_req->lock_handle[0].cookie == 0) /* single zero cookie: nothing to cancel */
1704                 RETURN(0);
1705
1706         /*
1707          * There is no lock on the server at the replay time,
1708          * skip lock cancelling to let replay tests pass.
1709          */
1710         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1711                 RETURN(0);
1712
1713         LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, starting at %d",
1714                           count, first);
1715
1716         for (i = first; i < count; i++) {
1717                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
1718                 if (!lock) {
1719                         /* below message checked in replay-single.sh test_36 */
1720                         LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (cookie %llu)",
1721                                           dlm_req->lock_handle[i].cookie);
1722                         continue; /* stale handles are not counted in done */
1723                 }
1724
1725                 res = lock->l_resource;
1726                 done++;
1727
1728                 /*
1729                  * This code is an optimization to only attempt lock
1730                  * granting on the resource (that could be CPU-expensive)
1731                  * after we are done cancelling locks in that resource.
1732                  */
1733                 if (res != pres) {
1734                         if (pres != NULL) { /* finished with the previous resource: reprocess it and drop our reference */
1735                                 ldlm_reprocess_all(pres, NULL);
1736                                 LDLM_RESOURCE_DELREF(pres);
1737                                 ldlm_resource_putref(pres);
1738                         }
1739                         if (res != NULL) {
1740                                 ldlm_resource_getref(res); /* keep res alive until it is reprocessed later */
1741                                 LDLM_RESOURCE_ADDREF(res);
1742
1743                                 if (!ldlm_is_discard_data(lock))
1744                                         ldlm_lvbo_update(res, lock,
1745                                                          NULL, 1);
1746                         }
1747                         pres = res;
1748                 }
1749
1750                 if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock) &&
1751                     lock->l_blast_sent != 0) {
1752                         timeout_t delay = 0; /* stays 0 unless the current time is past l_blast_sent */
1753
1754                         if (ktime_get_seconds() > lock->l_blast_sent)
1755                                 delay = ktime_get_seconds() -
1756                                         lock->l_blast_sent;
1757                         LDLM_DEBUG(lock,
1758                                    "server cancels blocked lock after %ds",
1759                                    delay);
1760                         at_measured(&lock->l_export->exp_bl_lock_at, delay); /* NOTE(review): l_export is dereferenced without a NULL check - confirm it is always set here */
1761                 }
1762                 ldlm_lock_cancel(lock);
1763                 LDLM_LOCK_PUT(lock);
1764         }
1765         if (pres != NULL) { /* reprocess and release the last resource touched by the loop */
1766                 ldlm_reprocess_all(pres, NULL);
1767                 LDLM_RESOURCE_DELREF(pres);
1768                 ldlm_resource_putref(pres);
1769         }
1770         LDLM_DEBUG_NOLOCK("server-side cancel handler END");
1771         RETURN(done);
1772 }
1773 EXPORT_SYMBOL(ldlm_request_cancel);
1774
1775 /**
1776  * Main LDLM entry point for server code to cancel locks.
1777  * Returns -EFAULT or -EPROTO for a missing or undersized request buffer, the reply-packing error, or the result of ptlrpc_reply().
1778  * Typically gets called from service handler on LDLM_CANCEL opc.
1779  */
1780 int ldlm_handle_cancel(struct ptlrpc_request *req)
1781 {
1782         struct ldlm_request *dlm_req;
1783         int rc;
1784
1785         ENTRY;
1786
1787         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1788         if (dlm_req == NULL) {
1789                 CDEBUG(D_INFO, "bad request buffer for cancel\n");
1790                 RETURN(-EFAULT);
1791         }
1792
1793         if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) <
1794             offsetof(struct ldlm_request, lock_handle[1])) /* buffer must hold at least one complete lock handle */
1795                 RETURN(-EPROTO);
1796
1797         if (req->rq_export && req->rq_export->exp_nid_stats &&
1798             req->rq_export->exp_nid_stats->nid_ldlm_stats) /* stats are only counted when all three pointers are set */
1799                 lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
1800                                      LDLM_CANCEL - LDLM_FIRST_OPC);
1801
1802         rc = req_capsule_server_pack(&req->rq_pill);
1803         if (rc)
1804                 RETURN(rc);
1805
1806         if (!ldlm_request_cancel(req, dlm_req, 0, LATF_STATS)) /* 0 means no handle resolved to a lock */
1807                 req->rq_status = LUSTRE_ESTALE;
1808
1809         RETURN(ptlrpc_reply(req));
1810 }
1811 #endif /* HAVE_SERVER_SUPPORT */
1812
1813 /**
1814  * Server may pass additional information about blocking lock.
1815  * For IBITS locks it is conflicting bits which can be used for
1816  * lock convert instead of cancel.
1817  */
1818 void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock) /* ld may be NULL; lock's cancel_bits are updated in place */
1819 {
1820         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
1821
1822         check_res_locked(lock->l_resource); /* NOTE(review): presumably asserts the resource lock is held - confirm */
1823         if (ns_is_client(ns) && ld &&
1824             (lock->l_resource->lr_type == LDLM_IBITS)) { /* nothing is changed for server namespaces, NULL ld or non-IBITS locks */
1825                 /*
1826                  * Lock description contains policy of blocking lock,
1827                  * and its cancel_bits is used to pass conflicting bits.
1828                  * NOTE: ld can be NULL or can be not NULL but zeroed if
1829                  * passed from ldlm_bl_thread_blwi(), check below used bits
1830                  * in ld to make sure it is valid description.
1831                  *
1832                  * If server may replace lock resource keeping the same cookie,
1833                  * never use cancel bits from different resource, full cancel
1834                  * is to be used.
1835                  */
1836                 if (ld->l_policy_data.l_inodebits.cancel_bits &&
1837                     ldlm_res_eq(&ld->l_resource.lr_name,
1838                                 &lock->l_resource->lr_name) &&
1839                     !(ldlm_is_cbpending(lock) &&
1840                       lock->l_policy_data.l_inodebits.cancel_bits == 0)) {
1841                         /* always combine conflicting ibits */
1842                         lock->l_policy_data.l_inodebits.cancel_bits |=
1843                                 ld->l_policy_data.l_inodebits.cancel_bits;
1844                 } else {
1845                         /* If cancel_bits are not obtained or
1846                          * if the lock is already CBPENDING and
1847                          * has no cancel_bits set
1848                          * - the full lock is to be cancelled
1849                          */
1850                         lock->l_policy_data.l_inodebits.cancel_bits = 0; /* also reached when ld names a different resource */
1851                 }
1852         }
1853 }
1854
1855 /**
1856  * Callback handler for receiving incoming blocking ASTs.
1857  * Marks the lock CBPENDING and, if it has no readers or writers, runs its l_blocking_ast; the lock is released before returning.
1858  * This can only happen on client side.
1859  */
1860 void ldlm_handle_bl_callback(struct ldlm_namespace *ns, /* not referenced in this function body */
1861                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
1862 {
1863         int do_ast; /* non-zero when the lock had no readers and no writers while locked */
1864
1865         ENTRY;
1866
1867         LDLM_DEBUG(lock, "client blocking AST callback handler");
1868
1869         lock_res_and_lock(lock);
1870
1871         /* get extra information from desc if any */
1872         ldlm_bl_desc2lock(ld, lock);
1873         ldlm_set_cbpending(lock);
1874
1875         do_ast = (!lock->l_readers && !lock->l_writers);
1876         unlock_res_and_lock(lock);
1877
1878         if (do_ast) { /* the blocking AST is invoked with the resource lock already dropped */
1879                 CDEBUG(D_DLMTRACE,
1880                        "Lock %p already unused, calling callback (%p)\n",
1881                        lock, lock->l_blocking_ast);
1882                 if (lock->l_blocking_ast != NULL)
1883                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
1884                                              LDLM_CB_BLOCKING);
1885         } else {
1886                 CDEBUG(D_DLMTRACE,
1887                        "Lock %p is referenced, will be cancelled later\n",
1888                        lock);
1889         }
1890
1891         LDLM_DEBUG(lock, "client blocking callback handler END");
1892         LDLM_LOCK_RELEASE(lock); /* released on both branches, so the caller must not release it again */
1893         EXIT;
1894 }
1895
1896 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc) /* send a reply carrying rc as the request status; returns 0, the pack error, or the ptlrpc_reply() result */
1897 {
1898         if (req->rq_no_reply) /* no reply is wanted: report success without sending anything */
1899                 return 0;
1900
1901         req->rq_status = rc;
1902         if (!req->rq_packed_final) { /* pack a reply buffer only if none was finalized yet */
1903                 rc = lustre_pack_reply(req, 1, NULL, NULL);
1904                 if (rc)
1905                         return rc;
1906         }
1907         return ptlrpc_reply(req);
1908 }
1909
1910 /**
1911  * Callback handler for receiving incoming completion ASTs.
1912  * Returns -EINVAL if the lock is already marked failed, otherwise 0; errors handled at "out" mark the lock failed and wake l_waitq.
1913  * This can only happen on client side.
1914  */
1915 static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
1916                                     struct ldlm_namespace *ns,
1917                                     struct ldlm_request *dlm_req,
1918                                     struct ldlm_lock *lock) /* released by this function on every return path */
1919 {
1920         LIST_HEAD(ast_list); /* work list filled by ldlm_grant_lock() and run by ldlm_run_ast_work() */
1921         int lvb_len;
1922         int rc = 0;
1923
1924         ENTRY;
1925
1926         LDLM_DEBUG(lock, "client completion callback handler START");
1927
1928         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) { /* fault injection: reply early, then wait for grant/destroy or the timeout */
1929                 long to = cfs_time_seconds(1);
1930
1931                 ldlm_callback_reply(req, 0);
1932
1933                 while (to > 0) {
1934                         to = schedule_timeout_interruptible(to);
1935                         if (ldlm_is_granted(lock) ||
1936                             ldlm_is_destroyed(lock))
1937                                 break;
1938                 }
1939         }
1940
1941         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
1942         if (lvb_len < 0) {
1943                 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len);
1944                 GOTO(out, rc = lvb_len);
1945         } else if (lvb_len > 0) {
1946                 if (lock->l_lvb_len > 0) {
1947                         /* for extent lock, lvb contains ost_lvb{}. */
1948                         LASSERT(lock->l_lvb_data != NULL);
1949
1950                         if (unlikely(lock->l_lvb_len < lvb_len)) { /* received LVB does not fit the lock's LVB buffer */
1951                                 LDLM_ERROR(lock,
1952                                            "Replied LVB is larger than expectation, expected = %d, replied = %d",
1953                                            lock->l_lvb_len, lvb_len);
1954                                 GOTO(out, rc = -EINVAL);
1955                         }
1956                 }
1957         }
1958
1959         lock_res_and_lock(lock);
1960
1961         if (!ldlm_res_eq(&dlm_req->lock_desc.l_resource.lr_name,
1962                          &lock->l_resource->lr_name)) { /* the AST names a different resource than the lock is on */
1963                 ldlm_resource_unlink_lock(lock);
1964                 unlock_res_and_lock(lock); /* the resource change below is done without the resource lock */
1965                 rc = ldlm_lock_change_resource(ns, lock,
1966                                 &dlm_req->lock_desc.l_resource.lr_name);
1967                 if (rc < 0) {
1968                         LDLM_ERROR(lock, "Failed to allocate resource");
1969                         GOTO(out, rc);
1970                 }
1971                 LDLM_DEBUG(lock, "completion AST, new resource");
1972                 lock_res_and_lock(lock);
1973         }
1974
1975         if (ldlm_is_failed(lock)) { /* already failed: release and return directly, bypassing "out" */
1976                 unlock_res_and_lock(lock);
1977                 LDLM_LOCK_RELEASE(lock);
1978                 RETURN(-EINVAL);
1979         }
1980
1981         if (ldlm_is_destroyed(lock) ||
1982             ldlm_is_granted(lock)) {
1983                 /* b=11300: the lock has already been granted */
1984                 unlock_res_and_lock(lock);
1985                 LDLM_DEBUG(lock, "Double grant race happened");
1986                 GOTO(out, rc = 0);
1987         }
1988
1989         /*
1990          * If we receive the completion AST before the actual enqueue returned,
1991          * then we might need to switch lock modes, resources, or extents.
1992          */
1993         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
1994                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
1995                 LDLM_DEBUG(lock, "completion AST, new lock mode");
1996         }
1997
1998         if (lock->l_resource->lr_type != LDLM_PLAIN) { /* policy data is taken from the request for every type except PLAIN */
1999                 ldlm_convert_policy_to_local(req->rq_export,
2000                                           dlm_req->lock_desc.l_resource.lr_type,
2001                                           &dlm_req->lock_desc.l_policy_data,
2002                                           &lock->l_policy_data);
2003                 LDLM_DEBUG(lock, "completion AST, new policy data");
2004         }
2005
2006         ldlm_resource_unlink_lock(lock);
2007
2008         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
2009                 /*
2010                  * BL_AST locks are not needed in LRU.
2011                  * Let ldlm_cancel_lru() be fast.
2012                  */
2013                 ldlm_lock_remove_from_lru(lock);
2014                 ldlm_bl_desc2lock(&dlm_req->lock_desc, lock);
2015                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
2016                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
2017         }
2018
2019         if (lock->l_lvb_len > 0) {
2020                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
2021                                    lock->l_lvb_data, lvb_len);
2022                 if (rc < 0) {
2023                         unlock_res_and_lock(lock); /* "out" retakes the lock itself when rc < 0 */
2024                         GOTO(out, rc);
2025                 }
2026         }
2027
2028         ldlm_grant_lock(lock, &ast_list);
2029         unlock_res_and_lock(lock);
2030
2031         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
2032
2033         /*
2034          * Let enqueue call osc_lock_upcall() and initialize
2035          * l_ast_data
2036          */
2037         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
2038
2039         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
2040
2041         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
2042                           lock);
2043         GOTO(out, rc);
2044
2045 out:
2046         if (rc < 0) { /* any error: flag the lock as failed and wake waiters on l_waitq */
2047                 lock_res_and_lock(lock);
2048                 ldlm_set_failed(lock);
2049                 unlock_res_and_lock(lock);
2050                 wake_up(&lock->l_waitq);
2051         }
2052         LDLM_LOCK_RELEASE(lock);
2053
2054         return 0; /* rc is not propagated; failure is recorded on the lock instead */
2055 }
2056
2057 /**
2058  * Callback handler for receiving incoming glimpse ASTs.
2059  *
2060  * This can only happen on client side.  After handling the glimpse AST
2061  * we also consider dropping the lock here if it is unused locally for a
2062  * long time.
2063  */
2064 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
2065                                     struct ldlm_namespace *ns,
2066                                     struct ldlm_request *dlm_req,
2067                                     struct ldlm_lock *lock)
2068 {
2069         struct ldlm_lock_desc *ld = &dlm_req->lock_desc;
2070         int rc = -ENOSYS; /* kept when the lock has no l_glimpse_ast; sent as rq_status if no reply was packed */
2071
2072         ENTRY;
2073
2074         LDLM_DEBUG(lock, "client glimpse AST callback handler");
2075
2076         if (lock->l_glimpse_ast != NULL)
2077                 rc = lock->l_glimpse_ast(lock, req);
2078
2079         if (req->rq_repmsg != NULL) { /* a reply message exists: send it as is */
2080                 ptlrpc_reply(req);
2081         } else {
2082                 req->rq_status = rc;
2083                 ptlrpc_error(req);
2084         }
2085
2086         lock_res_and_lock(lock);
2087         if (lock->l_granted_mode == LCK_PW &&
2088             !lock->l_readers && !lock->l_writers &&
2089             ktime_after(ktime_get(),
2090                         ktime_add(lock->l_last_used, ns->ns_dirty_age_limit))) { /* PW lock, no local users, last used longer ago than ns_dirty_age_limit */
2091                 unlock_res_and_lock(lock);
2092
2093                 /* For MDS glimpse it is always DOM lock, set corresponding
2094                  * cancel_bits to perform lock convert if needed
2095                  */
2096                 if (lock->l_resource->lr_type == LDLM_IBITS)
2097                         ld->l_policy_data.l_inodebits.cancel_bits =
2098                                                         MDS_INODELOCK_DOM;
2099                 if (ldlm_bl_to_thread_lock(ns, ld, lock)) /* non-zero means queueing failed: handle the blocking callback inline */
2100                         ldlm_handle_bl_callback(ns, ld, lock);
2101
2102                 EXIT;
2103                 return; /* no release here; ldlm_handle_bl_callback() releases the lock, NOTE(review): presumably the queued work item does too - confirm */
2104         }
2105         unlock_res_and_lock(lock);
2106         LDLM_LOCK_RELEASE(lock);
2107         EXIT;
2108 }
2109
2110 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi, /* work item to queue on the blocking pool */
2111                                enum ldlm_cancel_flags cancel_flags) /* without LCF_ASYNC the call waits for blwi_comp; always returns 0 */
2112 {
2113         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
2114
2115         ENTRY;
2116
2117         spin_lock(&blp->blp_lock);
2118         if (blwi->blwi_lock &&
2119             ldlm_is_discard_data(blwi->blwi_lock)) {
2120                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
2121                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
2122         } else {
2123                 /* other blocking callbacks are added to the regular list */
2124                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
2125         }
2126         spin_unlock(&blp->blp_lock);
2127
2128         wake_up(&blp->blp_waitq); /* wake waiters on the pool's wait queue after the item is queued */
2129
2130         /*
2131          * can not check blwi->blwi_flags as blwi could be already freed in
2132          * LCF_ASYNC mode
2133          */
2134         if (!(cancel_flags & LCF_ASYNC))
2135                 wait_for_completion(&blwi->blwi_comp);
2136
2137         RETURN(0);
2138 }
2139
2140 static inline void init_blwi(struct ldlm_bl_work_item *blwi, /* fields not assigned below keep their incoming values */
2141                              struct ldlm_namespace *ns,
2142                              struct ldlm_lock_desc *ld, /* may be NULL; copied by value when set */
2143                              struct list_head *cancels, int count, /* cancels is only used when count is non-zero */
2144                              struct ldlm_lock *lock, /* only stored when count is zero */
2145                              enum ldlm_cancel_flags cancel_flags)
2146 {
2147         init_completion(&blwi->blwi_comp);
2148         INIT_LIST_HEAD(&blwi->blwi_head);
2149
2150         if (current->flags & PF_MEMALLOC) /* record that the submitting task has PF_MEMALLOC set */
2151                 blwi->blwi_mem_pressure = 1;
2152
2153         blwi->blwi_ns = ns;
2154         blwi->blwi_flags = cancel_flags;
2155         if (ld != NULL)
2156                 blwi->blwi_ld = *ld;
2157         if (count) {
2158                 list_splice_init(cancels, &blwi->blwi_head); /* moves the entries and leaves the caller's list empty */
2159                 blwi->blwi_count = count;
2160         } else {
2161                 blwi->blwi_lock = lock;
2162         }
2163 }
2164
2165 /**
2166  * Queues a list of locks \a cancels containing \a count locks
2167  * for later processing by a blocking thread.  If \a count is zero,
2168  * then the lock referenced as \a lock is queued instead.
2169  * Returns 0 when the item was queued (or there was nothing to queue) and -ENOMEM if the asynchronous work item cannot be allocated.
2170  * The blocking thread would then call ->l_blocking_ast callback in the lock.
2171  * If list addition fails an error is returned and caller is supposed to
2172  * call ->l_blocking_ast itself.
2173  */
2174 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
2175                              struct ldlm_lock_desc *ld,
2176                              struct ldlm_lock *lock,
2177                              struct list_head *cancels, int count,
2178                              enum ldlm_cancel_flags cancel_flags)
2179 {
2180         ENTRY;
2181
2182         if (cancels && count == 0) /* a cancel list with no entries: nothing to do */
2183                 RETURN(0);
2184
2185         if (cancel_flags & LCF_ASYNC) {
2186                 struct ldlm_bl_work_item *blwi; /* allocated here because the caller does not wait for completion */
2187
2188                 OBD_ALLOC(blwi, sizeof(*blwi));
2189                 if (blwi == NULL)
2190                         RETURN(-ENOMEM);
2191                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
2192
2193                 RETURN(__ldlm_bl_to_thread(blwi, cancel_flags));
2194         } else {
2195                 /*
2196                  * if it is a synchronous call do minimum mem alloc, as it could
2197                  * be triggered from kernel shrinker
2198                  */
2199                 struct ldlm_bl_work_item blwi; /* stack item is safe: __ldlm_bl_to_thread() waits on blwi_comp before returning */
2200
2201                 memset(&blwi, 0, sizeof(blwi));
2202                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
2203                 RETURN(__ldlm_bl_to_thread(&blwi, cancel_flags));
2204         }
2205 }
2206
2207
2208 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
2209                            struct ldlm_lock *lock) /* queue a single lock asynchronously (LCF_ASYNC, no cancel list) */
2210 {
2211         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
2212 }
2213
2214 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
2215                            struct list_head *cancels, int count,
2216                            enum ldlm_cancel_flags cancel_flags) /* queue a list of count locks; sync or async is chosen by cancel_flags */
2217 {
2218         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
2219 }
2220
2221 int ldlm_bl_to_thread_ns(struct ldlm_namespace *ns) /* queue an asynchronous work item that carries only the namespace (no lock, no list) */
2222 {
2223         return ldlm_bl_to_thread(ns, NULL, NULL, NULL, 0, LCF_ASYNC);
2224 }
2225
2226 int ldlm_bl_thread_wakeup(void) /* wake waiters on the blocking pool's wait queue; always returns 0 */
2227 {
2228         wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
2229         return 0;
2230 }
2231
2232 /* Setinfo coming from Server (eg MDT) to Client (eg MDC)! Returns -EFAULT when the key or value buffer is missing and -ENOSYS for any key other than KEY_HSM_COPYTOOL_SEND. */
2233 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
2234 {
2235         struct obd_device *obd = req->rq_export->exp_obd;
2236         char *key;
2237         void *val;
2238         int keylen, vallen; /* NOTE(review): keylen is presumably consumed by the KEY_IS() macro below - confirm */
2239         int rc = -ENOSYS; /* returned unchanged when the key is not recognised */
2240
2241         ENTRY;
2242
2243         DEBUG_REQ(D_HSM, req, "%s: handle setinfo", obd->obd_name);
2244
2245         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2246
2247         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2248         if (key == NULL) {
2249                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
2250                 RETURN(-EFAULT);
2251         }
2252         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
2253                                       RCL_CLIENT);
2254         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
2255         if (val == NULL) {
2256                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
2257                 RETURN(-EFAULT);
2258         }
2259         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
2260                                       RCL_CLIENT);
2261
2262         /* We are responsible for swabbing contents of val */
2263
2264         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
2265                 /* Pass it on to mdc (the "export" in this case) */
2266                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
2267                                         req->rq_export,
2268                                         sizeof(KEY_HSM_COPYTOOL_SEND), /* sizeof on the key constant, not keylen from the request */
2269                                         KEY_HSM_COPYTOOL_SEND,
2270                                         vallen, val, NULL);
2271         else
2272                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key '%s'", key);
2273
2274         return rc;
2275 }
2276
2277 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req, /* log the outcome of replying to a callback request */
2278                                         const char *msg, int rc,
2279                                         const struct lustre_handle *handle) /* may be NULL, in which case the cookie is logged as 0 */
2280 {
2281         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req, /* warning level when no reply was sent or rc is non-zero */
2282                   "%s, NID=%s lock=%#llx: rc = %d",
2283                   msg, libcfs_id2str(req->rq_peer),
2284                   handle ? handle->cookie : 0, rc);
2285         if (req->rq_no_reply)
2286                 CWARN("No reply was sent, maybe cause b=21636.\n");
2287         else if (rc)
2288                 CWARN("Send reply failed, maybe cause b=21636.\n");
2289 }
2290
2291 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2292 static int ldlm_callback_handler(struct ptlrpc_request *req)
2293 {
2294         struct ldlm_namespace *ns;
2295         struct ldlm_request *dlm_req;
2296         struct ldlm_lock *lock;
2297         int rc;
2298
2299         ENTRY;
2300
2301         /*
2302          * Requests arrive in sender's byte order.  The ptlrpc service
2303          * handler has already checked and, if necessary, byte-swapped the
2304          * incoming request message body, but I am responsible for the
2305          * message buffers.
2306          */
2307
2308         /* do nothing for sec context finalize */
2309         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
2310                 RETURN(0);
2311
2312         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2313
2314         if (req->rq_export == NULL) {
2315                 rc = ldlm_callback_reply(req, -ENOTCONN);
2316                 ldlm_callback_errmsg(req, "Operate on unconnected server",
2317                                      rc, NULL);
2318                 RETURN(0);
2319         }
2320
2321         LASSERT(req->rq_export != NULL);
2322         LASSERT(req->rq_export->exp_obd != NULL);
2323
2324         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2325         case LDLM_BL_CALLBACK:
2326                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
2327                         if (cfs_fail_err)
2328                                 ldlm_callback_reply(req, -(int)cfs_fail_err);
2329                         RETURN(0);
2330                 }
2331                 break;
2332         case LDLM_CP_CALLBACK:
2333                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
2334                         RETURN(0);
2335                 break;
2336         case LDLM_GL_CALLBACK:
2337                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
2338                         RETURN(0);
2339                 break;
2340         case LDLM_SET_INFO:
2341                 rc = ldlm_handle_setinfo(req);
2342                 ldlm_callback_reply(req, rc);
2343                 RETURN(0);
2344         default:
2345                 CERROR("unknown opcode %u\n",
2346                        lustre_msg_get_opc(req->rq_reqmsg));
2347                 ldlm_callback_reply(req, -EPROTO);
2348                 RETURN(0);
2349         }
2350
2351         ns = req->rq_export->exp_obd->obd_namespace;
2352         LASSERT(ns != NULL);
2353
2354         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2355
2356         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2357         if (dlm_req == NULL) {
2358                 rc = ldlm_callback_reply(req, -EPROTO);
2359                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
2360                                      NULL);
2361                 RETURN(0);
2362         }
2363
2364         /*
2365          * Force a known safe race, send a cancel to the server for a lock
2366          * which the server has already started a blocking callback on.
2367          */
2368         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
2369             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
2370                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
2371                 if (rc < 0)
2372                         CERROR("ldlm_cli_cancel: %d\n", rc);
2373         }
2374
2375         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
2376         if (!lock) {
2377                 CDEBUG(D_DLMTRACE,
2378                        "callback on lock %#llx - lock disappeared\n",
2379                        dlm_req->lock_handle[0].cookie);
2380                 rc = ldlm_callback_reply(req, -EINVAL);
2381                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
2382                                      &dlm_req->lock_handle[0]);
2383                 RETURN(0);
2384         }
2385
2386         if (ldlm_is_fail_loc(lock) &&
2387             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
2388                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
2389
2390         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
2391         lock_res_and_lock(lock);
2392         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
2393                                               LDLM_FL_AST_MASK);
2394         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
2395                 /*
2396                  * If somebody cancels lock and cache is already dropped,
2397                  * or lock is failed before cp_ast received on client,
2398                  * we can tell the server we have no lock. Otherwise, we
2399                  * should send cancel after dropping the cache.
2400                  */
2401                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
2402                      ldlm_is_failed(lock)) {
2403                         LDLM_DEBUG(lock,
2404                                    "callback on lock %llx - lock disappeared",
2405                                    dlm_req->lock_handle[0].cookie);
2406                         unlock_res_and_lock(lock);
2407                         LDLM_LOCK_RELEASE(lock);
2408                         rc = ldlm_callback_reply(req, -EINVAL);
2409                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
2410                                              &dlm_req->lock_handle[0]);
2411                         RETURN(0);
2412                 }
2413                 /*
2414                  * BL_AST locks are not needed in LRU.
2415                  * Let ldlm_cancel_lru() be fast.
2416                  */
2417                 ldlm_lock_remove_from_lru(lock);
2418                 ldlm_set_bl_ast(lock);
2419         }
2420         if (lock->l_remote_handle.cookie == 0)
2421                 lock->l_remote_handle = dlm_req->lock_handle[1];
2422         unlock_res_and_lock(lock);
2423
2424         /*
2425          * We want the ost thread to get this reply so that it can respond
2426          * to ost requests (write cache writeback) that might be triggered
2427          * in the callback.
2428          *
2429          * But we'd also like to be able to indicate in the reply that we're
2430          * cancelling right now, because it's unused, or have an intent result
2431          * in the reply, so we might have to push the responsibility for sending
2432          * the reply down into the AST handlers, alas.
2433          */
2434
2435         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2436         case LDLM_BL_CALLBACK:
2437                 CDEBUG(D_INODE, "blocking ast\n");
2438                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
2439                 if (!ldlm_is_cancel_on_block(lock)) {
2440                         rc = ldlm_callback_reply(req, 0);
2441                         if (req->rq_no_reply || rc)
2442                                 ldlm_callback_errmsg(req, "Normal process", rc,
2443                                                      &dlm_req->lock_handle[0]);
2444                 }
2445                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
2446                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
2447                 break;
2448         case LDLM_CP_CALLBACK:
2449                 CDEBUG(D_INODE, "completion ast\n");
2450                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
2451                 rc = ldlm_handle_cp_callback(req, ns, dlm_req, lock);
2452                 if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
2453                         ldlm_callback_reply(req, rc);
2454                 break;
2455         case LDLM_GL_CALLBACK:
2456                 CDEBUG(D_INODE, "glimpse ast\n");
2457                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
2458                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
2459                 break;
2460         default:
2461                 LBUG(); /* checked above */
2462         }
2463
2464         RETURN(0);
2465 }
2466
2467 #ifdef HAVE_SERVER_SUPPORT
2468 /**
2469  * Main handler for canceld thread.
2470  *
2471  * Separated into its own thread to avoid deadlocks.
2472  */
2473 static int ldlm_cancel_handler(struct ptlrpc_request *req)
2474 {
2475         int rc;
2476
2477         ENTRY;
2478
2479         /*
2480          * Requests arrive in sender's byte order.  The ptlrpc service
2481          * handler has already checked and, if necessary, byte-swapped the
2482          * incoming request message body, but I am responsible for the
2483          * message buffers.
2484          */
2485
2486         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2487
2488         if (req->rq_export == NULL) {
2489                 struct ldlm_request *dlm_req;
2490
2491                 CERROR("%s from %s arrived at %llu with bad export cookie %llu\n",
2492                        ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
2493                        libcfs_nid2str(req->rq_peer.nid),
2494                        (unsigned long long)req->rq_arrival_time.tv_sec,
2495                        lustre_msg_get_handle(req->rq_reqmsg)->cookie);
2496
2497                 if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
2498                         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2499                         dlm_req = req_capsule_client_get(&req->rq_pill,
2500                                                          &RMF_DLM_REQ);
2501                         if (dlm_req != NULL)
2502                                 ldlm_lock_dump_handle(D_ERROR,
2503                                                       &dlm_req->lock_handle[0]);
2504                 }
2505                 ldlm_callback_reply(req, -ENOTCONN);
2506                 RETURN(0);
2507         }
2508
2509         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2510         /* XXX FIXME move this back to mds/handler.c, b=249 */
2511         case LDLM_CANCEL:
2512                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2513                 CDEBUG(D_INODE, "cancel\n");
2514                 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
2515                     CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
2516                     CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
2517                         RETURN(0);
2518                 rc = ldlm_handle_cancel(req);
2519                 break;
2520         case LDLM_CONVERT:
2521         {
2522                 struct ldlm_request *dlm_req;
2523
2524                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2525                 CDEBUG(D_INODE, "convert\n");
2526
2527                 dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2528                 if (dlm_req == NULL) {
2529                         CDEBUG(D_INFO, "bad request buffer for cancel\n");
2530                         rc = ldlm_callback_reply(req, -EPROTO);
2531                 } else {
2532                         req->rq_status = ldlm_handle_convert0(req, dlm_req);
2533                         rc = ptlrpc_reply(req);
2534                 }
2535                 break;
2536         }
2537         default:
2538                 CERROR("invalid opcode %d\n",
2539                        lustre_msg_get_opc(req->rq_reqmsg));
2540                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
2541                 rc = ldlm_callback_reply(req, -EINVAL);
2542         }
2543
2544         RETURN(rc);
2545 }
2546
2547 static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
2548                                         struct ldlm_lock *lock)
2549 {
2550         struct ldlm_request *dlm_req;
2551         struct lustre_handle lockh;
2552         int rc = 0;
2553         int i;
2554
2555         ENTRY;
2556
2557         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2558         if (dlm_req == NULL)
2559                 RETURN(0);
2560
2561         ldlm_lock2handle(lock, &lockh);
2562         for (i = 0; i < dlm_req->lock_count; i++) {
2563                 if (lustre_handle_equal(&dlm_req->lock_handle[i],
2564                                         &lockh)) {
2565                         DEBUG_REQ(D_RPCTRACE, req,
2566                                   "Prio raised by lock %#llx", lockh.cookie);
2567                         rc = 1;
2568                         break;
2569                 }
2570         }
2571
2572         RETURN(rc);
2573 }
2574
2575 static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
2576 {
2577         struct ldlm_request *dlm_req;
2578         int rc = 0;
2579         int i;
2580         unsigned int size;
2581
2582         ENTRY;
2583
2584         /* no prolong in recovery */
2585         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
2586                 RETURN(0);
2587
2588         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2589         if (dlm_req == NULL)
2590                 RETURN(-EFAULT);
2591
2592         size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT);
2593         if (size <= offsetof(struct ldlm_request, lock_handle) ||
2594             (size - offsetof(struct ldlm_request, lock_handle)) /
2595              sizeof(struct lustre_handle) < dlm_req->lock_count)
2596                 RETURN(-EPROTO);
2597
2598         for (i = 0; i < dlm_req->lock_count; i++) {
2599                 struct ldlm_lock *lock;
2600
2601                 lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
2602                 if (lock == NULL)
2603                         continue;
2604
2605                 rc = ldlm_is_ast_sent(lock) ? 1 : 0;
2606                 if (rc)
2607                         LDLM_DEBUG(lock, "hpreq cancel/convert lock");
2608                 LDLM_LOCK_PUT(lock);
2609
2610                 if (rc)
2611                         break;
2612         }
2613
2614         RETURN(rc);
2615 }
2616
2617 static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
2618         .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
2619         .hpreq_check      = ldlm_cancel_hpreq_check,
2620         .hpreq_fini       = NULL,
2621 };
2622
2623 static int ldlm_hpreq_handler(struct ptlrpc_request *req)
2624 {
2625         ENTRY;
2626
2627         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2628
2629         if (req->rq_export == NULL)
2630                 RETURN(0);
2631
2632         if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
2633                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2634                 req->rq_ops = &ldlm_cancel_hpreq_ops;
2635         } else if (LDLM_CONVERT == lustre_msg_get_opc(req->rq_reqmsg)) {
2636                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2637                 req->rq_ops = &ldlm_cancel_hpreq_ops;
2638         }
2639         RETURN(0);
2640 }
2641
2642 static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2643                                struct hlist_node *hnode, void *data)
2644
2645 {
2646         struct list_head *rpc_list = data;
2647         struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
2648
2649         lock_res_and_lock(lock);
2650
2651         if (!ldlm_is_granted(lock)) {
2652                 unlock_res_and_lock(lock);
2653                 return 0;
2654         }
2655
2656         LASSERT(lock->l_resource);
2657         if (lock->l_resource->lr_type != LDLM_IBITS &&
2658             lock->l_resource->lr_type != LDLM_PLAIN) {
2659                 unlock_res_and_lock(lock);
2660                 return 0;
2661         }
2662
2663         if (ldlm_is_ast_sent(lock)) {
2664                 unlock_res_and_lock(lock);
2665                 return 0;
2666         }
2667
2668         LASSERT(lock->l_blocking_ast);
2669         LASSERT(!lock->l_blocking_lock);
2670
2671         ldlm_set_ast_sent(lock);
2672         if (lock->l_export && lock->l_export->exp_lock_hash) {
2673                 /*
2674                  * NB: it's safe to call cfs_hash_del() even lock isn't
2675                  * in exp_lock_hash.
2676                  */
2677                 /*
2678                  * In the function below, .hs_keycmp resolves to
2679                  * ldlm_export_lock_keycmp()
2680                  */
2681                 /* coverity[overrun-buffer-val] */
2682                 cfs_hash_del(lock->l_export->exp_lock_hash,
2683                              &lock->l_remote_handle, &lock->l_exp_hash);
2684         }
2685
2686         list_add_tail(&lock->l_rk_ast, rpc_list);
2687         LDLM_LOCK_GET(lock);
2688
2689         unlock_res_and_lock(lock);
2690         return 0;
2691 }
2692
2693 void ldlm_revoke_export_locks(struct obd_export *exp)
2694 {
2695         LIST_HEAD(rpc_list);
2696
2697         ENTRY;
2698
2699         cfs_hash_for_each_nolock(exp->exp_lock_hash,
2700                                  ldlm_revoke_lock_cb, &rpc_list, 0);
2701         ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
2702                           LDLM_WORK_REVOKE_AST);
2703
2704         EXIT;
2705 }
2706 EXPORT_SYMBOL(ldlm_revoke_export_locks);
2707 #endif /* HAVE_SERVER_SUPPORT */
2708
2709 static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
2710                             struct ldlm_bl_work_item **p_blwi,
2711                             struct obd_export **p_exp)
2712 {
2713         struct ldlm_bl_work_item *blwi = NULL;
2714         static unsigned int num_bl;
2715         static unsigned int num_stale;
2716         int num_th = atomic_read(&blp->blp_num_threads);
2717
2718         *p_exp = obd_stale_export_get();
2719
2720         spin_lock(&blp->blp_lock);
2721         if (*p_exp != NULL) {
2722                 if (num_th == 1 || ++num_stale < num_th) {
2723                         spin_unlock(&blp->blp_lock);
2724                         return 1;
2725                 }
2726                 num_stale = 0;
2727         }
2728
2729         /* process a request from the blp_list at least every blp_num_threads */
2730         if (!list_empty(&blp->blp_list) &&
2731             (list_empty(&blp->blp_prio_list) || num_bl == 0))
2732                 blwi = list_entry(blp->blp_list.next,
2733                                   struct ldlm_bl_work_item, blwi_entry);
2734         else
2735                 if (!list_empty(&blp->blp_prio_list))
2736                         blwi = list_entry(blp->blp_prio_list.next,
2737                                           struct ldlm_bl_work_item,
2738                                           blwi_entry);
2739
2740         if (blwi) {
2741                 if (++num_bl >= num_th)
2742                         num_bl = 0;
2743                 list_del(&blwi->blwi_entry);
2744         }
2745         spin_unlock(&blp->blp_lock);
2746         *p_blwi = blwi;
2747
2748         if (*p_exp != NULL && *p_blwi != NULL) {
2749                 obd_stale_export_put(*p_exp);
2750                 *p_exp = NULL;
2751         }
2752
2753         return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
2754 }
2755
2756 /* This only contains temporary data until the thread starts */
2757 struct ldlm_bl_thread_data {
2758         struct ldlm_bl_pool     *bltd_blp;
2759         struct completion       bltd_comp;
2760         int                     bltd_num;
2761 };
2762
2763 static int ldlm_bl_thread_main(void *arg);
2764
2765 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp, bool check_busy)
2766 {
2767         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
2768         struct task_struct *task;
2769
2770         init_completion(&bltd.bltd_comp);
2771
2772         bltd.bltd_num = atomic_inc_return(&blp->blp_num_threads);
2773         if (bltd.bltd_num >= blp->blp_max_threads) {
2774                 atomic_dec(&blp->blp_num_threads);
2775                 return 0;
2776         }
2777
2778         LASSERTF(bltd.bltd_num > 0, "thread num:%d\n", bltd.bltd_num);
2779         if (check_busy &&
2780             atomic_read(&blp->blp_busy_threads) < (bltd.bltd_num - 1)) {
2781                 atomic_dec(&blp->blp_num_threads);
2782                 return 0;
2783         }
2784
2785         task = kthread_run(ldlm_bl_thread_main, &bltd, "ldlm_bl_%02d",
2786                            bltd.bltd_num);
2787         if (IS_ERR(task)) {
2788                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
2789                        bltd.bltd_num, PTR_ERR(task));
2790                 atomic_dec(&blp->blp_num_threads);
2791                 return PTR_ERR(task);
2792         }
2793         wait_for_completion(&bltd.bltd_comp);
2794
2795         return 0;
2796 }
2797
2798 /* Not fatal if racy and have a few too many threads */
2799 static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
2800                                       struct ldlm_bl_work_item *blwi)
2801 {
2802         if (atomic_read(&blp->blp_num_threads) >= blp->blp_max_threads)
2803                 return 0;
2804
2805         if (atomic_read(&blp->blp_busy_threads) <
2806             atomic_read(&blp->blp_num_threads))
2807                 return 0;
2808
2809         if (blwi != NULL && (blwi->blwi_ns == NULL ||
2810                              blwi->blwi_mem_pressure))
2811                 return 0;
2812
2813         return 1;
2814 }
2815
2816 static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
2817                                struct ldlm_bl_work_item *blwi)
2818 {
2819         /* '1' for consistency with code that checks !mpflag to restore */
2820         unsigned int mpflags = 1;
2821
2822         ENTRY;
2823
2824         if (blwi->blwi_ns == NULL)
2825                 /* added by ldlm_cleanup() */
2826                 RETURN(LDLM_ITER_STOP);
2827
2828         if (blwi->blwi_mem_pressure)
2829                 mpflags = memalloc_noreclaim_save();
2830
2831         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
2832
2833         if (blwi->blwi_count) {
2834                 int count;
2835                 /*
2836                  * The special case when we cancel locks in lru
2837                  * asynchronously, we pass the list of locks here.
2838                  * Thus locks are marked LDLM_FL_CANCELING, but NOT
2839                  * canceled locally yet.
2840                  */
2841                 count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
2842                                                    blwi->blwi_count,
2843                                                    LCF_BL_AST);
2844                 ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
2845                                      blwi->blwi_flags);
2846         } else if (blwi->blwi_lock) {
2847                 ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
2848                                         blwi->blwi_lock);
2849         } else {
2850                 ldlm_pool_recalc(&blwi->blwi_ns->ns_pool, true);
2851                 spin_lock(&blwi->blwi_ns->ns_lock);
2852                 blwi->blwi_ns->ns_rpc_recalc = 0;
2853                 spin_unlock(&blwi->blwi_ns->ns_lock);
2854                 ldlm_namespace_put(blwi->blwi_ns);
2855         }
2856
2857         if (blwi->blwi_mem_pressure)
2858                 memalloc_noreclaim_restore(mpflags);
2859
2860         if (blwi->blwi_flags & LCF_ASYNC)
2861                 OBD_FREE(blwi, sizeof(*blwi));
2862         else
2863                 complete(&blwi->blwi_comp);
2864
2865         RETURN(0);
2866 }
2867
2868 /**
2869  * Cancel stale locks on export. Cancel blocked locks first.
2870  * If the given export has blocked locks, the next in the list may have
2871  * them too, thus cancel not blocked locks only if the current export has
2872  * no blocked locks.
2873  **/
2874 static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
2875                                   struct obd_export *exp)
2876 {
2877         int num;
2878
2879         ENTRY;
2880
2881         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
2882
2883         num = ldlm_export_cancel_blocked_locks(exp);
2884         if (num == 0)
2885                 ldlm_export_cancel_locks(exp);
2886
2887         obd_stale_export_put(exp);
2888
2889         RETURN(0);
2890 }
2891
2892
2893 /**
2894  * Main blocking requests processing thread.
2895  *
2896  * Callers put locks into its queue by calling ldlm_bl_to_thread.
2897  * This thread in the end ends up doing actual call to ->l_blocking_ast
2898  * for queued locks.
2899  */
2900 static int ldlm_bl_thread_main(void *arg)
2901 {
2902         struct lu_env *env;
2903         struct ldlm_bl_pool *blp;
2904         struct ldlm_bl_thread_data *bltd = arg;
2905         int rc;
2906
2907         ENTRY;
2908
2909         OBD_ALLOC_PTR(env);
2910         if (!env)
2911                 RETURN(-ENOMEM);
2912         rc = lu_env_init(env, LCT_DT_THREAD);
2913         if (rc)
2914                 GOTO(out_env, rc);
2915         rc = lu_env_add(env);
2916         if (rc)
2917                 GOTO(out_env_fini, rc);
2918
2919         blp = bltd->bltd_blp;
2920
2921         complete(&bltd->bltd_comp);
2922         /* cannot use bltd after this, it is only on caller's stack */
2923
2924         while (1) {
2925                 struct ldlm_bl_work_item *blwi = NULL;
2926                 struct obd_export *exp = NULL;
2927                 int rc;
2928
2929                 rc = ldlm_bl_get_work(blp, &blwi, &exp);
2930
2931                 if (rc == 0)
2932                         wait_event_idle_exclusive(blp->blp_waitq,
2933                                                   ldlm_bl_get_work(blp, &blwi,
2934                                                                    &exp));
2935                 atomic_inc(&blp->blp_busy_threads);
2936
2937                 if (ldlm_bl_thread_need_create(blp, blwi))
2938                         /* discard the return value, we tried */
2939                         ldlm_bl_thread_start(blp, true);
2940
2941                 if (exp)
2942                         rc = ldlm_bl_thread_exports(blp, exp);
2943                 else if (blwi)
2944                         rc = ldlm_bl_thread_blwi(blp, blwi);
2945
2946                 atomic_dec(&blp->blp_busy_threads);
2947
2948                 if (rc == LDLM_ITER_STOP)
2949                         break;
2950
2951                 /*
2952                  * If there are many namespaces, we will not sleep waiting for
2953                  * work, and must do a cond_resched to avoid holding the CPU
2954                  * for too long
2955                  */
2956                 cond_resched();
2957         }
2958
2959         atomic_dec(&blp->blp_num_threads);
2960         complete(&blp->blp_comp);
2961
2962         lu_env_remove(env);
2963 out_env_fini:
2964         lu_env_fini(env);
2965 out_env:
2966         OBD_FREE_PTR(env);
2967         RETURN(rc);
2968 }
2969
2970
2971 static int ldlm_setup(void);
2972 static int ldlm_cleanup(void);
2973
2974 int ldlm_get_ref(void)
2975 {
2976         int rc = 0;
2977
2978         ENTRY;
2979         mutex_lock(&ldlm_ref_mutex);
2980         if (++ldlm_refcount == 1) {
2981                 rc = ldlm_setup();
2982                 if (rc)
2983                         ldlm_refcount--;
2984         }
2985         mutex_unlock(&ldlm_ref_mutex);
2986
2987         RETURN(rc);
2988 }
2989
2990 void ldlm_put_ref(void)
2991 {
2992         ENTRY;
2993         mutex_lock(&ldlm_ref_mutex);
2994         if (ldlm_refcount == 1) {
2995                 int rc = ldlm_cleanup();
2996
2997                 if (rc)
2998                         CERROR("ldlm_cleanup failed: %d\n", rc);
2999                 else
3000                         ldlm_refcount--;
3001         } else {
3002                 ldlm_refcount--;
3003         }
3004         mutex_unlock(&ldlm_ref_mutex);
3005
3006         EXIT;
3007 }
3008
3009 /*
3010  * Export handle<->lock hash operations.
3011  */
3012 static unsigned
3013 ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned int mask)
3014 {
3015         return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
3016 }
3017
3018 static void *
3019 ldlm_export_lock_key(struct hlist_node *hnode)
3020 {
3021         struct ldlm_lock *lock;
3022
3023         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
3024         return &lock->l_remote_handle;
3025 }
3026
3027 static void
3028 ldlm_export_lock_keycpy(struct hlist_node *hnode, void *key)
3029 {
3030         struct ldlm_lock     *lock;
3031
3032         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
3033         lock->l_remote_handle = *(struct lustre_handle *)key;
3034 }
3035
3036 static int
3037 ldlm_export_lock_keycmp(const void *key, struct hlist_node *hnode)
3038 {
3039         return lustre_handle_equal(ldlm_export_lock_key(hnode), key);
3040 }
3041
3042 static void *
3043 ldlm_export_lock_object(struct hlist_node *hnode)
3044 {
3045         return hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
3046 }
3047
3048 static void
3049 ldlm_export_lock_get(struct cfs_hash *hs, struct hlist_node *hnode)
3050 {
3051         struct ldlm_lock *lock;
3052
3053         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
3054         LDLM_LOCK_GET(lock);
3055 }
3056
3057 static void
3058 ldlm_export_lock_put(struct cfs_hash *hs, struct hlist_node *hnode)
3059 {
3060         struct ldlm_lock *lock;
3061
3062         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
3063         LDLM_LOCK_RELEASE(lock);
3064 }
3065
3066 static struct cfs_hash_ops ldlm_export_lock_ops = {
3067         .hs_hash        = ldlm_export_lock_hash,
3068         .hs_key         = ldlm_export_lock_key,
3069         .hs_keycmp      = ldlm_export_lock_keycmp,
3070         .hs_keycpy      = ldlm_export_lock_keycpy,
3071         .hs_object      = ldlm_export_lock_object,
3072         .hs_get         = ldlm_export_lock_get,
3073         .hs_put         = ldlm_export_lock_put,
3074         .hs_put_locked  = ldlm_export_lock_put,
3075 };
3076
3077 int ldlm_init_export(struct obd_export *exp)
3078 {
3079         int rc;
3080
3081         ENTRY;
3082
3083         exp->exp_lock_hash =
3084                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
3085                                 HASH_EXP_LOCK_CUR_BITS,
3086                                 HASH_EXP_LOCK_MAX_BITS,
3087                                 HASH_EXP_LOCK_BKT_BITS, 0,
3088                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
3089                                 &ldlm_export_lock_ops,
3090                                 CFS_HASH_DEFAULT | CFS_HASH_REHASH_KEY |
3091                                 CFS_HASH_NBLK_CHANGE);
3092
3093         if (!exp->exp_lock_hash)
3094                 RETURN(-ENOMEM);
3095
3096         rc = ldlm_init_flock_export(exp);
3097         if (rc)
3098                 GOTO(err, rc);
3099
3100         RETURN(0);
3101 err:
3102         ldlm_destroy_export(exp);
3103         RETURN(rc);
3104 }
3105 EXPORT_SYMBOL(ldlm_init_export);
3106
3107 void ldlm_destroy_export(struct obd_export *exp)
3108 {
3109         ENTRY;
3110         cfs_hash_putref(exp->exp_lock_hash);
3111         exp->exp_lock_hash = NULL;
3112
3113         ldlm_destroy_flock_export(exp);
3114         EXIT;
3115 }
3116 EXPORT_SYMBOL(ldlm_destroy_export);
3117
3118 static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
3119                                                       struct attribute *attr,
3120                                                       char *buf)
3121 {
3122         return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
3123 }
3124
3125 static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
3126                                                        struct attribute *attr,
3127                                                        const char *buffer,
3128                                                        size_t count)
3129 {
3130         int rc;
3131         unsigned long val;
3132
3133         rc = kstrtoul(buffer, 10, &val);
3134         if (rc)
3135                 return rc;
3136
3137         ldlm_cancel_unused_locks_before_replay = val;
3138
3139         return count;
3140 }
3141 LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
3142
3143 static struct attribute *ldlm_attrs[] = {
3144         &lustre_attr_cancel_unused_locks_before_replay.attr,
3145         NULL,
3146 };
3147
3148 static struct attribute_group ldlm_attr_group = {
3149         .attrs = ldlm_attrs,
3150 };
3151
3152 static int ldlm_setup(void)
3153 {
3154         static struct ptlrpc_service_conf       conf;
3155         struct ldlm_bl_pool                    *blp = NULL;
3156 #ifdef HAVE_SERVER_SUPPORT
3157         struct task_struct *task;
3158 #endif /* HAVE_SERVER_SUPPORT */
3159         int i;
3160         int rc = 0;
3161
3162         ENTRY;
3163
3164         if (ldlm_state != NULL)
3165                 RETURN(-EALREADY);
3166
3167         OBD_ALLOC(ldlm_state, sizeof(*ldlm_state));
3168         if (ldlm_state == NULL)
3169                 RETURN(-ENOMEM);
3170
3171         ldlm_kobj = kobject_create_and_add("ldlm", &lustre_kset->kobj);
3172         if (!ldlm_kobj)
3173                 GOTO(out, -ENOMEM);
3174
3175         rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
3176         if (rc)
3177                 GOTO(out, rc);
3178
3179         ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
3180         if (!ldlm_ns_kset)
3181                 GOTO(out, -ENOMEM);
3182
3183         ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
3184         if (!ldlm_svc_kset)
3185                 GOTO(out, -ENOMEM);
3186
3187         rc = ldlm_debugfs_setup();
3188         if (rc != 0)
3189                 GOTO(out, rc);
3190
3191         memset(&conf, 0, sizeof(conf));
3192         conf = (typeof(conf)) {
3193                 .psc_name               = "ldlm_cbd",
3194                 .psc_watchdog_factor    = 2,
3195                 .psc_buf                = {
3196                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
3197                         .bc_buf_size            = LDLM_BUFSIZE,
3198                         .bc_req_max_size        = LDLM_MAXREQSIZE,
3199                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
3200                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
3201                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
3202                 },
3203                 .psc_thr                = {
3204                         .tc_thr_name            = "ldlm_cb",
3205                         .tc_thr_factor          = LDLM_THR_FACTOR,
3206                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
3207                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
3208                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
3209                         .tc_nthrs_user          = ldlm_num_threads,
3210                         .tc_cpu_bind            = ldlm_cpu_bind,
3211                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
3212                 },
3213                 .psc_cpt                = {
3214                         .cc_pattern             = ldlm_cpts,
3215                         .cc_affinity            = true,
3216                 },
3217                 .psc_ops                = {
3218                         .so_req_handler         = ldlm_callback_handler,
3219                 },
3220         };
3221         ldlm_state->ldlm_cb_service = \
3222                         ptlrpc_register_service(&conf, ldlm_svc_kset,
3223                                                 ldlm_svc_debugfs_dir);
3224         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
3225                 CERROR("failed to start service\n");
3226                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
3227                 ldlm_state->ldlm_cb_service = NULL;
3228                 GOTO(out, rc);
3229         }
3230
3231 #ifdef HAVE_SERVER_SUPPORT
3232         memset(&conf, 0, sizeof(conf));
3233         conf = (typeof(conf)) {
3234                 .psc_name               = "ldlm_canceld",
3235                 .psc_watchdog_factor    = 6,
3236                 .psc_buf                = {
3237                         .bc_nbufs               = LDLM_SERVER_NBUFS,
3238                         .bc_buf_size            = LDLM_BUFSIZE,
3239                         .bc_req_max_size        = LDLM_MAXREQSIZE,
3240                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
3241                         .bc_req_portal          = LDLM_CANCEL_REQUEST_PORTAL,
3242                         .bc_rep_portal          = LDLM_CANCEL_REPLY_PORTAL,
3243
3244                 },
3245                 .psc_thr                = {
3246                         .tc_thr_name            = "ldlm_cn",
3247                         .tc_thr_factor          = LDLM_THR_FACTOR,
3248                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
3249                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
3250                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
3251                         .tc_nthrs_user          = ldlm_num_threads,
3252                         .tc_cpu_bind            = ldlm_cpu_bind,
3253                         .tc_ctx_tags            = LCT_MD_THREAD | \
3254                                                   LCT_DT_THREAD | \
3255                                                   LCT_CL_THREAD,
3256                 },
3257                 .psc_cpt                = {
3258                         .cc_pattern             = ldlm_cpts,
3259                         .cc_affinity            = true,
3260                 },
3261                 .psc_ops                = {
3262                         .so_req_handler         = ldlm_cancel_handler,
3263                         .so_hpreq_handler       = ldlm_hpreq_handler,
3264                 },
3265         };
3266         ldlm_state->ldlm_cancel_service = \
3267                         ptlrpc_register_service(&conf, ldlm_svc_kset,
3268                                                 ldlm_svc_debugfs_dir);
3269         if (IS_ERR(ldlm_state->ldlm_cancel_service)) {
3270                 CERROR("failed to start service\n");
3271                 rc = PTR_ERR(ldlm_state->ldlm_cancel_service);
3272                 ldlm_state->ldlm_cancel_service = NULL;
3273                 GOTO(out, rc);
3274         }
3275 #endif /* HAVE_SERVER_SUPPORT */
3276
3277         OBD_ALLOC(blp, sizeof(*blp));
3278         if (blp == NULL)
3279                 GOTO(out, rc = -ENOMEM);
3280         ldlm_state->ldlm_bl_pool = blp;
3281
3282         spin_lock_init(&blp->blp_lock);
3283         INIT_LIST_HEAD(&blp->blp_list);
3284         INIT_LIST_HEAD(&blp->blp_prio_list);
3285         init_waitqueue_head(&blp->blp_waitq);
3286         atomic_set(&blp->blp_num_threads, 0);
3287         atomic_set(&blp->blp_busy_threads, 0);
3288
3289         if (ldlm_num_threads == 0) {
3290                 blp->blp_min_threads = LDLM_NTHRS_INIT;
3291                 blp->blp_max_threads = LDLM_NTHRS_MAX;
3292         } else {
3293                 blp->blp_min_threads = blp->blp_max_threads = \
3294                         min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
3295                                                          ldlm_num_threads));
3296         }
3297
3298         for (i = 0; i < blp->blp_min_threads; i++) {
3299                 rc = ldlm_bl_thread_start(blp, false);
3300                 if (rc < 0)
3301                         GOTO(out, rc);
3302         }
3303
3304 #ifdef HAVE_SERVER_SUPPORT
3305         task = kthread_run(expired_lock_main, NULL, "ldlm_elt");
3306         if (IS_ERR(task)) {
3307                 rc = PTR_ERR(task);
3308                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
3309                 GOTO(out, rc);
3310         }
3311
3312         wait_event(expired_lock_wait_queue,
3313                    expired_lock_thread_state == ELT_READY);
3314 #endif /* HAVE_SERVER_SUPPORT */
3315
3316         rc = ldlm_pools_init();
3317         if (rc) {
3318                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
3319                 GOTO(out, rc);
3320         }
3321
3322         rc = ldlm_reclaim_setup();
3323         if (rc) {
3324                 CERROR("Failed to setup reclaim thread: rc = %d\n", rc);
3325                 GOTO(out, rc);
3326         }
3327         RETURN(0);
3328
3329  out:
3330         ldlm_cleanup();
3331         RETURN(rc);
3332 }
3333
3334 static int ldlm_cleanup(void)
3335 {
3336         ENTRY;
3337
3338         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
3339             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
3340                 CERROR("ldlm still has namespaces; clean these up first.\n");
3341                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
3342                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
3343                 RETURN(-EBUSY);
3344         }
3345
3346         ldlm_reclaim_cleanup();
3347         ldlm_pools_fini();
3348
3349         if (ldlm_state->ldlm_bl_pool != NULL) {
3350                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
3351
3352                 while (atomic_read(&blp->blp_num_threads) > 0) {
3353                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
3354
3355                         init_completion(&blp->blp_comp);
3356
3357                         spin_lock(&blp->blp_lock);
3358                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
3359                         wake_up(&blp->blp_waitq);
3360                         spin_unlock(&blp->blp_lock);
3361
3362                         wait_for_completion(&blp->blp_comp);
3363                 }
3364
3365                 OBD_FREE(blp, sizeof(*blp));
3366         }
3367
3368         if (ldlm_state->ldlm_cb_service != NULL)
3369                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
3370 #ifdef HAVE_SERVER_SUPPORT
3371         if (ldlm_state->ldlm_cancel_service != NULL)
3372                 ptlrpc_unregister_service(ldlm_state->ldlm_cancel_service);
3373 #endif
3374
3375         if (ldlm_ns_kset)
3376                 kset_unregister(ldlm_ns_kset);
3377         if (ldlm_svc_kset)
3378                 kset_unregister(ldlm_svc_kset);
3379         if (ldlm_kobj) {
3380                 sysfs_remove_group(ldlm_kobj, &ldlm_attr_group);
3381                 kobject_put(ldlm_kobj);
3382         }
3383
3384         ldlm_debugfs_cleanup();
3385
3386 #ifdef HAVE_SERVER_SUPPORT
3387         if (expired_lock_thread_state != ELT_STOPPED) {
3388                 expired_lock_thread_state = ELT_TERMINATE;
3389                 wake_up(&expired_lock_wait_queue);
3390                 wait_event(expired_lock_wait_queue,
3391                            expired_lock_thread_state == ELT_STOPPED);
3392         }
3393 #endif
3394
3395         OBD_FREE(ldlm_state, sizeof(*ldlm_state));
3396         ldlm_state = NULL;
3397
3398         RETURN(0);
3399 }
3400
3401 int ldlm_init(void)
3402 {
3403         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
3404                                                sizeof(struct ldlm_resource), 0,
3405                                                SLAB_HWCACHE_ALIGN, NULL);
3406         if (ldlm_resource_slab == NULL)
3407                 return -ENOMEM;
3408
3409         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
3410                               sizeof(struct ldlm_lock), 0,
3411                               SLAB_HWCACHE_ALIGN, NULL);
3412         if (ldlm_lock_slab == NULL)
3413                 goto out_resource;
3414
3415         ldlm_interval_slab = kmem_cache_create("interval_node",
3416                                         sizeof(struct ldlm_interval),
3417                                         0, SLAB_HWCACHE_ALIGN, NULL);
3418         if (ldlm_interval_slab == NULL)
3419                 goto out_lock;
3420
3421         ldlm_interval_tree_slab = kmem_cache_create("interval_tree",
3422                         sizeof(struct ldlm_interval_tree) * LCK_MODE_NUM,
3423                         0, SLAB_HWCACHE_ALIGN, NULL);
3424         if (ldlm_interval_tree_slab == NULL)
3425                 goto out_interval;
3426
3427 #ifdef HAVE_SERVER_SUPPORT
3428         ldlm_inodebits_slab = kmem_cache_create("ldlm_ibits_node",
3429                                                 sizeof(struct ldlm_ibits_node),
3430                                                 0, SLAB_HWCACHE_ALIGN, NULL);
3431         if (ldlm_inodebits_slab == NULL)
3432                 goto out_interval_tree;
3433
3434         ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
3435                                         sizeof(struct ldlm_glimpse_work),
3436                                         0, 0, NULL);
3437         if (ldlm_glimpse_work_kmem == NULL)
3438                 goto out_inodebits;
3439 #endif
3440
3441 #if LUSTRE_TRACKS_LOCK_EXP_REFS
3442         class_export_dump_hook = ldlm_dump_export_locks;
3443 #endif
3444         return 0;
3445 #ifdef HAVE_SERVER_SUPPORT
3446 out_inodebits:
3447         kmem_cache_destroy(ldlm_inodebits_slab);
3448 out_interval_tree:
3449         kmem_cache_destroy(ldlm_interval_tree_slab);
3450 #endif
3451 out_interval:
3452         kmem_cache_destroy(ldlm_interval_slab);
3453 out_lock:
3454         kmem_cache_destroy(ldlm_lock_slab);
3455 out_resource:
3456         kmem_cache_destroy(ldlm_resource_slab);
3457
3458         return -ENOMEM;
3459 }
3460
3461 void ldlm_exit(void)
3462 {
3463         if (ldlm_refcount)
3464                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
3465         synchronize_rcu();
3466         kmem_cache_destroy(ldlm_resource_slab);
3467         /*
3468          * ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
3469          * rcu_barrier() to wait all outstanding RCU callbacks to complete,
3470          * so that ldlm_lock_free() get a chance to be called.
3471          */
3472         rcu_barrier();
3473         kmem_cache_destroy(ldlm_lock_slab);
3474         kmem_cache_destroy(ldlm_interval_slab);
3475         kmem_cache_destroy(ldlm_interval_tree_slab);
3476 #ifdef HAVE_SERVER_SUPPORT
3477         kmem_cache_destroy(ldlm_inodebits_slab);
3478         kmem_cache_destroy(ldlm_glimpse_work_kmem);
3479 #endif
3480 }