[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32 /**
33  * This file contains Asynchronous System Trap (AST) handlers and related
34  * LDLM request-processing routines.
35  *
36  * An AST is a callback issued on a lock when its state is changed. There are
37  * several different types of ASTs (callbacks) registered for each lock:
38  *
39  * - completion AST: when a lock is enqueued by some process, but cannot be
40  *   granted immediately due to other conflicting locks on the same resource,
41  *   the completion AST is sent to notify the caller when the lock is
42  *   eventually granted
43  *
44  * - blocking AST: when a lock is granted to some process, if another process
45  *   enqueues a conflicting (blocking) lock on a resource, a blocking AST is
46  *   sent to notify the holder(s) of the lock(s) of the conflicting lock
47  *   request. The lock holder(s) must release their lock(s) on that resource in
48  *   a timely manner or be evicted by the server.
49  *
50  * - glimpse AST: this is used when a process wants information about a lock
51  *   (i.e. the lock value block (LVB)) but does not necessarily require holding
52  *   the lock. If the resource is locked, the lock holder(s) are sent glimpse
53  *   ASTs and the LVB is returned to the caller, and lock holder(s) may CANCEL
54  *   their lock(s) if they are idle. If the resource is not locked, the server
55  *   may grant the lock.
56  */
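
/*
 * Illustrative sketch (not part of the original source): the three AST
 * callbacks described above are wired into a lock through a
 * struct ldlm_callback_suite at lock-creation time, e.g.:
 *
 *      const struct ldlm_callback_suite cbs = {
 *              .lcs_completion = ldlm_completion_ast,
 *              .lcs_blocking   = ldlm_blocking_ast,
 *              .lcs_glimpse    = ldlm_glimpse_ast,
 *      };
 *
 * ldlm_lock_create() stores these pointers in the lock so that the
 * matching callback can be invoked when the corresponding event occurs.
 */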
57
58 #define DEBUG_SUBSYSTEM S_LDLM
59
60 #include <lustre_errno.h>
61 #include <lustre_dlm.h>
62 #include <obd_class.h>
63 #include <obd.h>
64
65 #include "ldlm_internal.h"
66
67 unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
68 module_param(ldlm_enqueue_min, uint, 0644);
69 MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
70
71 /* On the client side: whether the cached locks will be canceled before replay */
72 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
73
74 static void interrupted_completion_wait(void *data)
75 {
76 }
77
78 struct lock_wait_data {
79         struct ldlm_lock *lwd_lock;
80         __u32             lwd_conn_cnt;
81 };
82
83 struct ldlm_async_args {
84         struct lustre_handle lock_handle;
85 };
86
87 /**
88  * ldlm_request_bufsize
89  *
90  * If opcode=LDLM_ENQUEUE, 1 slot is already occupied and
91  * LDLM_LOCKREQ_HANDLES - 1 slots are available.
92  * Otherwise, LDLM_LOCKREQ_HANDLES slots are available.
93  *
94  * \param[in] count     number of lock handles to be packed
95  * \param[in] type      LDLM RPC opcode (e.g. LDLM_ENQUEUE)
96  *
97  * \retval size of the request buffer
98  */
99 int ldlm_request_bufsize(int count, int type)
100 {
101         int avail = LDLM_LOCKREQ_HANDLES;
102
103         if (type == LDLM_ENQUEUE)
104                 avail -= LDLM_ENQUEUE_CANCEL_OFF;
105
106         if (count > avail)
107                 avail = (count - avail) * sizeof(struct lustre_handle);
108         else
109                 avail = 0;
110
111         return sizeof(struct ldlm_request) + avail;
112 }
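
/*
 * Worked example (constant values are illustrative; they are defined
 * elsewhere): assuming LDLM_LOCKREQ_HANDLES == 2 and
 * LDLM_ENQUEUE_CANCEL_OFF == 1, a call
 *
 *      ldlm_request_bufsize(5, LDLM_ENQUEUE)
 *
 * starts with avail = 2, drops to 1 for the ENQUEUE slot, and since
 * count (5) > avail (1) it returns
 *
 *      sizeof(struct ldlm_request) + 4 * sizeof(struct lustre_handle)
 *
 * i.e. room for the 4 handles that do not fit into the fixed part of
 * struct ldlm_request.
 */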
113
114 int ldlm_expired_completion_wait(void *data)
115 {
116         struct lock_wait_data *lwd = data;
117         struct ldlm_lock *lock = lwd->lwd_lock;
118         struct obd_import *imp;
119         struct obd_device *obd;
120
121         ENTRY;
122         if (lock->l_conn_export == NULL) {
123                 static time64_t next_dump, last_dump;
124
125                 LDLM_ERROR(lock,
126                            "lock timed out (enqueued at %lld, %llds ago); not entering recovery in server code, just going back to sleep",
127                            (s64)lock->l_activity,
128                            (s64)(ktime_get_real_seconds() -
129                                  lock->l_activity));
130                 if (ktime_get_seconds() > next_dump) {
131                         last_dump = next_dump;
132                         next_dump = ktime_get_seconds() + 300;
133                         ldlm_namespace_dump(D_DLMTRACE,
134                                             ldlm_lock_to_ns(lock));
135                         if (last_dump == 0)
136                                 libcfs_debug_dumplog();
137                 }
138                 RETURN(0);
139         }
140
141         obd = lock->l_conn_export->exp_obd;
142         imp = obd->u.cli.cl_import;
143         ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
144         LDLM_ERROR(lock,
145                    "lock timed out (enqueued at %lld, %llds ago), entering recovery for %s@%s",
146                    (s64)lock->l_activity,
147                    (s64)(ktime_get_real_seconds() - lock->l_activity),
148                    obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
149
150         RETURN(0);
151 }
152
153 /**
154  * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
155  * lock cancel, and their replies). Used for lock completion timeout on the
156  * client side.
157  *
158  * \param[in] lock        lock which is waiting the completion callback
159  *
160  * \retval            timeout in seconds to wait for the server reply
161  */
162 /*
163  * We use the same basis for both server side and client side functions
164  * from a single node.
165  */
166 static time64_t ldlm_cp_timeout(struct ldlm_lock *lock)
167 {
168         time64_t timeout;
169
170         if (AT_OFF)
171                 return obd_timeout;
172
173         /*
174          * Wait a long time for enqueue - server may have to callback a
175          * lock from another client.  Server will evict the other client if it
176          * doesn't respond reasonably, and then give us the lock.
177          */
178         timeout = at_get(ldlm_lock_to_ns_at(lock));
179         return max(3 * timeout, (time64_t) ldlm_enqueue_min);
180 }
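
/*
 * Example (illustrative numbers): with adaptive timeouts enabled, if
 * at_get() currently reports 10 seconds for this namespace and
 * ldlm_enqueue_min is at its default OBD_TIMEOUT_DEFAULT (assumed here
 * to be 100 seconds), the completion wait is max(3 * 10, 100) = 100
 * seconds; a longer AT estimate of 50 seconds would instead give 150.
 */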
181
182 /**
183  * Helper function for ldlm_completion_ast(), updating timings when lock is
184  * actually granted.
185  */
186 static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
187 {
188         time64_t delay;
189         int result = 0;
190
191         if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
192                 LDLM_DEBUG(lock, "client-side enqueue: destroyed");
193                 result = -EIO;
194         } else if (data == NULL) {
195                 LDLM_DEBUG(lock, "client-side enqueue: granted");
196         } else {
197                 /* Only take the CP RPC into AT stats, not immediately granted locks */
198                 delay = ktime_get_real_seconds() - lock->l_activity;
199                 LDLM_DEBUG(lock, "client-side enqueue: granted after %llds",
200                            (s64)delay);
201
202                 /* Update our time estimate */
203                 at_measured(ldlm_lock_to_ns_at(lock), delay);
204         }
205         return result;
206 }
207
208 /**
209  * Implementation of ->l_completion_ast() for a client that doesn't wait
210  * until the lock is granted. Suitable for locks enqueued through ptlrpcd or
211  * other threads that cannot block for long.
212  */
213 int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
214 {
215         ENTRY;
216
217         if (flags == LDLM_FL_WAIT_NOREPROC) {
218                 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
219                 RETURN(0);
220         }
221
222         if (!(flags & LDLM_FL_BLOCKED_MASK)) {
223                 wake_up(&lock->l_waitq);
224                 RETURN(ldlm_completion_tail(lock, data));
225         }
226
227         LDLM_DEBUG(lock,
228                    "client-side enqueue returned a blocked lock, going forward");
229         ldlm_reprocess_all(lock->l_resource);
230         RETURN(0);
231 }
232 EXPORT_SYMBOL(ldlm_completion_ast_async);
233
234 /**
235  * Generic LDLM "completion" AST. This is called in several cases:
236  *
237  *     - when a reply to an ENQUEUE RPC is received from the server
238  *       (ldlm_cli_enqueue_fini()). Lock might be granted or not granted at
239  *       this point (determined by flags);
240  *
241  *     - when LDLM_CP_CALLBACK RPC comes to client to notify it that lock has
242  *       been granted;
243  *
244  *     - when ldlm_lock_match(LDLM_FL_LVB_READY) is about to wait until lock
245  *       gets correct lvb;
246  *
247  *     - to force all locks when resource is destroyed (cleanup_resource());
248  *
249  * If the lock is not granted in the first case, this function waits until the
250  * second or third case happens in some other thread.
251  *
252  */
253 int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
254 {
255         /* XXX ALLOCATE - 160 bytes */
256         struct lock_wait_data lwd;
257         struct obd_device *obd;
258         struct obd_import *imp = NULL;
259         struct l_wait_info lwi;
260         time64_t timeout;
261         int rc = 0;
262
263         ENTRY;
264
265         if (flags == LDLM_FL_WAIT_NOREPROC) {
266                 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
267                 goto noreproc;
268         }
269
270         if (!(flags & LDLM_FL_BLOCKED_MASK)) {
271                 wake_up(&lock->l_waitq);
272                 RETURN(0);
273         }
274
275         LDLM_DEBUG(lock, "client-side enqueue returned a blocked locksleeping");
276
277 noreproc:
278
279         obd = class_exp2obd(lock->l_conn_export);
280
281         /* if this is a local lock, then there is no import */
282         if (obd != NULL)
283                 imp = obd->u.cli.cl_import;
284
285         timeout = ldlm_cp_timeout(lock);
286
287         lwd.lwd_lock = lock;
288         lock->l_activity = ktime_get_real_seconds();
289
290         if (ldlm_is_no_timeout(lock)) {
291                 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
292                 lwi = LWI_INTR(interrupted_completion_wait, &lwd);
293         } else {
294                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
295                                        ldlm_expired_completion_wait,
296                                        interrupted_completion_wait, &lwd);
297         }
298
299         if (imp != NULL) {
300                 spin_lock(&imp->imp_lock);
301                 lwd.lwd_conn_cnt = imp->imp_conn_cnt;
302                 spin_unlock(&imp->imp_lock);
303         }
304
305         if (ns_is_client(ldlm_lock_to_ns(lock)) &&
306             OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
307                                  OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
308                 ldlm_set_fail_loc(lock);
309                 rc = -EINTR;
310         } else {
311                 /* Go to sleep until the lock is granted or cancelled. */
312                 rc = l_wait_event(lock->l_waitq,
313                                   is_granted_or_cancelled(lock), &lwi);
314         }
315
316         if (rc) {
317                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
318                            rc);
319                 RETURN(rc);
320         }
321
322         RETURN(ldlm_completion_tail(lock, data));
323 }
324 EXPORT_SYMBOL(ldlm_completion_ast);
325
326 /**
327  * A helper to build a blocking AST function
328  *
329  * Perform a common operation for blocking ASTs:
330  * deferred lock cancellation.
331  *
332  * \param lock the lock the blocking or canceling AST was called on
333  * \retval 0
334  * \see mdt_blocking_ast
335  * \see ldlm_blocking_ast
336  */
337 int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
338 {
339         int do_ast;
340
341         ENTRY;
342
343         ldlm_set_cbpending(lock);
344         do_ast = (!lock->l_readers && !lock->l_writers);
345         unlock_res_and_lock(lock);
346
347         if (do_ast) {
348                 struct lustre_handle lockh;
349                 int rc;
350
351                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
352                 ldlm_lock2handle(lock, &lockh);
353                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
354                 if (rc < 0)
355                         CERROR("ldlm_cli_cancel: %d\n", rc);
356         } else {
357                 LDLM_DEBUG(lock,
358                            "Lock still has references, will be cancelled later");
359         }
360         RETURN(0);
361 }
362 EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
363
364 /**
365  * Server blocking AST
366  *
367  * ->l_blocking_ast() callback for LDLM locks acquired by server-side
368  * OBDs.
369  *
370  * \param lock the lock which blocks a request or a cancelling lock
371  * \param desc unused
372  * \param data unused
373  * \param flag indicates whether this is a cancelling or blocking callback
374  * \retval 0
375  * \see ldlm_blocking_ast_nocheck
376  */
377 int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
378                       void *data, int flag)
379 {
380         ENTRY;
381
382         if (flag == LDLM_CB_CANCELING) {
383                 /* Don't need to do anything here. */
384                 RETURN(0);
385         }
386
387         lock_res_and_lock(lock);
388         /*
389          * Get this: if ldlm_blocking_ast is racing with intent_policy, such
390          * that ldlm_blocking_ast is called just before intent_policy method
391          * takes the lr_lock, then by the time we get the lock, we might not
392          * be the correct blocking function anymore.  So check, and return
393          * early, if so.
394          */
395         if (lock->l_blocking_ast != ldlm_blocking_ast) {
396                 unlock_res_and_lock(lock);
397                 RETURN(0);
398         }
399         RETURN(ldlm_blocking_ast_nocheck(lock));
400 }
401 EXPORT_SYMBOL(ldlm_blocking_ast);
402
403 /**
404  * Implements ldlm_lock::l_glimpse_ast for extent locks acquired on the server.
405  *
406  * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for that is
407  * rather subtle: with OST-side locking, it may so happen that _all_ extent
408  * locks are held by the OST. If client wants to obtain the current file size
409  * it calls ll_glimpse_size(), and (as all locks are held only on the server),
410  * this dummy glimpse callback fires and does nothing. The client still
411  * receives the correct file size due to the following fragment of code in
412  * ldlm_cb_interpret():
413  *
414  *      if (rc == -ELDLM_NO_LOCK_DATA) {
415  *              LDLM_DEBUG(lock, "lost race - client has a lock but no"
416  *                         "inode");
417  *              ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
418  *      }
419  *
420  * That is, after the glimpse returns this error, ofd_lvbo_update() is called
421  * and returns the updated file attributes from the inode to the client.
422  *
423  * See also comment in ofd_intent_policy() on why servers must set a non-NULL
424  * l_glimpse_ast when grabbing DLM locks.  Otherwise, the server will assume
425  * that the object is in the process of being destroyed.
426  *
427  * \param[in] lock      DLM lock being glimpsed, unused
428  * \param[in] reqp      pointer to ptlrpc_request, unused
429  *
430  * \retval              -ELDLM_NO_LOCK_DATA to get attributes from disk object
431  */
432 int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
433 {
434         return -ELDLM_NO_LOCK_DATA;
435 }
436
437 /**
438  * Enqueue a local lock (typically on a server).
439  */
440 int ldlm_cli_enqueue_local(const struct lu_env *env,
441                            struct ldlm_namespace *ns,
442                            const struct ldlm_res_id *res_id,
443                            enum ldlm_type type, union ldlm_policy_data *policy,
444                            enum ldlm_mode mode, __u64 *flags,
445                            ldlm_blocking_callback blocking,
446                            ldlm_completion_callback completion,
447                            ldlm_glimpse_callback glimpse,
448                            void *data, __u32 lvb_len, enum lvb_type lvb_type,
449                            const __u64 *client_cookie,
450                            struct lustre_handle *lockh)
451 {
452         struct ldlm_lock *lock;
453         int err;
454         const struct ldlm_callback_suite cbs = { .lcs_completion = completion,
455                                                  .lcs_blocking   = blocking,
456                                                  .lcs_glimpse    = glimpse,
457         };
458
459         ENTRY;
460
461         LASSERT(!(*flags & LDLM_FL_REPLAY));
462         if (unlikely(ns_is_client(ns))) {
463                 CERROR("Trying to enqueue local lock in a shadow namespace\n");
464                 LBUG();
465         }
466
467         lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len,
468                                 lvb_type);
469         if (IS_ERR(lock))
470                 GOTO(out_nolock, err = PTR_ERR(lock));
471
472         err = ldlm_lvbo_init(lock->l_resource);
473         if (err < 0) {
474                 LDLM_ERROR(lock, "delayed lvb init failed (rc %d)", err);
475                 ldlm_lock_destroy_nolock(lock);
476                 GOTO(out, err);
477         }
478
479         ldlm_lock2handle(lock, lockh);
480
481         /*
482          * NB: we don't have any lock now (lock_res_and_lock)
483          * because it's a new lock
484          */
485         ldlm_lock_addref_internal_nolock(lock, mode);
486         ldlm_set_local(lock);
487         if (*flags & LDLM_FL_ATOMIC_CB)
488                 ldlm_set_atomic_cb(lock);
489
490         if (*flags & LDLM_FL_CANCEL_ON_BLOCK)
491                 ldlm_set_cancel_on_block(lock);
492
493         if (policy != NULL)
494                 lock->l_policy_data = *policy;
495         if (client_cookie != NULL)
496                 lock->l_client_cookie = *client_cookie;
497         if (type == LDLM_EXTENT) {
498                 /* extent lock without policy is a bug */
499                 if (policy == NULL)
500                         LBUG();
501
502                 lock->l_req_extent = policy->l_extent;
503         }
504
505         err = ldlm_lock_enqueue(env, ns, &lock, policy, flags);
506         if (unlikely(err != ELDLM_OK))
507                 GOTO(out, err);
508
509         if (policy != NULL)
510                 *policy = lock->l_policy_data;
511
512         if (lock->l_completion_ast)
513                 lock->l_completion_ast(lock, *flags, NULL);
514
515         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
516         EXIT;
517  out:
518         LDLM_LOCK_RELEASE(lock);
519  out_nolock:
520         return err;
521 }
522 EXPORT_SYMBOL(ldlm_cli_enqueue_local);
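
/*
 * Illustrative (hypothetical) server-side caller sketch, assuming an
 * environment @env, a namespace @ns and a resource id @res_id already
 * exist; the mode and flags below are examples only:
 *
 *      struct lustre_handle lockh;
 *      __u64 flags = LDLM_FL_ATOMIC_CB;
 *      int rc;
 *
 *      rc = ldlm_cli_enqueue_local(env, ns, &res_id, LDLM_PLAIN, NULL,
 *                                  LCK_EX, &flags, ldlm_blocking_ast,
 *                                  ldlm_completion_ast, NULL, NULL, 0,
 *                                  LVB_T_NONE, NULL, &lockh);
 *
 * On success the granted lock is referenced through @lockh and must
 * eventually be dropped (decref/cancel) by the caller.
 */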
523
524 static void failed_lock_cleanup(struct ldlm_namespace *ns,
525                                 struct ldlm_lock *lock, int mode)
526 {
527         int need_cancel = 0;
528
529         /* Set a flag to prevent us from sending a CANCEL (b=407) */
530         lock_res_and_lock(lock);
531         /* Check that lock is not granted or failed, we might race. */
532         if (!ldlm_is_granted(lock) && !ldlm_is_failed(lock)) {
533                 /*
534                  * Make sure that this lock will not be found by raced
535                  * bl_ast and -EINVAL reply is sent to server anyways.
536                  * b=17645
537                  */
538                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
539                                  LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
540                 need_cancel = 1;
541         }
542         unlock_res_and_lock(lock);
543
544         if (need_cancel)
545                 LDLM_DEBUG(lock,
546                            "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING");
547         else
548                 LDLM_DEBUG(lock, "lock was granted or failed in race");
549
550         /*
551          * XXX - HACK because we shouldn't call ldlm_lock_destroy()
552          *       from llite/file.c/ll_file_flock().
553          */
554         /*
555          * This code makes up for the fact that we do not have a blocking handler
556          * on a client for flock locks. As such, this is the place where we must
557          * completely kill failed locks (interrupted and those that were waiting
558          * to be granted when the server evicted us).
559          */
560         if (lock->l_resource->lr_type == LDLM_FLOCK) {
561                 lock_res_and_lock(lock);
562                 if (!ldlm_is_destroyed(lock)) {
563                         ldlm_resource_unlink_lock(lock);
564                         ldlm_lock_decref_internal_nolock(lock, mode);
565                         ldlm_lock_destroy_nolock(lock);
566                 }
567                 unlock_res_and_lock(lock);
568         } else {
569                 ldlm_lock_decref_internal(lock, mode);
570         }
571 }
572
573 /**
574  * Finishing portion of client lock enqueue code.
575  *
576  * Called after receiving reply from server.
577  */
578 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
579                           enum ldlm_type type, __u8 with_policy,
580                           enum ldlm_mode mode, __u64 *flags, void *lvb,
581                           __u32 lvb_len, const struct lustre_handle *lockh,
582                           int rc)
583 {
584         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
585         const struct lu_env *env = NULL;
586         int is_replay = *flags & LDLM_FL_REPLAY;
587         struct ldlm_lock *lock;
588         struct ldlm_reply *reply;
589         int cleanup_phase = 1;
590
591         ENTRY;
592
593         if (req && req->rq_svc_thread)
594                 env = req->rq_svc_thread->t_env;
595
596         lock = ldlm_handle2lock(lockh);
597         /* ldlm_cli_enqueue is holding a reference on this lock. */
598         if (!lock) {
599                 LASSERT(type == LDLM_FLOCK);
600                 RETURN(-ENOLCK);
601         }
602
603         LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len),
604                  "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len);
605
606         if (rc != ELDLM_OK) {
607                 LASSERT(!is_replay);
608                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
609                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
610
611                 if (rc != ELDLM_LOCK_ABORTED)
612                         GOTO(cleanup, rc);
613         }
614
615         /* Before we return, swab the reply */
616         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
617         if (reply == NULL)
618                 GOTO(cleanup, rc = -EPROTO);
619
620         if (lvb_len > 0) {
621                 int size = 0;
622
623                 size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB,
624                                             RCL_SERVER);
625                 if (size < 0) {
626                         LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size);
627                         GOTO(cleanup, rc = size);
628                 } else if (unlikely(size > lvb_len)) {
629                         LDLM_ERROR(lock,
630                                    "Replied LVB is larger than expectation, expected = %d, replied = %d",
631                                    lvb_len, size);
632                         GOTO(cleanup, rc = -EINVAL);
633                 }
634                 lvb_len = size;
635         }
636
637         if (rc == ELDLM_LOCK_ABORTED) {
638                 if (lvb_len > 0 && lvb != NULL)
639                         rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
640                                            lvb, lvb_len);
641                 GOTO(cleanup, rc = rc ? : ELDLM_LOCK_ABORTED);
642         }
643
644         /* lock enqueued on the server */
645         cleanup_phase = 0;
646
647         lock_res_and_lock(lock);
648         /* Key change: rehash lock in per-export hash with new key */
649         if (exp->exp_lock_hash) {
650                 /*
651                  * In the function below, .hs_keycmp resolves to
652                  * ldlm_export_lock_keycmp()
653                  */
654                 /* coverity[overrun-buffer-val] */
655                 cfs_hash_rehash_key(exp->exp_lock_hash,
656                                     &lock->l_remote_handle,
657                                     &reply->lock_handle,
658                                     &lock->l_exp_hash);
659         } else {
660                 lock->l_remote_handle = reply->lock_handle;
661         }
662
663         *flags = ldlm_flags_from_wire(reply->lock_flags);
664         lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
665                                               LDLM_FL_INHERIT_MASK);
666         unlock_res_and_lock(lock);
667
668         CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
669                lock, reply->lock_handle.cookie, *flags);
670
671         /*
672          * If enqueue returned a blocked lock but the completion handler has
673          * already run, then it fixed up the resource and we don't need to do it
674          * again.
675          */
676         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
677                 int newmode = reply->lock_desc.l_req_mode;
678
679                 LASSERT(!is_replay);
680                 if (newmode && newmode != lock->l_req_mode) {
681                         LDLM_DEBUG(lock, "server returned different mode %s",
682                                    ldlm_lockname[newmode]);
683                         lock->l_req_mode = newmode;
684                 }
685
686                 if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name,
687                                  &lock->l_resource->lr_name)) {
688                         CDEBUG(D_INFO,
689                                "remote intent success, locking "DLDLMRES", instead of "DLDLMRES"\n",
690                                PLDLMRES(&reply->lock_desc.l_resource),
691                                PLDLMRES(lock->l_resource));
692
693                         rc = ldlm_lock_change_resource(ns, lock,
694                                         &reply->lock_desc.l_resource.lr_name);
695                         if (rc || lock->l_resource == NULL)
696                                 GOTO(cleanup, rc = -ENOMEM);
697                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
698                 }
699
700                 if (with_policy) {
701                         /* We assume lock type cannot change on server */
702                         ldlm_convert_policy_to_local(exp,
703                                                 lock->l_resource->lr_type,
704                                                 &reply->lock_desc.l_policy_data,
705                                                 &lock->l_policy_data);
706                 }
707
708                 if (type != LDLM_PLAIN)
709                         LDLM_DEBUG(lock,
710                                    "client-side enqueue, new policy data");
711         }
712
713         if ((*flags) & LDLM_FL_AST_SENT) {
714                 lock_res_and_lock(lock);
715                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
716                 unlock_res_and_lock(lock);
717                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
718         }
719
720         /*
721          * If the lock has already been granted by a completion AST, don't
722          * clobber the LVB with an older one.
723          */
724         if (lvb_len > 0) {
725                 /*
726                  * We must lock or a racing completion might update lvb without
727                  * letting us know and we'll clobber the correct value.
728                  * Cannot unlock after the check either, as that still leaves
729                  * a tiny window for the completion to get in.
730                  */
731                 lock_res_and_lock(lock);
732                 if (!ldlm_is_granted(lock))
733                         rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER,
734                                            lock->l_lvb_data, lvb_len);
735                 unlock_res_and_lock(lock);
736                 if (rc < 0) {
737                         cleanup_phase = 1;
738                         GOTO(cleanup, rc);
739                 }
740         }
741
742         if (!is_replay) {
743                 rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
744                 if (lock->l_completion_ast != NULL) {
745                         int err = lock->l_completion_ast(lock, *flags, NULL);
746
747                         if (!rc)
748                                 rc = err;
749                         if (rc)
750                                 cleanup_phase = 1;
751                 }
752         }
753
754         if (lvb_len > 0 && lvb != NULL) {
755                 /*
756                  * Copy the LVB here, and not earlier, because the completion
757                  * AST (if any) can override what we got in the reply
758                  */
759                 memcpy(lvb, lock->l_lvb_data, lvb_len);
760         }
761
762         LDLM_DEBUG(lock, "client-side enqueue END");
763         EXIT;
764 cleanup:
765         if (cleanup_phase == 1 && rc)
766                 failed_lock_cleanup(ns, lock, mode);
767         /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
768         LDLM_LOCK_PUT(lock);
769         LDLM_LOCK_RELEASE(lock);
770         return rc;
771 }
772 EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
773
774 /**
775  * Estimate number of lock handles that would fit into request of given
776  * size.  PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
777  * a single page on the send/receive side. XXX: 512 should be changed to a
778  * more adequate value.
779  */
780 static inline int ldlm_req_handles_avail(int req_size, int off)
781 {
782         int avail;
783
784         avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size;
785         if (likely(avail >= 0))
786                 avail /= (int)sizeof(struct lustre_handle);
787         else
788                 avail = 0;
789         avail += LDLM_LOCKREQ_HANDLES - off;
790
791         return avail;
792 }
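
/*
 * Example (illustrative sizes, assuming PAGE_SIZE - 512 is the smaller
 * of the two limits): with a 4096-byte page and a req_size of 1024
 * bytes, (4096 - 512 - 1024) / sizeof(struct lustre_handle) additional
 * handles fit in the buffer, on top of the LDLM_LOCKREQ_HANDLES - off
 * slots already present in the fixed part of the request.
 */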
793
794 static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
795                                              enum req_location loc,
796                                              int off)
797 {
798         __u32 size = req_capsule_msg_size(pill, loc);
799
800         return ldlm_req_handles_avail(size, off);
801 }
802
803 static inline int ldlm_format_handles_avail(struct obd_import *imp,
804                                             const struct req_format *fmt,
805                                             enum req_location loc, int off)
806 {
807         __u32 size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
808
809         return ldlm_req_handles_avail(size, off);
810 }
811
812 /**
813  * Cancel LRU locks and pack them into the enqueue request. Also pack the
814  * given \a count locks from \a cancels.
815  *
816  * This is to be called by functions preparing their own requests that
817  * might contain lists of locks to cancel in addition to the actual operation
818  * that needs to be performed.
819  */
820 int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
821                       int version, int opc, int canceloff,
822                       struct list_head *cancels, int count)
823 {
824         struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
825         struct req_capsule      *pill = &req->rq_pill;
826         struct ldlm_request     *dlm = NULL;
827         struct list_head        head = LIST_HEAD_INIT(head);
828         enum ldlm_lru_flags lru_flags;
829         int avail, to_free, pack = 0;
830         int rc;
831
832         ENTRY;
833
834         if (cancels == NULL)
835                 cancels = &head;
836         if (ns_connect_cancelset(ns)) {
837                 /* Estimate the amount of available space in the request. */
838                 req_capsule_filled_sizes(pill, RCL_CLIENT);
839                 avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
840
841                 lru_flags = LDLM_LRU_FLAG_NO_WAIT | (ns_connect_lru_resize(ns) ?
842                         LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED);
843                 to_free = !ns_connect_lru_resize(ns) &&
844                         opc == LDLM_ENQUEUE ? 1 : 0;
845
846                 /*
847                  * Cancel LRU locks here _only_ if the server supports
848                  * EARLY_CANCEL. Otherwise we have to send an extra CANCEL
849                  * RPC, which will make us slower.
850                  */
851                 if (avail > count)
852                         count += ldlm_cancel_lru_local(ns, cancels, to_free,
853                                                        avail - count, 0,
854                                                        lru_flags);
855                 if (avail > count)
856                         pack = count;
857                 else
858                         pack = avail;
859                 req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
860                                      ldlm_request_bufsize(pack, opc));
861         }
862
863         rc = ptlrpc_request_pack(req, version, opc);
864         if (rc) {
865                 ldlm_lock_list_put(cancels, l_bl_ast, count);
866                 RETURN(rc);
867         }
868
869         if (ns_connect_cancelset(ns)) {
870                 if (canceloff) {
871                         dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
872                         LASSERT(dlm);
873                         /*
874                          * Skip the first lock handle in ldlm_request_pack();
875                          * this method will increment @lock_count according
876                          * to the number of lock handles actually written to
877                          * the buffer.
878                          */
879                         dlm->lock_count = canceloff;
880                 }
881                 /* Pack @pack lock handles into the request. */
882                 ldlm_cli_cancel_list(cancels, pack, req, 0);
883                 /* Prepare and send separate cancel RPC for others. */
884                 ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
885         } else {
886                 ldlm_lock_list_put(cancels, l_bl_ast, count);
887         }
888         RETURN(0);
889 }
890 EXPORT_SYMBOL(ldlm_prep_elc_req);
891
892 int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
893                           struct list_head *cancels, int count)
894 {
895         return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
896                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
897 }
898 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
899
900 struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
901 {
902         struct ptlrpc_request *req;
903         int rc;
904
905         ENTRY;
906
907         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
908         if (req == NULL)
909                 RETURN(ERR_PTR(-ENOMEM));
910
911         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
912         if (rc) {
913                 ptlrpc_request_free(req);
914                 RETURN(ERR_PTR(rc));
915         }
916
917         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
918         ptlrpc_request_set_replen(req);
919         RETURN(req);
920 }
921 EXPORT_SYMBOL(ldlm_enqueue_pack);
922
923 /**
924  * Client-side lock enqueue.
925  *
926  * If a request has some specific initialisation it is passed in \a reqp,
927  * otherwise it is created in ldlm_cli_enqueue.
928  *
929  * Supports sync and async requests; pass the \a async flag accordingly. If a
930  * request was created in ldlm_cli_enqueue and it is an async request, it is
931  * passed back to the caller in \a reqp.
932  */
933 int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
934                      struct ldlm_enqueue_info *einfo,
935                      const struct ldlm_res_id *res_id,
936                      union ldlm_policy_data const *policy, __u64 *flags,
937                      void *lvb, __u32 lvb_len, enum lvb_type lvb_type,
938                      struct lustre_handle *lockh, int async)
939 {
940         struct ldlm_namespace *ns;
941         struct ldlm_lock      *lock;
942         struct ldlm_request   *body;
943         int                    is_replay = *flags & LDLM_FL_REPLAY;
944         int                    req_passed_in = 1;
945         int                    rc, err;
946         struct ptlrpc_request *req;
947
948         ENTRY;
949
950         LASSERT(exp != NULL);
951
952         ns = exp->exp_obd->obd_namespace;
953
954         /*
955          * If we're replaying this lock, just check some invariants.
956          * If we're creating a new lock, get everything all setup nice.
957          */
958         if (is_replay) {
959                 lock = ldlm_handle2lock_long(lockh, 0);
960                 LASSERT(lock != NULL);
961                 LDLM_DEBUG(lock, "client-side enqueue START");
962                 LASSERT(exp == lock->l_conn_export);
963         } else {
964                 const struct ldlm_callback_suite cbs = {
965                         .lcs_completion = einfo->ei_cb_cp,
966                         .lcs_blocking   = einfo->ei_cb_bl,
967                         .lcs_glimpse    = einfo->ei_cb_gl
968                 };
969                 lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
970                                         einfo->ei_mode, &cbs, einfo->ei_cbdata,
971                                         lvb_len, lvb_type);
972                 if (IS_ERR(lock))
973                         RETURN(PTR_ERR(lock));
974
975                 if (einfo->ei_cb_created)
976                         einfo->ei_cb_created(lock);
977
978                 /* for the local lock, add the reference */
979                 ldlm_lock_addref_internal(lock, einfo->ei_mode);
980                 ldlm_lock2handle(lock, lockh);
981                 if (policy != NULL)
982                         lock->l_policy_data = *policy;
983
984                 if (einfo->ei_type == LDLM_EXTENT) {
985                         /* extent lock without policy is a bug */
986                         if (policy == NULL)
987                                 LBUG();
988
989                         lock->l_req_extent = policy->l_extent;
990                 }
991                 LDLM_DEBUG(lock, "client-side enqueue START, flags %#llx",
992                            *flags);
993         }
994
995         lock->l_conn_export = exp;
996         lock->l_export = NULL;
997         lock->l_blocking_ast = einfo->ei_cb_bl;
998         lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
999         lock->l_activity = ktime_get_real_seconds();
1000
1001         /* lock not sent to server yet */
1002         if (reqp == NULL || *reqp == NULL) {
1003                 req = ldlm_enqueue_pack(exp, lvb_len);
1004                 if (IS_ERR(req)) {
1005                         failed_lock_cleanup(ns, lock, einfo->ei_mode);
1006                         LDLM_LOCK_RELEASE(lock);
1007                         RETURN(PTR_ERR(req));
1008                 }
1009
1010                 req_passed_in = 0;
1011                 if (reqp)
1012                         *reqp = req;
1013         } else {
1014                 int len;
1015
1016                 req = *reqp;
1017                 len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
1018                                            RCL_CLIENT);
1019                 LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
1020                          DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
1021         }
1022
1023         if (*flags & LDLM_FL_NDELAY) {
1024                 DEBUG_REQ(D_DLMTRACE, req, "enque lock with no delay\n");
1025                 req->rq_no_resend = req->rq_no_delay = 1;
1026                 /*
1027                  * probably set a shorter timeout value and handle ETIMEDOUT
1028                  * in osc_lock_upcall() correctly
1029                  */
1030                 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
1031         }
1032
1033         /* Dump lock data into the request buffer */
1034         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1035         ldlm_lock2desc(lock, &body->lock_desc);
1036         body->lock_flags = ldlm_flags_to_wire(*flags);
1037         body->lock_handle[0] = *lockh;
1038
1039         /* extended LDLM opcodes in client stats */
1040         if (exp->exp_obd->obd_svc_stats != NULL) {
1041                 bool glimpse = *flags & LDLM_FL_HAS_INTENT;
1042
1043                 /* OST glimpse has no intent buffer */
1044                 if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
1045                                           RCL_CLIENT)) {
1046                         struct ldlm_intent *it;
1047
1048                         it = req_capsule_client_get(&req->rq_pill,
1049                                                     &RMF_LDLM_INTENT);
1050                         glimpse = (it && (it->opc == IT_GLIMPSE));
1051                 }
1052
1053                 if (!glimpse)
1054                         ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
1055                 else
1056                         lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
1057                                              PTLRPC_LAST_CNTR +
1058                                              LDLM_GLIMPSE_ENQUEUE);
1059         }
1060
1061         if (async) {
1062                 LASSERT(reqp != NULL);
1063                 RETURN(0);
1064         }
1065
1066         LDLM_DEBUG(lock, "sending request");
1067
1068         rc = ptlrpc_queue_wait(req);
1069
1070         err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
1071                                     einfo->ei_mode, flags, lvb, lvb_len,
1072                                     lockh, rc);
1073
1074         /*
1075          * If ldlm_cli_enqueue_fini did not find the lock, we need to free
1076          * one reference that we took
1077          */
1078         if (err == -ENOLCK)
1079                 LDLM_LOCK_RELEASE(lock);
1080         else
1081                 rc = err;
1082
1083         if (!req_passed_in && req != NULL) {
1084                 ptlrpc_req_finished(req);
1085                 if (reqp)
1086                         *reqp = NULL;
1087         }
1088
1089         RETURN(rc);
1090 }
1091 EXPORT_SYMBOL(ldlm_cli_enqueue);
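
/*
 * Illustrative (hypothetical) synchronous caller sketch, assuming an
 * export @exp, a resource id @res_id and a policy @policy set up by the
 * caller; the lock type, mode and callbacks are examples only:
 *
 *      struct ldlm_enqueue_info einfo = {
 *              .ei_type  = LDLM_IBITS,
 *              .ei_mode  = LCK_PR,
 *              .ei_cb_bl = ldlm_blocking_ast,
 *              .ei_cb_cp = ldlm_completion_ast,
 *      };
 *      struct lustre_handle lockh = { 0 };
 *      __u64 flags = 0;
 *      int rc;
 *
 *      rc = ldlm_cli_enqueue(exp, NULL, &einfo, &res_id, &policy, &flags,
 *                            NULL, 0, LVB_T_NONE, &lockh, 0);
 *
 * With async == 0 the call returns only after ldlm_cli_enqueue_fini()
 * has processed the server reply (or an error occurred).
 */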
1092
1093 /**
1094  * Client-side lock convert reply handling.
1095  *
1096  * Finish client lock converting, check for concurrent converts
1097  * and clear the 'converting' flag so the lock can be placed back into the LRU.
1098  */
1099 static int lock_convert_interpret(const struct lu_env *env,
1100                                   struct ptlrpc_request *req,
1101                                   void *args, int rc)
1102 {
1103         struct ldlm_async_args *aa = args;
1104         struct ldlm_lock *lock;
1105         struct ldlm_reply *reply;
1106
1107         ENTRY;
1108
1109         lock = ldlm_handle2lock(&aa->lock_handle);
1110         if (!lock) {
1111                 LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
1112                         aa->lock_handle.cookie);
1113                 RETURN(-ESTALE);
1114         }
1115
1116         LDLM_DEBUG(lock, "CONVERTED lock:");
1117
1118         if (rc != ELDLM_OK)
1119                 GOTO(out, rc);
1120
1121         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1122         if (reply == NULL)
1123                 GOTO(out, rc = -EPROTO);
1124
1125         if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
1126                 LDLM_ERROR(lock,
1127                            "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
1128                            aa->lock_handle.cookie, reply->lock_handle.cookie,
1129                            req->rq_export->exp_client_uuid.uuid,
1130                            libcfs_id2str(req->rq_peer));
1131                 GOTO(out, rc = ELDLM_NO_LOCK_DATA);
1132         }
1133
1134         lock_res_and_lock(lock);
1135         /*
1136          * A lock convert is sent for any new bits to drop; the converting flag
1137          * is cleared when the ibits on the server are the same as on the client.
1138          * Meanwhile, a later convert may be replied to first and clear the
1139          * converting flag, so in case of such a race (the converting flag is
1140          * already clear) just exit here.
1141          */
1142         if (!ldlm_is_converting(lock)) {
1143                 LDLM_DEBUG(lock,
1144                            "convert ACK for lock without converting flag, reply ibits %#llx",
1145                            reply->lock_desc.l_policy_data.l_inodebits.bits);
1146         } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
1147                    lock->l_policy_data.l_inodebits.bits) {
1148                 /*
1149                  * Compare the server-returned lock ibits with the local lock ibits:
1150                  * if they are the same we consider the conversion done,
1151                  * otherwise we have more converts in flight and keep the
1152                  * converting flag.
1153                  */
1154                 LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
1155                            reply->lock_desc.l_policy_data.l_inodebits.bits);
1156         } else {
1157                 ldlm_clear_converting(lock);
1158
1159                 /*
1160                  * A concurrent BL AST may arrive and cause another convert
1161                  * or a cancel, so just do nothing here if bl_ast is set;
1162                  * finish the convert otherwise.
1163                  */
1164                 if (!ldlm_is_bl_ast(lock)) {
1165                         struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
1166
1167                         /*
1168                          * Drop cancel_bits since there are no more converts
1169                          * and put lock into LRU if it is still not used and
1170                          * is not there yet.
1171                          */
1172                         lock->l_policy_data.l_inodebits.cancel_bits = 0;
1173                         if (!lock->l_readers && !lock->l_writers &&
1174                             !ldlm_is_canceling(lock)) {
1175                                 spin_lock(&ns->ns_lock);
1176                                 /* there is check for list_empty() inside */
1177                                 ldlm_lock_remove_from_lru_nolock(lock);
1178                                 ldlm_lock_add_to_lru_nolock(lock);
1179                                 spin_unlock(&ns->ns_lock);
1180                         }
1181                 }
1182         }
1183         unlock_res_and_lock(lock);
1184 out:
1185         if (rc) {
1186                 int flag;
1187
1188                 lock_res_and_lock(lock);
1189                 if (ldlm_is_converting(lock)) {
1190                         ldlm_clear_converting(lock);
1191                         ldlm_set_cbpending(lock);
1192                         ldlm_set_bl_ast(lock);
1193                         lock->l_policy_data.l_inodebits.cancel_bits = 0;
1194                 }
1195                 unlock_res_and_lock(lock);
1196
1197                 /*
1198                  * fallback to normal lock cancel. If rc means there is no
1199                  * valid lock on server, do only local cancel
1200                  */
1201                 if (rc == ELDLM_NO_LOCK_DATA)
1202                         flag = LCF_LOCAL;
1203                 else
1204                         flag = LCF_ASYNC;
1205
1206                 rc = ldlm_cli_cancel(&aa->lock_handle, flag);
1207                 if (rc < 0)
1208                         LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
1209                                    rc);
1210         }
1211         LDLM_LOCK_PUT(lock);
1212         RETURN(rc);
1213 }
1214
1215 /**
1216  * Client-side IBITS lock convert.
1217  *
1218  * Inform the server that the lock has been converted instead of canceled.
1219  * The server finishes the convert on its own side and reprocesses to grant
1220  * all related waiting locks.
1221  *
1222  * Since a convert means only ibits downgrading, the client doesn't need to
1223  * wait for the server reply to finish the local converting process, so this
1224  * request is made asynchronous.
1225  *
1226  */
1227 int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
1228 {
1229         struct ldlm_request *body;
1230         struct ptlrpc_request *req;
1231         struct ldlm_async_args *aa;
1232         struct obd_export *exp = lock->l_conn_export;
1233
1234         ENTRY;
1235
1236         if (exp == NULL) {
1237                 LDLM_ERROR(lock, "convert must not be called on local locks.");
1238                 RETURN(-EINVAL);
1239         }
1240
1241         /*
1242          * It is better to check this earlier, and that is already done,
1243          * but this check is kept as a final one to issue an error
1244          * if any new code misses it.
1245          */
1246         if (!exp_connect_lock_convert(exp)) {
1247                 LDLM_ERROR(lock, "server doesn't support lock convert\n");
1248                 RETURN(-EPROTO);
1249         }
1250
1251         if (lock->l_resource->lr_type != LDLM_IBITS) {
1252                 LDLM_ERROR(lock, "convert works with IBITS locks only.");
1253                 RETURN(-EINVAL);
1254         }
1255
1256         LDLM_DEBUG(lock, "client-side convert");
1257
1258         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
1259                                         &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
1260                                         LDLM_CONVERT);
1261         if (req == NULL)
1262                 RETURN(-ENOMEM);
1263
1264         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1265         body->lock_handle[0] = lock->l_remote_handle;
1266
1267         body->lock_desc.l_req_mode = lock->l_req_mode;
1268         body->lock_desc.l_granted_mode = lock->l_granted_mode;
1269
1270         body->lock_desc.l_policy_data.l_inodebits.bits =
1271                                         lock->l_policy_data.l_inodebits.bits;
1272         body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
1273
1274         body->lock_flags = ldlm_flags_to_wire(*flags);
1275         body->lock_count = 1;
1276
1277         ptlrpc_request_set_replen(req);
1278
1279         /*
1280          * Use cancel portals for convert as well as high-priority handling.
1281          */
1282         req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
1283         req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
1284
1285         ptlrpc_at_set_req_timeout(req);
1286
1287         if (exp->exp_obd->obd_svc_stats != NULL)
1288                 lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
1289                                      LDLM_CONVERT - LDLM_FIRST_OPC);
1290
1291         aa = ptlrpc_req_async_args(req);
1292         ldlm_lock2handle(lock, &aa->lock_handle);
1293         req->rq_interpret_reply = lock_convert_interpret;
1294
1295         ptlrpcd_add_req(req);
1296         RETURN(0);
1297 }
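
/*
 * Illustrative example: a client holding an IBITS lock with several
 * bits set that is asked (via a blocking AST) to give up only one of
 * them can drop that bit locally and then call ldlm_cli_convert(), so
 * the server sees the remaining bit set and can grant the conflicting
 * waiter, instead of cancelling and re-enqueuing the whole lock.
 */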
1298
1299 /**
1300  * Cancel locks locally.
1301  * Returns:
1302  * \retval LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server
1303  * \retval LDLM_FL_CANCELING otherwise;
1304  * \retval LDLM_FL_BL_AST if there is a need for a separate CANCEL RPC.
1305  */
1306 static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
1307 {
1308         __u64 rc = LDLM_FL_LOCAL_ONLY;
1309
1310         ENTRY;
1311
1312         if (lock->l_conn_export) {
1313                 bool local_only;
1314
1315                 LDLM_DEBUG(lock, "client-side cancel");
1316                 /* Set this flag to prevent others from getting new references */
1317                 lock_res_and_lock(lock);
1318                 ldlm_set_cbpending(lock);
1319                 local_only = !!(lock->l_flags &
1320                                 (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
1321                 ldlm_cancel_callback(lock);
1322                 rc = (ldlm_is_bl_ast(lock)) ?
1323                         LDLM_FL_BL_AST : LDLM_FL_CANCELING;
1324                 unlock_res_and_lock(lock);
1325
1326                 if (local_only) {
1327                         CDEBUG(D_DLMTRACE,
1328                                "not sending request (at caller's instruction)\n");
1329                         rc = LDLM_FL_LOCAL_ONLY;
1330                 }
1331                 ldlm_lock_cancel(lock);
1332         } else {
1333                 if (ns_is_client(ldlm_lock_to_ns(lock))) {
1334                         LDLM_ERROR(lock, "Trying to cancel local lock");
1335                         LBUG();
1336                 }
1337                 LDLM_DEBUG(lock, "server-side local cancel");
1338                 ldlm_lock_cancel(lock);
1339                 ldlm_reprocess_all(lock->l_resource);
1340         }
1341
1342         RETURN(rc);
1343 }
1344
1345 /**
1346  * Pack \a count locks in \a head into ldlm_request buffer of request \a req.
1347  */
1348 static void ldlm_cancel_pack(struct ptlrpc_request *req,
1349                              struct list_head *head, int count)
1350 {
1351         struct ldlm_request *dlm;
1352         struct ldlm_lock *lock;
1353         int max, packed = 0;
1354
1355         ENTRY;
1356
1357         dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1358         LASSERT(dlm != NULL);
1359
1360         /* Check the room in the request buffer. */
1361         max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
1362                 sizeof(struct ldlm_request);
1363         max /= sizeof(struct lustre_handle);
1364         max += LDLM_LOCKREQ_HANDLES;
1365         LASSERT(max >= dlm->lock_count + count);
1366
1367         /*
1368          * XXX: it would be better to pack lock handles grouped by resource,
1369          * so that the server cancel would call filter_lvbo_update() less
1370          * frequently.
1371          */
1372         list_for_each_entry(lock, head, l_bl_ast) {
1373                 if (!count--)
1374                         break;
1375                 LASSERT(lock->l_conn_export);
1376                 /* Pack the lock handle to the given request buffer. */
1377                 LDLM_DEBUG(lock, "packing");
1378                 dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
1379                 packed++;
1380         }
1381         CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
1382         EXIT;
1383 }
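
/*
 * Illustrative arithmetic: if the RMF_DLM_REQ buffer was sized with
 * ldlm_request_bufsize(5, LDLM_CANCEL) (and 5 > LDLM_LOCKREQ_HANDLES),
 * then max above works out to
 * (5 - LDLM_LOCKREQ_HANDLES) + LDLM_LOCKREQ_HANDLES = 5 handles, i.e.
 * exactly the count the buffer was packed for; the LASSERT guards
 * against overflowing it.
 */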
1384
1385 /**
1386  * Prepare and send a batched cancel RPC. It will include \a count lock
1387  * handles of locks given in \a cancels list.
1388  */
1389 int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
1390                         int count, enum ldlm_cancel_flags flags)
1391 {
1392         struct ptlrpc_request *req = NULL;
1393         struct obd_import *imp;
1394         int free, sent = 0;
1395         int rc = 0;
1396
1397         ENTRY;
1398
1399         LASSERT(exp != NULL);
1400         LASSERT(count > 0);
1401
1402         CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val);
1403
1404         if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
1405                 RETURN(count);
1406
1407         free = ldlm_format_handles_avail(class_exp2cliimp(exp),
1408                                          &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
1409         if (count > free)
1410                 count = free;
1411
1412         while (1) {
1413                 imp = class_exp2cliimp(exp);
1414                 if (imp == NULL || imp->imp_invalid) {
1415                         CDEBUG(D_DLMTRACE,
1416                                "skipping cancel on invalid import %p\n", imp);
1417                         RETURN(count);
1418                 }
1419
1420                 req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
1421                 if (req == NULL)
1422                         GOTO(out, rc = -ENOMEM);
1423
1424                 req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
1425                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
1426                                      ldlm_request_bufsize(count, LDLM_CANCEL));
1427
1428                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
1429                 if (rc) {
1430                         ptlrpc_request_free(req);
1431                         GOTO(out, rc);
1432                 }
1433
1434                 /*
1435                  * If OSP wants to cancel a cross-MDT lock, do not block it
1436                  * in recovery, otherwise the lock will not be released. If
1437                  * the remote target is also in recovery and also needs
1438                  * this lock, it might cause a deadlock.
1439                  */
1440                 if (exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS &&
1441                     exp->exp_obd->obd_lu_dev != NULL &&
1442                     exp->exp_obd->obd_lu_dev->ld_site != NULL) {
1443                         struct lu_device *top_dev;
1444
1445                         top_dev = exp->exp_obd->obd_lu_dev->ld_site->ls_top_dev;
1446                         if (top_dev != NULL &&
1447                             top_dev->ld_obd->obd_recovering)
1448                                 req->rq_allow_replay = 1;
1449                 }
1450
1451                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
1452                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
1453                 ptlrpc_at_set_req_timeout(req);
1454
1455                 ldlm_cancel_pack(req, cancels, count);
1456
1457                 ptlrpc_request_set_replen(req);
1458                 if (flags & LCF_ASYNC) {
1459                         ptlrpcd_add_req(req);
1460                         sent = count;
1461                         GOTO(out, 0);
1462                 }
1463
1464                 rc = ptlrpc_queue_wait(req);
1465                 if (rc == LUSTRE_ESTALE) {
1466                         CDEBUG(D_DLMTRACE,
1467                                "client/server (nid %s) out of sync -- not fatal\n",
1468                                libcfs_nid2str(req->rq_import->imp_connection->c_peer.nid));
1469                         rc = 0;
1470                 } else if (rc == -ETIMEDOUT && /* check there was no reconnect */
1471                            req->rq_import_generation == imp->imp_generation) {
1472                         ptlrpc_req_finished(req);
1473                         continue;
1474                 } else if (rc != ELDLM_OK) {
1475                         /* -ESHUTDOWN is common on umount */
1476                         CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
1477                                      "Got rc %d from cancel RPC: canceling anyway\n",
1478                                      rc);
1479                         break;
1480                 }
1481                 sent = count;
1482                 break;
1483         }
1484
1485         ptlrpc_req_finished(req);
1486         EXIT;
1487 out:
1488         return sent ? sent : rc;
1489 }
1490
1491 static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
1492 {
1493         LASSERT(imp != NULL);
1494         return &imp->imp_obd->obd_namespace->ns_pool;
1495 }
1496
1497 /**
1498  * Update client's OBD pool related fields with new SLV and Limit from \a req.
1499  */
1500 int ldlm_cli_update_pool(struct ptlrpc_request *req)
1501 {
1502         struct obd_device *obd;
1503         __u64 new_slv;
1504         __u32 new_limit;
1505
1506         ENTRY;
1507         if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
1508                      !imp_connect_lru_resize(req->rq_import)))
1509                 /* Do nothing for corner cases. */
1510                 RETURN(0);
1511
1512         /*
1513          * In some cases the RPC may contain the SLV and limit zeroed out.
1514          * This is the case when the server does not support the LRU resize
1515          * feature. This is also possible in some recovery cases when
1516          * server-side reqs have no reference to the OBD export and thus
1517          * access to the server-side namespace is not possible.
1518          */
1519         if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
1520             lustre_msg_get_limit(req->rq_repmsg) == 0) {
1521                 DEBUG_REQ(D_HA, req,
1522                           "Zero SLV or Limit found (SLV: %llu, Limit: %u)",
1523                           lustre_msg_get_slv(req->rq_repmsg),
1524                           lustre_msg_get_limit(req->rq_repmsg));
1525                 RETURN(0);
1526         }
1527
1528         new_limit = lustre_msg_get_limit(req->rq_repmsg);
1529         new_slv = lustre_msg_get_slv(req->rq_repmsg);
1530         obd = req->rq_import->imp_obd;
1531
1532         /*
1533          * Set new SLV and limit in OBD fields to make them accessible
1534          * to the pool thread. We do not access obd_namespace and pool
1535          * directly here as there is no reliable way to make sure that
1536          * they are still alive at cleanup time. Evil races are possible
1537          * which may cause Oops at that time.
1538          */
1539         write_lock(&obd->obd_pool_lock);
1540         obd->obd_pool_slv = new_slv;
1541         obd->obd_pool_limit = new_limit;
1542         write_unlock(&obd->obd_pool_lock);
1543
1544         RETURN(0);
1545 }
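/*
 * Editorial illustration (not part of the original source): how a reader,
 * e.g. the pool recalculation path, is expected to pick up the SLV and limit
 * published by ldlm_cli_update_pool() above.  The helper name is hypothetical;
 * obd_pool_lock, obd_pool_slv and obd_pool_limit are the fields used above.
 */
static void __maybe_unused example_read_pool_fields(struct obd_device *obd,
						    __u64 *slv, __u32 *limit)
{
	read_lock(&obd->obd_pool_lock);
	*slv = obd->obd_pool_slv;
	*limit = obd->obd_pool_limit;
	read_unlock(&obd->obd_pool_lock);
}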
1546
1547 /**
1548  * Client side lock cancel.
1549  *
1550  * Lock must not have any readers or writers by this time.
1551  */
1552 int ldlm_cli_cancel(const struct lustre_handle *lockh,
1553                     enum ldlm_cancel_flags cancel_flags)
1554 {
1555         struct obd_export *exp;
1556         enum ldlm_lru_flags lru_flags;
1557         int avail, count = 1;
1558         __u64 rc = 0;
1559         struct ldlm_namespace *ns;
1560         struct ldlm_lock *lock;
1561         struct list_head cancels = LIST_HEAD_INIT(cancels);
1562
1563         ENTRY;
1564
1565         lock = ldlm_handle2lock_long(lockh, 0);
1566         if (lock == NULL) {
1567                 LDLM_DEBUG_NOLOCK("lock is already being destroyed");
1568                 RETURN(0);
1569         }
1570
1571         /* Convert lock bits instead of cancel for IBITS locks */
1572         if (cancel_flags & LCF_CONVERT) {
1573                 LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
1574                 LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
1575
1576                 rc = ldlm_cli_dropbits(lock,
1577                                 lock->l_policy_data.l_inodebits.cancel_bits);
1578                 if (rc == 0) {
1579                         LDLM_LOCK_RELEASE(lock);
1580                         RETURN(0);
1581                 }
1582         }
1583
1584         lock_res_and_lock(lock);
1585         /* Lock is already being canceled; wait for it unless async was requested */
1586         if (ldlm_is_canceling(lock)) {
1587                 if (cancel_flags & LCF_ASYNC) {
1588                         unlock_res_and_lock(lock);
1589                 } else {
1590                         struct l_wait_info lwi = { 0 };
1591
1592                         unlock_res_and_lock(lock);
1593                         l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
1594                 }
1595                 LDLM_LOCK_RELEASE(lock);
1596                 RETURN(0);
1597         }
1598
1599         /*
1600          * Lock is being converted, cancel it immediately.
1601          * When the conversion ends, the lock is released and gone.
1602          */
1603         if (ldlm_is_converting(lock)) {
1604                 /* set back flags removed by convert */
1605                 ldlm_set_cbpending(lock);
1606                 ldlm_set_bl_ast(lock);
1607         }
1608
1609         ldlm_set_canceling(lock);
1610         unlock_res_and_lock(lock);
1611
1612         if (cancel_flags & LCF_LOCAL)
1613                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_LOCAL_CANCEL_PAUSE,
1614                                  cfs_fail_val);
1615
1616         rc = ldlm_cli_cancel_local(lock);
1617         if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
1618                 LDLM_LOCK_RELEASE(lock);
1619                 RETURN(0);
1620         }
1621         /*
1622          * Even if the lock is marked as LDLM_FL_BL_AST, this is an LDLM_CANCEL
1623          * RPC which goes to canceld portal, so we can cancel other LRU locks
1624          * here and send them all as one LDLM_CANCEL RPC.
1625          */
1626         LASSERT(list_empty(&lock->l_bl_ast));
1627         list_add(&lock->l_bl_ast, &cancels);
1628
1629         exp = lock->l_conn_export;
1630         if (exp_connect_cancelset(exp)) {
1631                 avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
1632                                                   &RQF_LDLM_CANCEL,
1633                                                   RCL_CLIENT, 0);
1634                 LASSERT(avail > 0);
1635
1636                 ns = ldlm_lock_to_ns(lock);
1637                 lru_flags = ns_connect_lru_resize(ns) ?
1638                         LDLM_LRU_FLAG_LRUR : LDLM_LRU_FLAG_AGED;
1639                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
1640                                                LCF_BL_AST, lru_flags);
1641         }
1642         ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags);
1643         RETURN(0);
1644 }
1645 EXPORT_SYMBOL(ldlm_cli_cancel);
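/*
 * Editorial usage sketch (not part of the original source): cancel a client
 * lock by handle.  With LCF_LOCAL no CANCEL RPC is sent; with flags 0 the
 * code above may batch additional LRU locks into one LDLM_CANCEL RPC.  The
 * wrapper name is hypothetical.
 */
static int __maybe_unused example_cancel_by_handle(const struct lustre_handle *lockh,
						   bool local_only)
{
	return ldlm_cli_cancel(lockh, local_only ? LCF_LOCAL : 0);
}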
1646
1647 /**
1648  * Locally cancel up to \a count locks in list \a cancels.
1649  * Return the number of cancelled locks.
1650  */
1651 int ldlm_cli_cancel_list_local(struct list_head *cancels, int count,
1652                                enum ldlm_cancel_flags cancel_flags)
1653 {
1654         struct list_head head = LIST_HEAD_INIT(head);
1655         struct ldlm_lock *lock, *next;
1656         int left = 0, bl_ast = 0;
1657         __u64 rc;
1658
1659         left = count;
1660         list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
1661                 if (left-- == 0)
1662                         break;
1663
1664                 if (cancel_flags & LCF_LOCAL) {
1665                         rc = LDLM_FL_LOCAL_ONLY;
1666                         ldlm_lock_cancel(lock);
1667                 } else {
1668                         rc = ldlm_cli_cancel_local(lock);
1669                 }
1670                 /*
1671                  * Until we have compound requests and can send LDLM_CANCEL
1672                  * requests batched with generic RPCs, we need to send cancels
1673                  * with the LDLM_FL_BL_AST flag in a separate RPC from
1674                  * the one being generated now.
1675                  */
1676                 if (!(cancel_flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) {
1677                         LDLM_DEBUG(lock, "Cancel lock separately");
1678                         list_del_init(&lock->l_bl_ast);
1679                         list_add(&lock->l_bl_ast, &head);
1680                         bl_ast++;
1681                         continue;
1682                 }
1683                 if (rc == LDLM_FL_LOCAL_ONLY) {
1684                         /* CANCEL RPC should not be sent to server. */
1685                         list_del_init(&lock->l_bl_ast);
1686                         LDLM_LOCK_RELEASE(lock);
1687                         count--;
1688                 }
1689         }
1690         if (bl_ast > 0) {
1691                 count -= bl_ast;
1692                 ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
1693         }
1694
1695         RETURN(count);
1696 }
1697
1698 /**
1699  * Cancel as many locks as possible w/o sending any RPCs (e.g. to write back
1700  * dirty data, to close a file, ...) or waiting for any RPCs in-flight (e.g.
1701  * readahead requests, ...)
1702  */
1703 static enum ldlm_policy_res
1704 ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
1705                            int unused, int added, int count)
1706 {
1707         enum ldlm_policy_res result = LDLM_POLICY_CANCEL_LOCK;
1708
1709         /*
1710          * don't check added & count since we want to process all locks
1711          * from unused list.
1712          * It's fine to not take lock to access lock->l_resource since
1713          * the lock has already been granted so it won't change.
1714          */
1715         switch (lock->l_resource->lr_type) {
1716         case LDLM_EXTENT:
1717         case LDLM_IBITS:
1718                 if (ns->ns_cancel != NULL && ns->ns_cancel(lock) != 0)
1719                         break;
1720         default:
1721                 result = LDLM_POLICY_SKIP_LOCK;
1722                 break;
1723         }
1724
1725         RETURN(result);
1726 }
1727
1728 /**
1729  * Callback function for LRU-resize policy. Decides whether to keep
1730  * \a lock in the LRU given its current size \a unused, the number added in
1731  * the current scan \a added, and the preferred number to cancel \a count.
1732  *
1733  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1734  *
1735  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1736  */
1737 static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1738                                                     struct ldlm_lock *lock,
1739                                                     int unused, int added,
1740                                                     int count)
1741 {
1742         ktime_t cur = ktime_get();
1743         struct ldlm_pool *pl = &ns->ns_pool;
1744         u64 slv, lvf, lv;
1745         s64 la;
1746
1747         /*
1748          * Stop LRU processing when we reach past @count or have checked all
1749          * locks in LRU.
1750          */
1751         if (count && added >= count)
1752                 return LDLM_POLICY_KEEP_LOCK;
1753
1754         /*
1755          * Regardless of the LV, it doesn't make sense to keep a lock which
1756          * has been unused for ns_max_age time.
1757          */
1758         if (ktime_after(ktime_get(),
1759                         ktime_add(lock->l_last_used, ns->ns_max_age)))
1760                 return LDLM_POLICY_CANCEL_LOCK;
1761
1762         slv = ldlm_pool_get_slv(pl);
1763         lvf = ldlm_pool_get_lvf(pl);
1764         la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
1765                      NSEC_PER_SEC);
1766         lv = lvf * la * unused;
1767
1768         /* Inform pool about current CLV to see it via debugfs. */
1769         ldlm_pool_set_clv(pl, lv);
1770
1771         /*
1772          * Stop when the SLV has not yet come from the server, or lv is
1773          * smaller than it.
1774          */
1775         if (slv == 0 || lv < slv)
1776                 return LDLM_POLICY_KEEP_LOCK;
1777
1778         return LDLM_POLICY_CANCEL_LOCK;
1779 }
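/*
 * Editorial illustration (not part of the original source): the lock-volume
 * test from ldlm_cancel_lrur_policy() above in isolation.  lv = lvf * la *
 * unused, where la is the idle time in seconds; the lock is kept while the
 * server SLV is unknown (0) or still larger than lv.  The helper name is
 * hypothetical and the inputs are assumed to be precomputed by the caller.
 */
static bool __maybe_unused example_lrur_would_cancel(u64 slv, u64 lvf,
						     s64 idle_sec, int unused)
{
	u64 lv = lvf * idle_sec * unused;

	/* keep the lock while the server SLV is unknown or still larger */
	return !(slv == 0 || lv < slv);
}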
1780
1781 static enum ldlm_policy_res
1782 ldlm_cancel_lrur_no_wait_policy(struct ldlm_namespace *ns,
1783                                 struct ldlm_lock *lock,
1784                                 int unused, int added,
1785                                 int count)
1786 {
1787         enum ldlm_policy_res result;
1788
1789         result = ldlm_cancel_lrur_policy(ns, lock, unused, added, count);
1790         if (result == LDLM_POLICY_KEEP_LOCK)
1791                 return result;
1792
1793         return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
1794 }
1795
1796 /**
1797  * Callback function for the policy used via debugfs ("passed" policy).
1798  * Decides whether to keep \a lock in the LRU given its current size
1799  * \a unused, the number added in the current scan \a added, and the
1800  * preferred number to cancel \a count.
1801  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1802  *
1803  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1804  */
1805 static enum ldlm_policy_res ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1806                                                       struct ldlm_lock *lock,
1807                                                       int unused, int added,
1808                                                       int count)
1809 {
1810         /*
1811          * Stop LRU processing when we reach past @count or have checked all
1812          * locks in LRU.
1813          */
1814         return (added >= count) ?
1815                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1816 }
1817
1818 /**
1819  * Callback function for aged policy. Makes decision whether to keep \a lock in
1820  * LRU for current LRU size \a unused, added in current scan \a added and
1821  * number of locks to be preferably canceled \a count.
1822  *
1823  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1824  *
1825  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1826  */
1827 static enum ldlm_policy_res ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1828                                                     struct ldlm_lock *lock,
1829                                                     int unused, int added,
1830                                                     int count)
1831 {
1832         if ((added >= count) &&
1833             ktime_before(ktime_get(),
1834                          ktime_add(lock->l_last_used, ns->ns_max_age)))
1835                 return LDLM_POLICY_KEEP_LOCK;
1836
1837         return LDLM_POLICY_CANCEL_LOCK;
1838 }
1839
1840 static enum ldlm_policy_res
1841 ldlm_cancel_aged_no_wait_policy(struct ldlm_namespace *ns,
1842                                 struct ldlm_lock *lock,
1843                                 int unused, int added, int count)
1844 {
1845         enum ldlm_policy_res result;
1846
1847         result = ldlm_cancel_aged_policy(ns, lock, unused, added, count);
1848         if (result == LDLM_POLICY_KEEP_LOCK)
1849                 return result;
1850
1851         return ldlm_cancel_no_wait_policy(ns, lock, unused, added, count);
1852 }
1853
1854 /**
1855  * Callback function for default policy. Makes decision whether to keep \a lock
1856  * in LRU for current LRU size \a unused, added in current scan \a added and
1857  * number of locks to be preferably canceled \a count.
1858  *
1859  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1860  *
1861  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1862  */
1863 static
1864 enum ldlm_policy_res ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1865                                                 struct ldlm_lock *lock,
1866                                                 int unused, int added,
1867                                                 int count)
1868 {
1869         /*
1870          * Stop LRU processing when we reach past count or have checked all
1871          * locks in LRU.
1872          */
1873         return (added >= count) ?
1874                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1875 }
1876
1877 typedef enum ldlm_policy_res
1878 (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *ns, struct ldlm_lock *lock,
1879                             int unused, int added, int count);
1880
1881 static ldlm_cancel_lru_policy_t
1882 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
1883 {
1884         if (ns_connect_lru_resize(ns)) {
1885                 if (lru_flags & LDLM_LRU_FLAG_SHRINK)
1886                         /* We kill passed number of old locks. */
1887                         return ldlm_cancel_passed_policy;
1888                 if (lru_flags & LDLM_LRU_FLAG_LRUR) {
1889                         if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
1890                                 return ldlm_cancel_lrur_no_wait_policy;
1891                         else
1892                                 return ldlm_cancel_lrur_policy;
1893                 }
1894                 if (lru_flags & LDLM_LRU_FLAG_PASSED)
1895                         return ldlm_cancel_passed_policy;
1896         } else {
1897                 if (lru_flags & LDLM_LRU_FLAG_AGED) {
1898                         if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
1899                                 return ldlm_cancel_aged_no_wait_policy;
1900                         else
1901                                 return ldlm_cancel_aged_policy;
1902                 }
1903         }
1904         if (lru_flags & LDLM_LRU_FLAG_NO_WAIT)
1905                 return ldlm_cancel_no_wait_policy;
1906
1907         return ldlm_cancel_default_policy;
1908 }
1909
1910 /**
1911  * - Free space in LRU for \a count new locks,
1912  *   redundant unused locks are canceled locally;
1913  * - also cancel locally unused aged locks;
1914  * - do not cancel more than \a max locks;
1915  * - GET the found locks and add them into the \a cancels list.
1916  *
1917  * A client lock can be added to the l_bl_ast list only when it is
1918  * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing
1919  * CANCEL.  There are the following use cases:
1920  * ldlm_cancel_resource_local(), ldlm_cancel_lru_local() and
1921  * ldlm_cli_cancel(), which check and set this flag properly. As any
1922  * attempt to cancel a lock relies on this flag, the l_bl_ast list is
1923  * accessed later without any special locking.
1924  *
1925  * Calling policies for enabled LRU resize:
1926  * ----------------------------------------
1927  * flags & LDLM_LRU_FLAG_LRUR - use LRU resize policy (SLV from server) to
1928  *                              cancel not more than \a count locks;
1929  *
1930  * flags & LDLM_LRU_FLAG_PASSED - cancel \a count number of old locks (located
1931  *                              at the beginning of LRU list);
1932  *
1933  * flags & LDLM_LRU_FLAG_SHRINK - cancel not more than \a count locks according
1934  *                              to the memory pressure policy function;
1935  *
1936  * flags & LDLM_LRU_FLAG_AGED - cancel \a count locks according to "aged policy"
1937  *
1938  * flags & LDLM_LRU_FLAG_NO_WAIT - cancel as many unused locks as possible
1939  *                              (typically before replaying locks) w/o
1940  *                              sending any RPCs or waiting for any
1941  *                              outstanding RPC to complete.
1942  *
1943  * flags & LDLM_LRU_FLAG_CLEANUP - when cancelling read locks, do not check for
1944  *                              other read locks covering the same pages, just
1945  *                              discard those pages.
1946  */
1947 static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
1948                                  struct list_head *cancels, int count, int max,
1949                                  enum ldlm_lru_flags lru_flags)
1950 {
1951         ldlm_cancel_lru_policy_t pf;
1952         int added = 0;
1953         int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
1954
1955         ENTRY;
1956
1957         if (!ns_connect_lru_resize(ns))
1958                 count += ns->ns_nr_unused - ns->ns_max_unused;
1959
1960         pf = ldlm_cancel_lru_policy(ns, lru_flags);
1961         LASSERT(pf != NULL);
1962
1963         /* For any flags, stop scanning if @max is reached. */
1964         while (!list_empty(&ns->ns_unused_list) && (max == 0 || added < max)) {
1965                 struct ldlm_lock *lock;
1966                 struct list_head *item, *next;
1967                 enum ldlm_policy_res result;
1968                 ktime_t last_use = ktime_set(0, 0);
1969
1970                 spin_lock(&ns->ns_lock);
1971                 item = no_wait ? ns->ns_last_pos : &ns->ns_unused_list;
1972                 for (item = item->next, next = item->next;
1973                      item != &ns->ns_unused_list;
1974                      item = next, next = item->next) {
1975                         lock = list_entry(item, struct ldlm_lock, l_lru);
1976
1977                         /* No locks which got blocking requests. */
1978                         LASSERT(!ldlm_is_bl_ast(lock));
1979
1980                         if (!ldlm_is_canceling(lock) &&
1981                             !ldlm_is_converting(lock))
1982                                 break;
1983
1984                         /*
1985                          * Somebody is already doing CANCEL. No need for this
1986                          * lock in LRU, do not traverse it again.
1987                          */
1988                         ldlm_lock_remove_from_lru_nolock(lock);
1989                 }
1990                 if (item == &ns->ns_unused_list) {
1991                         spin_unlock(&ns->ns_lock);
1992                         break;
1993                 }
1994
1995                 last_use = lock->l_last_used;
1996
1997                 LDLM_LOCK_GET(lock);
1998                 spin_unlock(&ns->ns_lock);
1999                 lu_ref_add(&lock->l_reference, __func__, current);
2000
2001                 /*
2002                  * Pass the lock through the policy filter and see if it
2003                  * should stay in LRU.
2004                  *
2005                  * Even for shrinker policy we stop scanning if
2006                  * we find a lock that should stay in the cache.
2007                  * We should take into account lock age anyway
2008                  * as a new lock is a valuable resource even if
2009                  * it has a low weight.
2010                  *
2011                  * That is, for shrinker policy we drop only
2012                  * old locks, but additionally choose them by
2013                  * their weight. Big extent locks will stay in
2014                  * the cache.
2015                  */
2016                 result = pf(ns, lock, ns->ns_nr_unused, added, count);
2017                 if (result == LDLM_POLICY_KEEP_LOCK) {
2018                         lu_ref_del(&lock->l_reference, __func__, current);
2019                         LDLM_LOCK_RELEASE(lock);
2020                         break;
2021                 }
2022
2023                 if (result == LDLM_POLICY_SKIP_LOCK) {
2024                         lu_ref_del(&lock->l_reference, __func__, current);
2025                         if (no_wait) {
2026                                 spin_lock(&ns->ns_lock);
2027                                 if (!list_empty(&lock->l_lru) &&
2028                                     lock->l_lru.prev == ns->ns_last_pos)
2029                                         ns->ns_last_pos = &lock->l_lru;
2030                                 spin_unlock(&ns->ns_lock);
2031                         }
2032
2033                         LDLM_LOCK_RELEASE(lock);
2034                         continue;
2035                 }
2036
2037                 lock_res_and_lock(lock);
2038                 /* Check flags again under the lock. */
2039                 if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
2040                     ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
2041                         /*
2042                          * Another thread is removing lock from LRU, or
2043                          * somebody is already doing CANCEL, or there
2044                          * is a blocking request which will send cancel
2045                          * by itself, or the lock is no longer unused or
2046                          * the lock has been used since the pf() call and
2047                          * pages could be put under it.
2048                          */
2049                         unlock_res_and_lock(lock);
2050                         lu_ref_del(&lock->l_reference, __func__, current);
2051                         LDLM_LOCK_RELEASE(lock);
2052                         continue;
2053                 }
2054                 LASSERT(!lock->l_readers && !lock->l_writers);
2055
2056                 /*
2057                  * If we have chosen to cancel this lock voluntarily, we
2058                  * had better send a cancel notification to the server, so
2059                  * that it frees the appropriate state. This might lead to a
2060                  * race where, while we are cancelling here, the server is
2061                  * also silently cancelling this lock.
2062                  */
2063                 ldlm_clear_cancel_on_block(lock);
2064
2065                 /*
2066                  * Setting the CBPENDING flag is a little misleading,
2067                  * but prevents an important race; namely, once
2068                  * CBPENDING is set, the lock can accumulate no more
2069                  * readers/writers. Since readers and writers are
2070                  * already zero here, ldlm_lock_decref() won't see
2071                  * this flag and call l_blocking_ast
2072                  */
2073                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
2074
2075                 if ((lru_flags & LDLM_LRU_FLAG_CLEANUP) &&
2076                     (lock->l_resource->lr_type == LDLM_EXTENT ||
2077                      ldlm_has_dom(lock)) && lock->l_granted_mode == LCK_PR)
2078                         ldlm_set_discard_data(lock);
2079
2080                 /*
2081                  * We can't re-add to l_lru as it confuses the
2082                  * refcounting in ldlm_lock_remove_from_lru() if an AST
2083                  * arrives after we drop lr_lock below. We use l_bl_ast
2084                  * server and client, even though b=5666 says it is
2085                  * used only on the server.
2086                  * used only on server
2087                  */
2088                 LASSERT(list_empty(&lock->l_bl_ast));
2089                 list_add(&lock->l_bl_ast, cancels);
2090                 unlock_res_and_lock(lock);
2091                 lu_ref_del(&lock->l_reference, __func__, current);
2092                 added++;
2093         }
2094         RETURN(added);
2095 }
2096
2097 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
2098                           int count, int max,
2099                           enum ldlm_cancel_flags cancel_flags,
2100                           enum ldlm_lru_flags lru_flags)
2101 {
2102         int added;
2103
2104         added = ldlm_prepare_lru_list(ns, cancels, count, max, lru_flags);
2105         if (added <= 0)
2106                 return added;
2107
2108         return ldlm_cli_cancel_list_local(cancels, added, cancel_flags);
2109 }
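/*
 * Editorial usage sketch (not part of the original source): trim up to @nr
 * unused locks from a namespace LRU and send the resulting CANCEL RPCs
 * synchronously, following the call pattern of ldlm_cli_cancel() above.  The
 * aged policy is assumed here; the wrapper name is hypothetical.
 */
static int __maybe_unused example_trim_lru(struct ldlm_namespace *ns, int nr)
{
	struct list_head cancels = LIST_HEAD_INIT(cancels);
	int count;

	/* gather candidates and cancel locally; RPC-needing locks remain */
	count = ldlm_cancel_lru_local(ns, &cancels, 0, nr, LCF_BL_AST,
				      LDLM_LRU_FLAG_AGED);
	/* send one (or a few) batched LDLM_CANCEL RPCs for the remainder */
	return ldlm_cli_cancel_list(&cancels, count, NULL, 0);
}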
2110
2111 /**
2112  * Cancel at least \a nr locks from given namespace LRU.
2113  *
2114  * When called with LCF_ASYNC the blocking callback will be handled
2115  * in a thread and this function will return after the thread has been
2116  * asked to call the callback.  When called without LCF_ASYNC the blocking
2117  * callback will be performed in this function.
2118  */
2119 int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
2120                     enum ldlm_cancel_flags cancel_flags,
2121                     enum ldlm_lru_flags lru_flags)
2122 {
2123         struct list_head cancels = LIST_HEAD_INIT(cancels);
2124         int count, rc;
2125
2126         ENTRY;
2127
2128         /*
2129          * Just prepare the list of locks, do not actually cancel them yet.
2130          * Locks are cancelled later in a separate thread.
2131          */
2132         count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, lru_flags);
2133         rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
2134         if (rc == 0)
2135                 RETURN(count);
2136
2137         RETURN(0);
2138 }
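/*
 * Editorial usage sketch (not part of the original source): shrink the LRU
 * asynchronously by handing the candidates to the blocking threads, using
 * the ldlm_cancel_lru() interface above; the wrapper name is hypothetical
 * and the default LRU policy (lru_flags 0) is assumed.
 */
static int __maybe_unused example_shrink_lru_async(struct ldlm_namespace *ns,
						   int nr)
{
	return ldlm_cancel_lru(ns, nr, LCF_ASYNC, 0);
}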
2139
2140 /**
2141  * Find and cancel locally unused locks found on resource, matched to the
2142  * given policy, mode. GET the found locks and add them into the \a cancels
2143  * list.
2144  */
2145 int ldlm_cancel_resource_local(struct ldlm_resource *res,
2146                                struct list_head *cancels,
2147                                union ldlm_policy_data *policy,
2148                                enum ldlm_mode mode, __u64 lock_flags,
2149                                enum ldlm_cancel_flags cancel_flags,
2150                                void *opaque)
2151 {
2152         struct ldlm_lock *lock;
2153         int count = 0;
2154
2155         ENTRY;
2156
2157         lock_res(res);
2158         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2159                 if (opaque != NULL && lock->l_ast_data != opaque) {
2160                         LDLM_ERROR(lock, "data %p doesn't match opaque %p",
2161                                    lock->l_ast_data, opaque);
2162                         continue;
2163                 }
2164
2165                 if (lock->l_readers || lock->l_writers)
2166                         continue;
2167
2168                 /*
2169                  * If somebody is already doing CANCEL, a blocking AST came,
2170                  * or the lock is being converted, then skip this lock.
2171                  */
2172                 if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
2173                     ldlm_is_converting(lock))
2174                         continue;
2175
2176                 if (lockmode_compat(lock->l_granted_mode, mode))
2177                         continue;
2178
2179                 /*
2180                  * If a policy is given and this is an IBITS lock, add to the
2181                  * list only those locks that match the policy.
2182                  * Always skip locks with the DoM bit, so data is not flushed.
2183                  */
2184                 if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
2185                     (!(lock->l_policy_data.l_inodebits.bits &
2186                       policy->l_inodebits.bits) || ldlm_has_dom(lock)))
2187                         continue;
2188
2189                 /* See CBPENDING comment in ldlm_cancel_lru */
2190                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
2191                                  lock_flags;
2192
2193                 LASSERT(list_empty(&lock->l_bl_ast));
2194                 list_add(&lock->l_bl_ast, cancels);
2195                 LDLM_LOCK_GET(lock);
2196                 count++;
2197         }
2198         unlock_res(res);
2199
2200         RETURN(ldlm_cli_cancel_list_local(cancels, count, cancel_flags));
2201 }
2202 EXPORT_SYMBOL(ldlm_cancel_resource_local);
2203
2204 /**
2205  * Cancel client-side locks from a list and send/prepare cancel RPCs to the
2206  * server.
2207  * If \a req is NULL, send CANCEL request to server with handles of locks
2208  * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests
2209  * separately per lock.
2210  * If \a req is not NULL, pack handles of locks in \a cancels into the given
2211  * request buffer.
2212  * Destroy \a cancels at the end.
2213  */
2214 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
2215                          struct ptlrpc_request *req,
2216                          enum ldlm_cancel_flags flags)
2217 {
2218         struct ldlm_lock *lock;
2219         int res = 0;
2220
2221         ENTRY;
2222
2223         if (list_empty(cancels) || count == 0)
2224                 RETURN(0);
2225
2226         /*
2227          * XXX: requests (both batched and not) could be sent in parallel.
2228          * Usually it is enough to have just 1 RPC, but it is possible that
2229          * there are too many locks to be cancelled in LRU or on a resource.
2230          * It would also speed up the case when the server does not support
2231          * the feature.
2232          */
2233         while (count > 0) {
2234                 LASSERT(!list_empty(cancels));
2235                 lock = list_entry(cancels->next, struct ldlm_lock,
2236                                   l_bl_ast);
2237                 LASSERT(lock->l_conn_export);
2238
2239                 if (exp_connect_cancelset(lock->l_conn_export)) {
2240                         res = count;
2241                         if (req)
2242                                 ldlm_cancel_pack(req, cancels, count);
2243                         else
2244                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
2245                                                           cancels, count,
2246                                                           flags);
2247                 } else {
2248                         res = ldlm_cli_cancel_req(lock->l_conn_export,
2249                                                   cancels, 1, flags);
2250                 }
2251
2252                 if (res < 0) {
2253                         CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR,
2254                                      "ldlm_cli_cancel_list: %d\n", res);
2255                         res = count;
2256                 }
2257
2258                 count -= res;
2259                 ldlm_lock_list_put(cancels, l_bl_ast, res);
2260         }
2261         LASSERT(count == 0);
2262         RETURN(0);
2263 }
2264 EXPORT_SYMBOL(ldlm_cli_cancel_list);
2265
2266 /**
2267  * Cancel all locks on a resource that have 0 readers/writers.
2268  *
2269  * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
2270  * to notify the server.
2271  */
2272 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
2273                                     const struct ldlm_res_id *res_id,
2274                                     union ldlm_policy_data *policy,
2275                                     enum ldlm_mode mode,
2276                                     enum ldlm_cancel_flags flags, void *opaque)
2277 {
2278         struct ldlm_resource *res;
2279         struct list_head cancels = LIST_HEAD_INIT(cancels);
2280         int count;
2281         int rc;
2282
2283         ENTRY;
2284
2285         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
2286         if (IS_ERR(res)) {
2287                 /* This is not a problem. */
2288                 CDEBUG(D_INFO, "No resource %llu\n", res_id->name[0]);
2289                 RETURN(0);
2290         }
2291
2292         LDLM_RESOURCE_ADDREF(res);
2293         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
2294                                            0, flags | LCF_BL_AST, opaque);
2295         rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
2296         if (rc != ELDLM_OK)
2297                 CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
2298                        PLDLMRES(res), rc);
2299
2300         LDLM_RESOURCE_DELREF(res);
2301         ldlm_resource_putref(res);
2302         RETURN(0);
2303 }
2304 EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
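/*
 * Editorial usage sketch (not part of the original source): drop every unused
 * lock on one resource without notifying the server, as a caller might do
 * when invalidating cached state.  LCK_MINMODE matches any mode; the wrapper
 * name is hypothetical.
 */
static int __maybe_unused example_flush_resource(struct ldlm_namespace *ns,
						 const struct ldlm_res_id *res_id)
{
	return ldlm_cli_cancel_unused_resource(ns, res_id, NULL, LCK_MINMODE,
					       LCF_LOCAL, NULL);
}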
2305
2306 struct ldlm_cli_cancel_arg {
2307         int     lc_flags;
2308         void   *lc_opaque;
2309 };
2310
2311 static int
2312 ldlm_cli_hash_cancel_unused(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2313                             struct hlist_node *hnode, void *arg)
2314 {
2315         struct ldlm_resource           *res = cfs_hash_object(hs, hnode);
2316         struct ldlm_cli_cancel_arg     *lc = arg;
2317
2318         ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
2319                                         NULL, LCK_MINMODE, lc->lc_flags,
2320                                         lc->lc_opaque);
2321         /* must return 0 for hash iteration */
2322         return 0;
2323 }
2324
2325 /**
2326  * Cancel all locks on a namespace (or a specific resource, if given)
2327  * that have 0 readers/writers.
2328  *
2329  * If flags & LCF_LOCAL, throw the locks away without trying
2330  * to notify the server.
2331  */
2332 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
2333                            const struct ldlm_res_id *res_id,
2334                            enum ldlm_cancel_flags flags, void *opaque)
2335 {
2336         struct ldlm_cli_cancel_arg arg = {
2337                 .lc_flags       = flags,
2338                 .lc_opaque      = opaque,
2339         };
2340
2341         ENTRY;
2342
2343         if (ns == NULL)
2344                 RETURN(ELDLM_OK);
2345
2346         if (res_id != NULL) {
2347                 RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
2348                                                        LCK_MINMODE, flags,
2349                                                        opaque));
2350         } else {
2351                 cfs_hash_for_each_nolock(ns->ns_rs_hash,
2352                                          ldlm_cli_hash_cancel_unused, &arg, 0);
2353                 RETURN(ELDLM_OK);
2354         }
2355 }
2356
2357 /* Lock iterators. */
2358
2359 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
2360                           void *closure)
2361 {
2362         struct list_head *tmp, *next;
2363         struct ldlm_lock *lock;
2364         int rc = LDLM_ITER_CONTINUE;
2365
2366         ENTRY;
2367
2368         if (!res)
2369                 RETURN(LDLM_ITER_CONTINUE);
2370
2371         lock_res(res);
2372         list_for_each_safe(tmp, next, &res->lr_granted) {
2373                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2374
2375                 if (iter(lock, closure) == LDLM_ITER_STOP)
2376                         GOTO(out, rc = LDLM_ITER_STOP);
2377         }
2378
2379         list_for_each_safe(tmp, next, &res->lr_waiting) {
2380                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
2381
2382                 if (iter(lock, closure) == LDLM_ITER_STOP)
2383                         GOTO(out, rc = LDLM_ITER_STOP);
2384         }
2385 out:
2386         unlock_res(res);
2387         RETURN(rc);
2388 }
2389
2390 struct iter_helper_data {
2391         ldlm_iterator_t iter;
2392         void *closure;
2393 };
2394
2395 static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
2396 {
2397         struct iter_helper_data *helper = closure;
2398
2399         return helper->iter(lock, helper->closure);
2400 }
2401
2402 static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2403                                 struct hlist_node *hnode, void *arg)
2404
2405 {
2406         struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2407
2408         return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
2409                                      LDLM_ITER_STOP;
2410 }
2411
2412 void ldlm_namespace_foreach(struct ldlm_namespace *ns,
2413                             ldlm_iterator_t iter, void *closure)
2414
2415 {
2416         struct iter_helper_data helper = { .iter = iter, .closure = closure };
2417
2418         cfs_hash_for_each_nolock(ns->ns_rs_hash,
2419                                  ldlm_res_iter_helper, &helper, 0);
2420
2421 }
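/*
 * Editorial usage sketch (not part of the original source): count granted
 * locks in a namespace via the iterator interface above.  Both helper names
 * are hypothetical; the callback follows the ldlm_iterator_t signature.
 */
static int example_count_iter(struct ldlm_lock *lock, void *closure)
{
	int *counter = closure;

	if (ldlm_is_granted(lock))
		(*counter)++;
	return LDLM_ITER_CONTINUE;
}

static int __maybe_unused example_count_granted(struct ldlm_namespace *ns)
{
	int counter = 0;

	ldlm_namespace_foreach(ns, example_count_iter, &counter);
	return counter;
}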
2422
2423 /*
2424  * Non-blocking function to manipulate a lock whose cb_data is being put away.
2425  * return  0:  no resource found
2426  *       > 0:  must be LDLM_ITER_STOP/LDLM_ITER_CONTINUE.
2427  *       < 0:  errors
2428  */
2429 int ldlm_resource_iterate(struct ldlm_namespace *ns,
2430                           const struct ldlm_res_id *res_id,
2431                           ldlm_iterator_t iter, void *data)
2432 {
2433         struct ldlm_resource *res;
2434         int rc;
2435
2436         ENTRY;
2437
2438         LASSERTF(ns != NULL, "must pass in namespace\n");
2439
2440         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
2441         if (IS_ERR(res))
2442                 RETURN(0);
2443
2444         LDLM_RESOURCE_ADDREF(res);
2445         rc = ldlm_resource_foreach(res, iter, data);
2446         LDLM_RESOURCE_DELREF(res);
2447         ldlm_resource_putref(res);
2448         RETURN(rc);
2449 }
2450 EXPORT_SYMBOL(ldlm_resource_iterate);
2451
2452 /* Lock replay */
2453 static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
2454 {
2455         struct list_head *list = closure;
2456
2457         /* we use l_pending_chain here, because it's unused on clients. */
2458         LASSERTF(list_empty(&lock->l_pending_chain),
2459                  "lock %p next %p prev %p\n",
2460                  lock, &lock->l_pending_chain.next,
2461                  &lock->l_pending_chain.prev);
2462         /*
2463          * b=9573: don't replay locks left after eviction, or
2464          * b=17614: locks being actively cancelled. Get a reference
2465          * on the lock so that it does not disappear under us (e.g. due to cancel).
2466          */
2467         if (!(lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_BL_DONE))) {
2468                 list_add(&lock->l_pending_chain, list);
2469                 LDLM_LOCK_GET(lock);
2470         }
2471
2472         return LDLM_ITER_CONTINUE;
2473 }
2474
2475 static int replay_lock_interpret(const struct lu_env *env,
2476                                  struct ptlrpc_request *req, void *args, int rc)
2477 {
2478         struct ldlm_async_args *aa = args;
2479         struct ldlm_lock     *lock;
2480         struct ldlm_reply    *reply;
2481         struct obd_export    *exp;
2482
2483         ENTRY;
2484         atomic_dec(&req->rq_import->imp_replay_inflight);
2485         if (rc != ELDLM_OK)
2486                 GOTO(out, rc);
2487
2488         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2489         if (reply == NULL)
2490                 GOTO(out, rc = -EPROTO);
2491
2492         lock = ldlm_handle2lock(&aa->lock_handle);
2493         if (!lock) {
2494                 CERROR("received replay ack for unknown local cookie %#llx remote cookie %#llx from server %s id %s\n",
2495                        aa->lock_handle.cookie, reply->lock_handle.cookie,
2496                        req->rq_export->exp_client_uuid.uuid,
2497                        libcfs_id2str(req->rq_peer));
2498                 GOTO(out, rc = -ESTALE);
2499         }
2500
2501         /* Key change rehash lock in per-export hash with new key */
2502         exp = req->rq_export;
2503         if (exp && exp->exp_lock_hash) {
2504                 /*
2505                  * In the function below, .hs_keycmp resolves to
2506                  * ldlm_export_lock_keycmp()
2507                  */
2508                 /* coverity[overrun-buffer-val] */
2509                 cfs_hash_rehash_key(exp->exp_lock_hash,
2510                                     &lock->l_remote_handle,
2511                                     &reply->lock_handle,
2512                                     &lock->l_exp_hash);
2513         } else {
2514                 lock->l_remote_handle = reply->lock_handle;
2515         }
2516
2517         LDLM_DEBUG(lock, "replayed lock:");
2518         ptlrpc_import_recovery_state_machine(req->rq_import);
2519         LDLM_LOCK_PUT(lock);
2520 out:
2521         if (rc != ELDLM_OK)
2522                 ptlrpc_connect_import(req->rq_import);
2523
2524         RETURN(rc);
2525 }
2526
2527 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
2528 {
2529         struct ptlrpc_request *req;
2530         struct ldlm_async_args *aa;
2531         struct ldlm_request   *body;
2532         int flags;
2533
2534         ENTRY;
2535
2536
2537         /* b=11974: Do not replay a lock which is actively being canceled */
2538         if (ldlm_is_bl_done(lock)) {
2539                 LDLM_DEBUG(lock, "Not replaying canceled lock:");
2540                 RETURN(0);
2541         }
2542
2543         /*
2544          * If this is a reply-less callback lock, we cannot replay it: the
2545          * server might have dropped it long ago with the notification lost
2546          * on the network, and already granted a conflicting lock.
2547          */
2548         if (ldlm_is_cancel_on_block(lock)) {
2549                 LDLM_DEBUG(lock, "Not replaying reply-less lock:");
2550                 ldlm_lock_cancel(lock);
2551                 RETURN(0);
2552         }
2553
2554         /*
2555          * If granted mode matches the requested mode, this lock is granted.
2556          *
2557          * If we haven't been granted anything and are on a resource list,
2558          * then we're blocked/waiting.
2559          *
2560          * If we haven't been granted anything and we're NOT on a resource list,
2561          * then we haven't got a reply yet and don't have a known disposition.
2562          * This happens whenever a lock enqueue is the request that triggers
2563          * recovery.
2564          */
2565         if (ldlm_is_granted(lock))
2566                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
2567         else if (!list_empty(&lock->l_res_link))
2568                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
2569         else
2570                 flags = LDLM_FL_REPLAY;
2571
2572         req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
2573                                         LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2574         if (req == NULL)
2575                 RETURN(-ENOMEM);
2576
2577         /* We're part of recovery, so don't wait for it. */
2578         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
2579
2580         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2581         ldlm_lock2desc(lock, &body->lock_desc);
2582         body->lock_flags = ldlm_flags_to_wire(flags);
2583
2584         ldlm_lock2handle(lock, &body->lock_handle[0]);
2585         if (lock->l_lvb_len > 0)
2586                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
2587         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2588                              lock->l_lvb_len);
2589         ptlrpc_request_set_replen(req);
2590         /*
2591          * Notify the server we've replayed all requests.
2592          * Also, we mark the request to be put on a dedicated
2593          * queue to be processed after all request replays.
2594          * b=6063
2595          */
2596         lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
2597
2598         LDLM_DEBUG(lock, "replaying lock:");
2599
2600         atomic_inc(&req->rq_import->imp_replay_inflight);
2601         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2602         aa = ptlrpc_req_async_args(req);
2603         aa->lock_handle = body->lock_handle[0];
2604         req->rq_interpret_reply = replay_lock_interpret;
2605         ptlrpcd_add_req(req);
2606
2607         RETURN(0);
2608 }
2609
2610 /**
2611  * Cancel as many unused locks as possible before replay. Since we are
2612  * in recovery, we cannot wait for any outstanding RPCs nor send any RPC
2613  * to the server.
2614  *
2615  * Called only in recovery before replaying locks. There is no need to
2616  * replay locks that are unused. Since the clients may hold thousands of
2617  * cached unused locks, dropping the unused locks can greatly reduce the
2618  * load on the servers at recovery time.
2619  */
2620 static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
2621 {
2622         int canceled;
2623         struct list_head cancels = LIST_HEAD_INIT(cancels);
2624
2625         CDEBUG(D_DLMTRACE,
2626                "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
2627                ldlm_ns_name(ns), ns->ns_nr_unused);
2628
2629         /*
2630          * We don't need to care whether or not LRU resize is enabled
2631          * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
2632          * count parameter
2633          */
2634         canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0,
2635                                          LCF_LOCAL, LDLM_LRU_FLAG_NO_WAIT);
2636
2637         CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n",
2638                            canceled, ldlm_ns_name(ns));
2639 }
2640
2641 int ldlm_replay_locks(struct obd_import *imp)
2642 {
2643         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
2644         struct list_head list = LIST_HEAD_INIT(list);
2645         struct ldlm_lock *lock, *next;
2646         int rc = 0;
2647
2648         ENTRY;
2649
2650         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
2651
2652         /* don't replay locks if import failed recovery */
2653         if (imp->imp_vbr_failed)
2654                 RETURN(0);
2655
2656         /* ensure this doesn't fall to 0 before all have been queued */
2657         atomic_inc(&imp->imp_replay_inflight);
2658
2659         if (ldlm_cancel_unused_locks_before_replay)
2660                 ldlm_cancel_unused_locks_for_replay(ns);
2661
2662         ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
2663
2664         list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
2665                 list_del_init(&lock->l_pending_chain);
2666                 if (rc) {
2667                         LDLM_LOCK_RELEASE(lock);
2668                         continue; /* or try to do the rest? */
2669                 }
2670                 rc = replay_one_lock(imp, lock);
2671                 LDLM_LOCK_RELEASE(lock);
2672         }
2673
2674         atomic_dec(&imp->imp_replay_inflight);
2675
2676         RETURN(rc);
2677 }
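/*
 * Editorial usage sketch (not part of the original source): the recovery
 * state machine is expected to kick off lock replay once per import after
 * request replay completes; the caller name is hypothetical and error
 * handling is elided.
 */
static int __maybe_unused example_replay_stage(struct obd_import *imp)
{
	/* cancels unused locks first (if enabled), then queues replay RPCs */
	return ldlm_replay_locks(imp);
}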