1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LDLM
38 #ifndef __KERNEL__
39 #include <signal.h>
40 #include <liblustre.h>
41 #endif
42
43 #include <lustre_dlm.h>
44 #include <obd_class.h>
45 #include <obd.h>
46
47 #include "ldlm_internal.h"
48
49 int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
50 CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
51                 "lock enqueue timeout minimum");
52
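/* Intentionally a no-op: this is only used as the interrupt callback for the
 * l_wait_event() in ldlm_completion_ast() below, where an interrupted wait
 * apparently needs no extra cleanup. */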
53 static void interrupted_completion_wait(void *data)
54 {
55 }
56
57 struct lock_wait_data {
58         struct ldlm_lock *lwd_lock;
59         __u32             lwd_conn_cnt;
60 };
61
62 struct ldlm_async_args {
63         struct lustre_handle lock_handle;
64 };
65
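/* Timeout callback for the completion wait in ldlm_completion_ast().  When the
 * lock has no server connection we only log (and periodically dump the
 * namespace) and keep waiting; otherwise we kick the import into recovery via
 * ptlrpc_fail_import(). */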
66 int ldlm_expired_completion_wait(void *data)
67 {
68         struct lock_wait_data *lwd = data;
69         struct ldlm_lock *lock = lwd->lwd_lock;
70         struct obd_import *imp;
71         struct obd_device *obd;
72
73         ENTRY;
74         if (lock->l_conn_export == NULL) {
75                 static cfs_time_t next_dump = 0, last_dump = 0;
76
77                 if (ptlrpc_check_suspend())
78                         RETURN(0);
79
80                 LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
81                            CFS_DURATION_T"s ago); not entering recovery in "
82                            "server code, just going back to sleep",
83                            lock->l_enqueued_time.tv_sec,
84                            cfs_time_sub(cfs_time_current_sec(),
85                            lock->l_enqueued_time.tv_sec));
86                 if (cfs_time_after(cfs_time_current(), next_dump)) {
87                         last_dump = next_dump;
88                         next_dump = cfs_time_shift(300);
89                         ldlm_namespace_dump(D_DLMTRACE,
90                                             lock->l_resource->lr_namespace);
91                         if (last_dump == 0)
92                                 libcfs_debug_dumplog();
93                 }
94                 RETURN(0);
95         }
96
97         obd = lock->l_conn_export->exp_obd;
98         imp = obd->u.cli.cl_import;
99         ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
100         LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
101                   CFS_DURATION_T"s ago), entering recovery for %s@%s",
102                   lock->l_enqueued_time.tv_sec,
103                   cfs_time_sub(cfs_time_current_sec(),
104                   lock->l_enqueued_time.tv_sec),
105                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
106
107         RETURN(0);
108 }
109
110 /* We use the same basis for both server side and client side functions
111    from a single node. */
112 int ldlm_get_enq_timeout(struct ldlm_lock *lock)
113 {
114         int timeout = at_get(&lock->l_resource->lr_namespace->ns_at_estimate);
115         if (AT_OFF)
116                 return obd_timeout / 2;
117         /* Since these are non-updating timeouts, we should be conservative.
118            It would be nice to have some kind of "early reply" mechanism for
119            lock callbacks too... */
120         timeout = timeout + (timeout >> 1); /* 150% */
121         return max(timeout, ldlm_enqueue_min);
122 }
123
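/* The completion wait is finished once the lock is granted (with no completion
 * AST still pending) or has failed. */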
124 static int is_granted_or_cancelled(struct ldlm_lock *lock)
125 {
126         int ret = 0;
127
128         lock_res_and_lock(lock);
129         if (((lock->l_req_mode == lock->l_granted_mode) &&
130              !(lock->l_flags & LDLM_FL_CP_REQD)) ||
131             (lock->l_flags & LDLM_FL_FAILED))
132                 ret = 1;
133         unlock_res_and_lock(lock);
134
135         return ret;
136 }
137
138 /**
139  * Helper function for ldlm_completion_ast(), updating timings when lock is
140  * actually granted.
141  */
142 static int ldlm_completion_tail(struct ldlm_lock *lock)
143 {
144         long delay;
145         int  result;
146
147         if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
148                 LDLM_DEBUG(lock, "client-side enqueue: destroyed");
149                 result = -EIO;
150         } else {
151                 delay = cfs_time_sub(cfs_time_current_sec(),
152                                      lock->l_enqueued_time.tv_sec);
153                 LDLM_DEBUG(lock, "client-side enqueue: granted after "
154                            CFS_DURATION_T"s", delay);
155
156                 /* Update our time estimate */
157                 at_add(&lock->l_resource->lr_namespace->ns_at_estimate, delay);
158                 result = 0;
159         }
160         return result;
161 }
162
163 /**
164  * Implementation of ->l_completion_ast() for a client that doesn't wait
165  * until lock is granted. Suitable for locks enqueued through ptlrpcd or
166  * other threads that cannot block for long.
167  */
168 int ldlm_completion_ast_async(struct ldlm_lock *lock, int flags, void *data)
169 {
170         ENTRY;
171
172         if (flags == LDLM_FL_WAIT_NOREPROC) {
173                 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
174                 RETURN(0);
175         }
176
177         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
178                        LDLM_FL_BLOCK_CONV))) {
179                 cfs_waitq_signal(&lock->l_waitq);
180                 RETURN(ldlm_completion_tail(lock));
181         }
182
183         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
184                    "going forward");
185         ldlm_lock_dump(D_OTHER, lock, 0);
186         RETURN(0);
187 }
188
189 /**
190  * Client side LDLM "completion" AST. This is called in several cases:
191  *
192  *     - when a reply to an ENQUEUE rpc is received from the server
193  *       (ldlm_cli_enqueue_fini()). Lock might be granted or not granted at
194  *       this point (determined by flags);
195  *
196  *     - when LDLM_CP_CALLBACK rpc comes to client to notify it that lock has
197  *       been granted;
198  *
199  *     - when ldlm_lock_match(LDLM_FL_LVB_READY) is about to wait until lock
200  *       gets correct lvb;
201  *
202  *     - to force all locks when resource is destroyed (cleanup_resource());
203  *
204  *     - during lock conversion (not used currently).
205  *
206  * If the lock is not granted in the first case, this function waits until
207  * the second or penultimate case happens in some other thread.
208  *
209  */
210 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
211 {
212         /* XXX ALLOCATE - 160 bytes */
213         struct lock_wait_data lwd;
214         struct obd_device *obd;
215         struct obd_import *imp = NULL;
216         struct l_wait_info lwi;
217         __u32 timeout;
218         int rc = 0;
219         ENTRY;
220
221         if (flags == LDLM_FL_WAIT_NOREPROC) {
222                 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
223                 goto noreproc;
224         }
225
226         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
227                        LDLM_FL_BLOCK_CONV))) {
228                 cfs_waitq_signal(&lock->l_waitq);
229                 RETURN(0);
230         }
231
232         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
233                    "sleeping");
234         ldlm_lock_dump(D_OTHER, lock, 0);
235
236 noreproc:
237
238         obd = class_exp2obd(lock->l_conn_export);
239
240         /* if this is a local lock, then there is no import */
241         if (obd != NULL) {
242                 imp = obd->u.cli.cl_import;
243         }
244
245         /* Wait a long time for enqueue - server may have to callback a
246            lock from another client.  Server will evict the other client if it
247            doesn't respond reasonably, and then give us the lock. */
248         timeout = ldlm_get_enq_timeout(lock) * 2;
249
250         lwd.lwd_lock = lock;
251
252         if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
253                 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
254                 lwi = LWI_INTR(interrupted_completion_wait, &lwd);
255         } else {
256                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
257                                        ldlm_expired_completion_wait,
258                                        interrupted_completion_wait, &lwd);
259         }
260
261         if (imp != NULL) {
262                 spin_lock(&imp->imp_lock);
263                 lwd.lwd_conn_cnt = imp->imp_conn_cnt;
264                 spin_unlock(&imp->imp_lock);
265         }
266
267         /* Go to sleep until the lock is granted or cancelled. */
268         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
269
270         if (rc) {
271                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
272                            rc);
273                 RETURN(rc);
274         }
275
276         RETURN(ldlm_completion_tail(lock));
277 }
278
279 /**
280  * A helper to build a blocking ast function
281  *
282  * Perform a common operation for blocking asts:
283  * deferred lock cancellation.
284  *
285  * \param lock the lock blocking or canceling ast was called on
286  * \retval 0
287  * \see mdt_blocking_ast
288  * \see ldlm_blocking_ast
289  */
290 int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
291 {
292         int do_ast;
293         ENTRY;
294
295         lock->l_flags |= LDLM_FL_CBPENDING;
296         do_ast = (!lock->l_readers && !lock->l_writers);
297         unlock_res_and_lock(lock);
298
299         if (do_ast) {
300                 struct lustre_handle lockh;
301                 int rc;
302
303                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
304                 ldlm_lock2handle(lock, &lockh);
305                 rc = ldlm_cli_cancel(&lockh);
306                 if (rc < 0)
307                         CERROR("ldlm_cli_cancel: %d\n", rc);
308         } else {
309                 LDLM_DEBUG(lock, "Lock still has references, will be "
310                            "cancelled later");
311         }
312         RETURN(0);
313 }
314
315 /**
316  * Server blocking AST
317  *
318  * ->l_blocking_ast() callback for LDLM locks acquired by server-side
319  * OBDs.
320  *
321  * \param lock the lock which blocks a request or cancelling lock
322  * \param desc unused
323  * \param data unused
324  * \param flag indicates whether this is a cancelling or blocking callback
325  * \retval 0
326  * \see ldlm_blocking_ast_nocheck
327  */
328 int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
329                       void *data, int flag)
330 {
331         ENTRY;
332
333         if (flag == LDLM_CB_CANCELING) {
334                 /* Don't need to do anything here. */
335                 RETURN(0);
336         }
337
338         lock_res_and_lock(lock);
339         /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
340          * that ldlm_blocking_ast is called just before intent_policy method
341          * takes the ns_lock, then by the time we get the lock, we might not
342          * be the correct blocking function anymore.  So check, and return
343          * early, if so. */
344         if (lock->l_blocking_ast != ldlm_blocking_ast) {
345                 unlock_res_and_lock(lock);
346                 RETURN(0);
347         }
348         RETURN(ldlm_blocking_ast_nocheck(lock));
349 }
350
351 /*
352  * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
353  * comment in filter_intent_policy() on why you may need this.
354  */
355 int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
356 {
357         /*
358          * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
359          * that is rather subtle: with OST-side locking, it may so happen that
360          * _all_ extent locks are held by the OST. If client wants to obtain
361          * current file size it calls ll{,u}_glimpse_size(), and (as locks are
362          * on the server), dummy glimpse callback fires and does
363          * nothing. Client still receives correct file size due to the
364          * following fragment in filter_intent_policy():
365          *
366          * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
367          * if (rc != 0 && res->lr_namespace->ns_lvbo &&
368          *     res->lr_namespace->ns_lvbo->lvbo_update) {
369          *         res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
370          * }
371          *
372          * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
373          * returns correct file size to the client.
374          */
375         return -ELDLM_NO_LOCK_DATA;
376 }
377
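/* Enqueue a lock on a namespace that lives on this node; no RPC is sent.  Only
 * valid for server-side namespaces, as the ns_is_client() check below
 * enforces. */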
378 int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
379                            const struct ldlm_res_id *res_id,
380                            ldlm_type_t type, ldlm_policy_data_t *policy,
381                            ldlm_mode_t mode, int *flags,
382                            ldlm_blocking_callback blocking,
383                            ldlm_completion_callback completion,
384                            ldlm_glimpse_callback glimpse,
385                            void *data, __u32 lvb_len, void *lvb_swabber,
386                            const __u64 *client_cookie,
387                            struct lustre_handle *lockh)
388 {
389         struct ldlm_lock *lock;
390         int err;
391         const struct ldlm_callback_suite cbs = { .lcs_completion = completion,
392                                                  .lcs_blocking   = blocking,
393                                                  .lcs_glimpse    = glimpse,
394         };
395         ENTRY;
396
397         LASSERT(!(*flags & LDLM_FL_REPLAY));
398         if (unlikely(ns_is_client(ns))) {
399                 CERROR("Trying to enqueue local lock in a shadow namespace\n");
400                 LBUG();
401         }
402
403         lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len);
404         if (unlikely(!lock))
405                 GOTO(out_nolock, err = -ENOMEM);
406         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
407
408         ldlm_lock_addref_internal(lock, mode);
409         ldlm_lock2handle(lock, lockh);
410         lock_res_and_lock(lock);
411         lock->l_flags |= LDLM_FL_LOCAL;
412         if (*flags & LDLM_FL_ATOMIC_CB)
413                 lock->l_flags |= LDLM_FL_ATOMIC_CB;
414         lock->l_lvb_swabber = lvb_swabber;
415         unlock_res_and_lock(lock);
416         if (policy != NULL)
417                 lock->l_policy_data = *policy;
418         if (client_cookie != NULL)
419                 lock->l_client_cookie = *client_cookie;
420         if (type == LDLM_EXTENT)
421                 lock->l_req_extent = policy->l_extent;
422
423         err = ldlm_lock_enqueue(ns, &lock, policy, flags);
424         if (unlikely(err != ELDLM_OK))
425                 GOTO(out, err);
426
427         if (policy != NULL)
428                 *policy = lock->l_policy_data;
429
430         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
431                           lock);
432
433         if (lock->l_completion_ast)
434                 lock->l_completion_ast(lock, *flags, NULL);
435
436         LDLM_DEBUG(lock, "client-side local enqueue END");
437         EXIT;
438  out:
439         LDLM_LOCK_RELEASE(lock);
440  out_nolock:
441         return err;
442 }
443
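/* Clean up after a failed enqueue: drop the reference and cancel the lock
 * locally, without sending a CANCEL rpc to the server (LDLM_FL_LOCAL_ONLY). */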
444 static void failed_lock_cleanup(struct ldlm_namespace *ns,
445                                 struct ldlm_lock *lock,
446                                 struct lustre_handle *lockh, int mode)
447 {
448         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
449         lock_res_and_lock(lock);
450         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
451         unlock_res_and_lock(lock);
452         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
453
454         ldlm_lock_decref_and_cancel(lockh, mode);
455
456         /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
457          *       from llite/file.c/ll_file_flock(). */
458         if (lock->l_resource->lr_type == LDLM_FLOCK) {
459                 ldlm_lock_destroy(lock);
460         }
461 }
462
463 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
464                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
465                           int *flags, void *lvb, __u32 lvb_len,
466                           void *lvb_swabber, struct lustre_handle *lockh,int rc)
467 {
468         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
469         int is_replay = *flags & LDLM_FL_REPLAY;
470         struct lustre_handle old_hash_key;
471         struct ldlm_lock *lock;
472         struct ldlm_reply *reply;
473         int cleanup_phase = 1;
474         ENTRY;
475
476         lock = ldlm_handle2lock(lockh);
477         /* ldlm_cli_enqueue is holding a reference on this lock. */
478         if (!lock) {
479                 LASSERT(type == LDLM_FLOCK);
480                 RETURN(-ENOLCK);
481         }
482
483         if (rc != ELDLM_OK) {
484                 LASSERT(!is_replay);
485                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
486                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
487                 if (rc == ELDLM_LOCK_ABORTED) {
488                         /* Before we return, swab the reply */
489                         reply = req_capsule_server_get(&req->rq_pill,
490                                                        &RMF_DLM_REP);
491                         if (reply == NULL)
492                                 rc = -EPROTO;
493                         if (lvb_len) {
494                                 struct ost_lvb *tmplvb;
495
496                                 req_capsule_set_size(&req->rq_pill,
497                                                      &RMF_DLM_LVB, RCL_SERVER,
498                                                      lvb_len);
499                                 tmplvb = req_capsule_server_swab_get(
500                                         &req->rq_pill, &RMF_DLM_LVB,
501                                         lvb_swabber);
502                                 if (tmplvb == NULL)
503                                         GOTO(cleanup, rc = -EPROTO);
504                                 if (lvb != NULL)
505                                         memcpy(lvb, tmplvb, lvb_len);
506                         }
507                 }
508                 GOTO(cleanup, rc);
509         }
510
511         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
512         if (reply == NULL)
513                 GOTO(cleanup, rc = -EPROTO);
514
515         /* lock enqueued on the server */
516         cleanup_phase = 0;
517
518         lock_res_and_lock(lock);
519         old_hash_key = lock->l_remote_handle;
520         lock->l_remote_handle = reply->lock_handle;
521
522         /* Key change rehash lock in per-export hash with new key */
523         if (exp->exp_lock_hash)
524                 lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key,
525                                        &lock->l_remote_handle,
526                                        &lock->l_exp_hash);
527
528         *flags = reply->lock_flags;
529         lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
530         /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
531          * to wait with no timeout as well */
532         lock->l_flags |= reply->lock_flags & LDLM_FL_NO_TIMEOUT;
533         unlock_res_and_lock(lock);
534
535         CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
536                lock, reply->lock_handle.cookie, *flags);
537
538         /* If enqueue returned a blocked lock but the completion handler has
539          * already run, then it fixed up the resource and we don't need to do it
540          * again. */
541         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
542                 int newmode = reply->lock_desc.l_req_mode;
543                 LASSERT(!is_replay);
544                 if (newmode && newmode != lock->l_req_mode) {
545                         LDLM_DEBUG(lock, "server returned different mode %s",
546                                    ldlm_lockname[newmode]);
547                         lock->l_req_mode = newmode;
548                 }
549
550                 if (memcmp(reply->lock_desc.l_resource.lr_name.name,
551                           lock->l_resource->lr_name.name,
552                           sizeof(struct ldlm_res_id))) {
553                         CDEBUG(D_INFO, "remote intent success, locking "
554                                         "(%ld,%ld,%ld) instead of "
555                                         "(%ld,%ld,%ld)\n",
556                               (long)reply->lock_desc.l_resource.lr_name.name[0],
557                               (long)reply->lock_desc.l_resource.lr_name.name[1],
558                               (long)reply->lock_desc.l_resource.lr_name.name[2],
559                               (long)lock->l_resource->lr_name.name[0],
560                               (long)lock->l_resource->lr_name.name[1],
561                               (long)lock->l_resource->lr_name.name[2]);
562
563                         rc = ldlm_lock_change_resource(ns, lock,
564                                         &reply->lock_desc.l_resource.lr_name);
565                         if (rc || lock->l_resource == NULL)
566                                 GOTO(cleanup, rc = -ENOMEM);
567                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
568                 }
569                 if (with_policy)
570                         if (!(type == LDLM_IBITS && !(exp->exp_connect_flags &
571                                                     OBD_CONNECT_IBITS)))
572                                 lock->l_policy_data =
573                                                  reply->lock_desc.l_policy_data;
574                 if (type != LDLM_PLAIN)
575                         LDLM_DEBUG(lock,"client-side enqueue, new policy data");
576         }
577
578         if ((*flags) & LDLM_FL_AST_SENT ||
579             /* Cancel extent locks as soon as possible on a liblustre client,
580              * because it cannot handle asynchronous ASTs robustly (see
581              * bug 7311). */
582             (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
583                 lock_res_and_lock(lock);
584                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
585                 unlock_res_and_lock(lock);
586                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
587         }
588
589         /* If the lock has already been granted by a completion AST, don't
590          * clobber the LVB with an older one. */
591         if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
592                 void *tmplvb;
593
594                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
595                                      lvb_len);
596                 tmplvb = req_capsule_server_swab_get(&req->rq_pill,
597                                                      &RMF_DLM_LVB,
598                                                      lvb_swabber);
599                 if (tmplvb == NULL)
600                         GOTO(cleanup, rc = -EPROTO);
601                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
602         }
603
604         if (!is_replay) {
605                 rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
606                 if (lock->l_completion_ast != NULL) {
607                         int err = lock->l_completion_ast(lock, *flags, NULL);
608                         if (!rc)
609                                 rc = err;
610                         if (rc && type != LDLM_FLOCK) /* bug 9425, bug 10250 */
611                                 cleanup_phase = 1;
612                 }
613         }
614
615         if (lvb_len && lvb != NULL) {
616                 /* Copy the LVB here, and not earlier, because the completion
617                  * AST (if any) can override what we got in the reply */
618                 memcpy(lvb, lock->l_lvb_data, lvb_len);
619         }
620
621         LDLM_DEBUG(lock, "client-side enqueue END");
622         EXIT;
623 cleanup:
624         if (cleanup_phase == 1 && rc)
625                 failed_lock_cleanup(ns, lock, lockh, mode);
626         /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
627         LDLM_LOCK_PUT(lock);
628         LDLM_LOCK_RELEASE(lock);
629         return rc;
630 }
631
632 /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
633  * a single page on the send/receive side. XXX: 512 should be changed
634  * to a more adequate value. */
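/* For example, with a 4096-byte page and a request already occupying req_size
 * bytes, roughly (4096 - 512 - req_size) / sizeof(struct lustre_handle) extra
 * handles fit, in addition to the LDLM_LOCKREQ_HANDLES - off slots built into
 * the ldlm_request itself (illustrative only; LDLM_MAXREQSIZE may cap the
 * usable size). */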
635 static inline int ldlm_req_handles_avail(int req_size, int off)
636 {
637         int avail;
638
639         avail = min_t(int, LDLM_MAXREQSIZE, CFS_PAGE_SIZE - 512) - req_size;
640         if (likely(avail >= 0))
641                 avail /= (int)sizeof(struct lustre_handle);
642         else
643                 avail = 0;
644         avail += LDLM_LOCKREQ_HANDLES - off;
645
646         return avail;
647 }
648
649 static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
650                                              enum req_location loc,
651                                              int off)
652 {
653         int size = req_capsule_msg_size(pill, loc);
654         return ldlm_req_handles_avail(size, off);
655 }
656
657 static inline int ldlm_format_handles_avail(struct obd_import *imp,
658                                             const struct req_format *fmt,
659                                             enum req_location loc, int off)
660 {
661         int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
662         return ldlm_req_handles_avail(size, off);
663 }
664
665 /* Cancel lru locks and pack them into the enqueue request, along with the
666  * given @count locks in @cancels. */
667 int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
668                       int version, int opc, int canceloff,
669                       struct list_head *cancels, int count)
670 {
671         struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
672         struct req_capsule      *pill = &req->rq_pill;
673         struct ldlm_request     *dlm = NULL;
674         int flags, avail, to_free, bufcount, pack = 0;
675         CFS_LIST_HEAD(head);
676         int rc;
677         ENTRY;
678
679         if (cancels == NULL)
680                 cancels = &head;
681         if (exp_connect_cancelset(exp)) {
682                 /* Estimate the amount of available space in the request. */
683                 bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
684                 avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
685
686                 flags = ns_connect_lru_resize(ns) ?
687                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
688                 to_free = !ns_connect_lru_resize(ns) &&
689                           opc == LDLM_ENQUEUE ? 1 : 0;
690
691                 /* Cancel lru locks here _only_ if the server supports
692                  * EARLY_CANCEL. Otherwise we would have to send an extra
693                  * CANCEL rpc, which would make us slower. */
694                 if (avail > count)
695                         count += ldlm_cancel_lru_local(ns, cancels, to_free,
696                                                        avail - count, 0, flags);
697                 if (avail > count)
698                         pack = count;
699                 else
700                         pack = avail;
701                 req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
702                                      ldlm_request_bufsize(pack, opc));
703         }
704
705         rc = ptlrpc_request_pack(req, version, opc);
706         if (rc) {
707                 ldlm_lock_list_put(cancels, l_bl_ast, count);
708                 RETURN(rc);
709         }
710
711         if (exp_connect_cancelset(exp)) {
712                 if (canceloff) {
713                         dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
714                         LASSERT(dlm);
715                         /* Skip the first lock handle in ldlm_request_pack();
716                          * this method will increment @lock_count according
717                          * to the number of lock handles actually written to
718                          * the buffer. */
719                         dlm->lock_count = canceloff;
720                 }
721                 /* Pack into the request @pack lock handles. */
722                 ldlm_cli_cancel_list(cancels, pack, req, 0);
723                 /* Prepare and send separate cancel rpc for others. */
724                 ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
725         } else {
726                 ldlm_lock_list_put(cancels, l_bl_ast, count);
727         }
728         RETURN(0);
729 }
730
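/* Convenience wrapper around ldlm_prep_elc_req() for LDLM_ENQUEUE requests. */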
731 int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
732                           struct list_head *cancels, int count)
733 {
734         return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
735                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
736 }
737
738 /* If a request has some specific initialisation it is passed in @reqp,
739  * otherwise it is created in ldlm_cli_enqueue.
740  *
741  * Supports sync and async requests, pass @async flag accordingly. If a
742  * request was created in ldlm_cli_enqueue and it is an async request, it
743  * is passed back to the caller in @reqp. */
744 int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
745                      struct ldlm_enqueue_info *einfo,
746                      const struct ldlm_res_id *res_id,
747                      ldlm_policy_data_t *policy, int *flags,
748                      void *lvb, __u32 lvb_len, void *lvb_swabber,
749                      struct lustre_handle *lockh, int async)
750 {
751         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
752         struct ldlm_lock      *lock;
753         struct ldlm_request   *body;
754         int                    is_replay = *flags & LDLM_FL_REPLAY;
755         int                    req_passed_in = 1;
756         int                    rc, err;
757         struct ptlrpc_request *req;
758         ENTRY;
759
760         LASSERT(exp != NULL);
761
762         /* If we're replaying this lock, just check some invariants.
763          * If we're creating a new lock, set everything up nicely. */
764         if (is_replay) {
765                 lock = ldlm_handle2lock_long(lockh, 0);
766                 LASSERT(lock != NULL);
767                 LDLM_DEBUG(lock, "client-side enqueue START");
768                 LASSERT(exp == lock->l_conn_export);
769         } else {
770                 const struct ldlm_callback_suite cbs = {
771                         .lcs_completion = einfo->ei_cb_cp,
772                         .lcs_blocking   = einfo->ei_cb_bl,
773                         .lcs_glimpse    = einfo->ei_cb_gl,
774                         .lcs_weigh      = einfo->ei_cb_wg
775                 };
776                 lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
777                                         einfo->ei_mode, &cbs, einfo->ei_cbdata,
778                                         lvb_len);
779                 if (lock == NULL)
780                         RETURN(-ENOMEM);
781                 /* for the local lock, add the reference */
782                 ldlm_lock_addref_internal(lock, einfo->ei_mode);
783                 ldlm_lock2handle(lock, lockh);
784                 lock->l_lvb_swabber = lvb_swabber;
785                 if (policy != NULL) {
786                         /* INODEBITS_INTEROP: If the server does not support
787                          * inodebits, we will request a plain lock in the
788                          * descriptor (ldlm_lock2desc() below) but use an
789                          * inodebits lock internally with both bits set.
790                          */
791                         if (einfo->ei_type == LDLM_IBITS &&
792                             !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
793                                 lock->l_policy_data.l_inodebits.bits =
794                                         MDS_INODELOCK_LOOKUP |
795                                         MDS_INODELOCK_UPDATE;
796                         else
797                                 lock->l_policy_data = *policy;
798                 }
799
800                 if (einfo->ei_type == LDLM_EXTENT)
801                         lock->l_req_extent = policy->l_extent;
802                 LDLM_DEBUG(lock, "client-side enqueue START");
803         }
804
805         /* lock not sent to server yet */
806
807         if (reqp == NULL || *reqp == NULL) {
808                 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
809                                                 &RQF_LDLM_ENQUEUE,
810                                                 LUSTRE_DLM_VERSION,
811                                                 LDLM_ENQUEUE);
812                 if (req == NULL) {
813                         failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
814                         LDLM_LOCK_RELEASE(lock);
815                         RETURN(-ENOMEM);
816                 }
817                 req_passed_in = 0;
818                 if (reqp)
819                         *reqp = req;
820         } else {
821                 int len;
822
823                 req = *reqp;
824                 len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
825                                            RCL_CLIENT);
826                 LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
827                          DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
828         }
829
830         lock->l_conn_export = exp;
831         lock->l_export = NULL;
832         lock->l_blocking_ast = einfo->ei_cb_bl;
833
834         /* Dump lock data into the request buffer */
835         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
836         ldlm_lock2desc(lock, &body->lock_desc);
837         body->lock_flags = *flags;
838         body->lock_handle[0] = *lockh;
839
840         /* Continue as normal. */
841         if (!req_passed_in) {
842                 if (lvb_len > 0) {
843                         req_capsule_extend(&req->rq_pill,
844                                            &RQF_LDLM_ENQUEUE_LVB);
845                         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
846                                              RCL_SERVER, lvb_len);
847                 }
848                 ptlrpc_request_set_replen(req);
849         }
850
851         /*
852          * Liblustre client doesn't get extent locks, except for O_APPEND case
853          * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
854          * [i_size, OBD_OBJECT_EOF] lock is taken.
855          */
856         LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
857                      policy->l_extent.end == OBD_OBJECT_EOF));
858
859         if (async) {
860                 LASSERT(reqp != NULL);
861                 RETURN(0);
862         }
863
864         LDLM_DEBUG(lock, "sending request");
865         rc = ptlrpc_queue_wait(req);
866         err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
867                                     einfo->ei_mode, flags, lvb, lvb_len,
868                                     lvb_swabber, lockh, rc);
869
870         /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
871          * one reference that we took */
872         if (err == -ENOLCK)
873                 LDLM_LOCK_RELEASE(lock);
874         else
875                 rc = err;
876
877         if (!req_passed_in && req != NULL) {
878                 ptlrpc_req_finished(req);
879                 if (reqp)
880                         *reqp = NULL;
881         }
882
883         RETURN(rc);
884 }
885
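/* Convert a lock that has no server connection (l_conn_export == NULL): the
 * conversion is done entirely on this node and the resource is reprocessed. */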
886 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
887                                   __u32 *flags)
888 {
889         struct ldlm_resource *res;
890         int rc;
891         ENTRY;
892         if (ns_is_client(lock->l_resource->lr_namespace)) {
893                 CERROR("Trying to cancel local lock\n");
894                 LBUG();
895         }
896         LDLM_DEBUG(lock, "client-side local convert");
897
898         res = ldlm_lock_convert(lock, new_mode, flags);
899         if (res) {
900                 ldlm_reprocess_all(res);
901                 rc = 0;
902         } else {
903                 rc = EDEADLOCK;
904         }
905         LDLM_DEBUG(lock, "client-side local convert handler END");
906         LDLM_LOCK_PUT(lock);
907         RETURN(rc);
908 }
909
910 /* FIXME: one of ldlm_cli_convert or the server side should reject attempted
911  * conversion of locks which are on the waiting or converting queue */
912 /* Caller of this code is supposed to take care of lock readers/writers
913    accounting */
914 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
915 {
916         struct ldlm_request   *body;
917         struct ldlm_reply     *reply;
918         struct ldlm_lock      *lock;
919         struct ldlm_resource  *res;
920         struct ptlrpc_request *req;
921         int                    rc;
922         ENTRY;
923
924         lock = ldlm_handle2lock(lockh);
925         if (!lock) {
926                 LBUG();
927                 RETURN(-EINVAL);
928         }
929         *flags = 0;
930
931         if (lock->l_conn_export == NULL)
932                 RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
933
934         LDLM_DEBUG(lock, "client-side convert");
935
936         req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
937                                         &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
938                                         LDLM_CONVERT);
939         if (req == NULL) {
940                 LDLM_LOCK_PUT(lock);
941                 RETURN(-ENOMEM);
942         }
943
944         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
945         body->lock_handle[0] = lock->l_remote_handle;
946
947         body->lock_desc.l_req_mode = new_mode;
948         body->lock_flags = *flags;
949
950
951         ptlrpc_request_set_replen(req);
952         rc = ptlrpc_queue_wait(req);
953         if (rc != ELDLM_OK)
954                 GOTO(out, rc);
955
956         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
957         if (reply == NULL)
958                 GOTO(out, rc = -EPROTO);
959
960         if (req->rq_status)
961                 GOTO(out, rc = req->rq_status);
962
963         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
964         if (res != NULL) {
965                 ldlm_reprocess_all(res);
966                 /* Go to sleep until the lock is granted. */
967                 /* FIXME: or cancelled. */
968                 if (lock->l_completion_ast) {
969                         rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
970                                                     NULL);
971                         if (rc)
972                                 GOTO(out, rc);
973                 }
974         } else {
975                 rc = EDEADLOCK;
976         }
977         EXIT;
978  out:
979         LDLM_LOCK_PUT(lock);
980         ptlrpc_req_finished(req);
981         return rc;
982 }
983
984 /* Cancel locks locally.
985  * Returns:
986  * LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL rpc to the server;
987  * LDLM_FL_CANCELING otherwise;
988  * LDLM_FL_BL_AST if a separate CANCEL rpc is needed. */
989 static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
990 {
991         int rc = LDLM_FL_LOCAL_ONLY;
992         ENTRY;
993
994         if (lock->l_conn_export) {
995                 int local_only;
996
997                 LDLM_DEBUG(lock, "client-side cancel");
998                 /* Set this flag to prevent others from getting new references */
999                 lock_res_and_lock(lock);
1000                 lock->l_flags |= LDLM_FL_CBPENDING;
1001                 local_only = (lock->l_flags &
1002                               (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
1003                 ldlm_cancel_callback(lock);
1004                 rc = (lock->l_flags & LDLM_FL_BL_AST) ?
1005                         LDLM_FL_BL_AST : LDLM_FL_CANCELING;
1006                 unlock_res_and_lock(lock);
1007
1008                 if (local_only) {
1009                         CDEBUG(D_DLMTRACE, "not sending request (at caller's "
1010                                "instruction)\n");
1011                         rc = LDLM_FL_LOCAL_ONLY;
1012                 }
1013                 ldlm_lock_cancel(lock);
1014         } else {
1015                 if (ns_is_client(lock->l_resource->lr_namespace)) {
1016                         LDLM_ERROR(lock, "Trying to cancel local lock");
1017                         LBUG();
1018                 }
1019                 LDLM_DEBUG(lock, "server-side local cancel");
1020                 ldlm_lock_cancel(lock);
1021                 ldlm_reprocess_all(lock->l_resource);
1022                 LDLM_DEBUG(lock, "server-side local cancel handler END");
1023         }
1024
1025         RETURN(rc);
1026 }
1027
1028 /* Pack @count locks in @head into the ldlm_request buffer of request @req. */
1030 static void ldlm_cancel_pack(struct ptlrpc_request *req,
1031                              struct list_head *head, int count)
1032 {
1033         struct ldlm_request *dlm;
1034         struct ldlm_lock *lock;
1035         int max, packed = 0;
1036         ENTRY;
1037
1038         dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
1039         LASSERT(dlm != NULL);
1040
1041         /* Check the room in the request buffer. */
1042         max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) -
1043                 sizeof(struct ldlm_request);
1044         max /= sizeof(struct lustre_handle);
1045         max += LDLM_LOCKREQ_HANDLES;
1046         LASSERT(max >= dlm->lock_count + count);
1047
1048         /* XXX: it would be better to pack lock handles grouped by resource
1049          * so that the server cancel would call filter_lvbo_update() less
1050          * frequently. */
1051         list_for_each_entry(lock, head, l_bl_ast) {
1052                 if (!count--)
1053                         break;
1054                 LASSERT(lock->l_conn_export);
1055                 /* Pack the lock handle to the given request buffer. */
1056                 LDLM_DEBUG(lock, "packing");
1057                 dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
1058                 packed++;
1059         }
1060         CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
1061         EXIT;
1062 }
1063
1064 /* Prepare and send a batched cancel rpc; it will include @count lock handles
1065  * of the locks given in @cancels. */
1066 int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
1067                         int count, int flags)
1068 {
1069         struct ptlrpc_request *req = NULL;
1070         struct obd_import *imp;
1071         int free, sent = 0;
1072         int rc = 0;
1073         ENTRY;
1074
1075         LASSERT(exp != NULL);
1076         LASSERT(count > 0);
1077
1078         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, obd_fail_val);
1079
1080         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
1081                 RETURN(count);
1082
1083         free = ldlm_format_handles_avail(class_exp2cliimp(exp),
1084                                          &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
1085         if (count > free)
1086                 count = free;
1087
1088         while (1) {
1089                 int bufcount;
1090
1091                 imp = class_exp2cliimp(exp);
1092                 if (imp == NULL || imp->imp_invalid) {
1093                         CDEBUG(D_DLMTRACE,
1094                                "skipping cancel on invalid import %p\n", imp);
1095                         RETURN(count);
1096                 }
1097
1098                 req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
1099                 if (req == NULL)
1100                         GOTO(out, rc = -ENOMEM);
1101
1102                 bufcount = req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
1103                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
1104                                      ldlm_request_bufsize(count, LDLM_CANCEL));
1105
1106                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
1107                 if (rc) {
1108                         ptlrpc_request_free(req);
1109                         GOTO(out, rc);
1110                 }
1111                 req->rq_no_resend = 1;
1112                 req->rq_no_delay = 1;
1113
1114                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
1115                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
1116                 ptlrpc_at_set_req_timeout(req);
1117
1118                 ldlm_cancel_pack(req, cancels, count);
1119
1120                 ptlrpc_request_set_replen(req);
1121                 if (flags & LDLM_FL_ASYNC) {
1122                         ptlrpcd_add_req(req);
1123                         sent = count;
1124                         GOTO(out, 0);
1125                 } else {
1126                         rc = ptlrpc_queue_wait(req);
1127                 }
1128                 if (rc == ESTALE) {
1129                         CDEBUG(D_DLMTRACE, "client/server (nid %s) "
1130                                "out of sync -- not fatal\n",
1131                                libcfs_nid2str(req->rq_import->
1132                                               imp_connection->c_peer.nid));
1133                         rc = 0;
1134                 } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
1135                            req->rq_import_generation == imp->imp_generation) {
1136                         ptlrpc_req_finished(req);
1137                         continue;
1138                 } else if (rc != ELDLM_OK) {
1139                         CERROR("Got rc %d from cancel RPC: canceling "
1140                                "anyway\n", rc);
1141                         break;
1142                 }
1143                 sent = count;
1144                 break;
1145         }
1146
1147         ptlrpc_req_finished(req);
1148         EXIT;
1149 out:
1150         return sent ? sent : rc;
1151 }
1152
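/* Map an import to the LDLM pool of its obd's namespace. */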
1153 static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
1154 {
1155         LASSERT(imp != NULL);
1156         return &imp->imp_obd->obd_namespace->ns_pool;
1157 }
1158
1159 /**
1160  * Update client's obd pool related fields with new SLV and Limit from \a req.
1161  */
1162 int ldlm_cli_update_pool(struct ptlrpc_request *req)
1163 {
1164         struct obd_device *obd;
1165         __u64 old_slv, new_slv;
1166         __u32 new_limit;
1167         ENTRY;
1168
1169         if (unlikely(!req->rq_import || !req->rq_import->imp_obd ||
1170                      !imp_connect_lru_resize(req->rq_import)))
1171         {
1172                 /*
1173                  * Do nothing for corner cases.
1174                  */
1175                 RETURN(0);
1176         }
1177
1178         /*
1179          * In some cases the RPC may contain SLV and limit zeroed out. This
1180          * is the case when the server does not support the lru resize
1181          * feature. This is also possible in some recovery cases when
1182          * server-side reqs have no ref to the obd export and thus access to
1183          * the server-side namespace is not possible.
1184          */
1185         if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
1186             lustre_msg_get_limit(req->rq_repmsg) == 0) {
1187                 DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
1188                           "(SLV: "LPU64", Limit: %u)",
1189                           lustre_msg_get_slv(req->rq_repmsg),
1190                           lustre_msg_get_limit(req->rq_repmsg));
1191                 RETURN(0);
1192         }
1193
1194         new_limit = lustre_msg_get_limit(req->rq_repmsg);
1195         new_slv = lustre_msg_get_slv(req->rq_repmsg);
1196         obd = req->rq_import->imp_obd;
1197
1198         /*
1199          * Set the new SLV and Limit in the obd fields to make them
1200          * accessible to the pool thread. We do not access obd_namespace and
1201          * the pool directly here as there is no reliable way to make sure
1202          * they are still alive at cleanup time. Evil races are possible
1203          * which may cause an oops at that time.
1204          */
1205         write_lock(&obd->obd_pool_lock);
1206         old_slv = obd->obd_pool_slv;
1207         obd->obd_pool_slv = new_slv;
1208         obd->obd_pool_limit = new_limit;
1209         write_unlock(&obd->obd_pool_lock);
1210
1211         RETURN(0);
1212 }
1213 EXPORT_SYMBOL(ldlm_cli_update_pool);
1214
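/* Client-side cancel entry point: cancel the lock locally first and, unless a
 * local cancel suffices, batch it with other lru locks into a single
 * LDLM_CANCEL rpc sent to the canceld portal. */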
1215 int ldlm_cli_cancel(struct lustre_handle *lockh)
1216 {
1217         struct obd_export *exp;
1218         int avail, flags, count = 1, rc = 0;
1219         struct ldlm_namespace *ns;
1220         struct ldlm_lock *lock;
1221         CFS_LIST_HEAD(cancels);
1222         ENTRY;
1223
1224         /* concurrent cancels on the same handle can happen */
1225         lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
1226         if (lock == NULL) {
1227                 LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
1228                 RETURN(0);
1229         }
1230
1231         rc = ldlm_cli_cancel_local(lock);
1232         if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {
1233                 LDLM_LOCK_RELEASE(lock);
1234                 RETURN(rc < 0 ? rc : 0);
1235         }
1236         /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL
1237          * rpc which goes to canceld portal, so we can cancel other lru locks
1238          * here and send them all as one LDLM_CANCEL rpc. */
1239         LASSERT(list_empty(&lock->l_bl_ast));
1240         list_add(&lock->l_bl_ast, &cancels);
1241
1242         exp = lock->l_conn_export;
1243         if (exp_connect_cancelset(exp)) {
1244                 avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
1245                                                   &RQF_LDLM_CANCEL,
1246                                                   RCL_CLIENT, 0);
1247                 LASSERT(avail > 0);
1248
1249                 ns = lock->l_resource->lr_namespace;
1250                 flags = ns_connect_lru_resize(ns) ?
1251                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
1252                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
1253                                                LDLM_FL_BL_AST, flags);
1254         }
1255         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
1256         RETURN(0);
1257 }
1258
1259 /* XXX until we have compound requests and can cut cancels from a generic rpc
1260  * we need to send cancels with the LDLM_FL_BL_AST flag as a separate rpc */
1261 static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
1262 {
1263         CFS_LIST_HEAD(head);
1264         struct ldlm_lock *lock, *next;
1265         int left = 0, bl_ast = 0, rc;
1266
1267         left = count;
1268         list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
1269                 if (left-- == 0)
1270                         break;
1271
1272                 if (flags & LDLM_FL_LOCAL_ONLY) {
1273                         rc = LDLM_FL_LOCAL_ONLY;
1274                         ldlm_lock_cancel(lock);
1275                 } else {
1276                         rc = ldlm_cli_cancel_local(lock);
1277                 }
1278                 if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) {
1279                         LDLM_DEBUG(lock, "Cancel lock separately");
1280                         list_del_init(&lock->l_bl_ast);
1281                         list_add(&lock->l_bl_ast, &head);
1282                         bl_ast++;
1283                         continue;
1284                 }
1285                 if (rc == LDLM_FL_LOCAL_ONLY) {
1286                         /* CANCEL RPC should not be sent to server. */
1287                         list_del_init(&lock->l_bl_ast);
1288                         LDLM_LOCK_RELEASE(lock);
1289                         count--;
1290                 }
1291
1292         }
1293         if (bl_ast > 0) {
1294                 count -= bl_ast;
1295                 ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
1296         }
1297
1298         RETURN(count);
1299 }
1300
1301 /**
1302  * Callback function for shrink policy. Makes decision whether to keep
1303  * \a lock in LRU for current \a LRU size \a unused, added in current scan
1304  * \a added and number of locks to be preferably canceled \a count.
1305  *
1306  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1307  *
1308  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1309  */
1310 static ldlm_policy_res_t ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
1311                                                    struct ldlm_lock *lock,
1312                                                    int unused, int added,
1313                                                    int count)
1314 {
1315         int lock_cost;
1316         __u64 page_nr;
1317
1318         /*
1319          * Stop lru processing when we have reached the passed @count or
1320          * checked all locks in lru.
1321          */
1322         if (count && added >= count)
1323                 return LDLM_POLICY_KEEP_LOCK;
1324
1325         if (lock->l_resource->lr_type == LDLM_EXTENT) {
1326                 if (lock->l_weigh_ast) {
1327                         /*
1328                          * For liblustre, l_weigh_ast should return 0 since it
1329                          * doesn't cache pages
1330                          */
1331                         page_nr = lock->l_weigh_ast(lock);
1332                 } else {
1333                         struct ldlm_extent *l_extent;
1334
1335                         /*
1336                          * For all extent locks cost is 1 + number of pages in
1337                          * their extent.
1338                          */
1339                         l_extent = &lock->l_policy_data.l_extent;
1340                         page_nr = l_extent->end - l_extent->start;
1341                         do_div(page_nr, CFS_PAGE_SIZE);
1342                 }
1343                 lock_cost = 1 + page_nr;
1344         } else {
1345                 /*
1346                  * For all locks which are not extent ones cost is 1
1347                  */
1348                 lock_cost = 1;
1349         }
1350
1351         /*
1352          * Keep all expensive locks in the lru for the memory pressure time
1353          * cancel policy. They may be cancelled anyway by the lru resize
1354          * policy if their CLV is not small enough.
1355          */
1356         return lock_cost > ns->ns_shrink_thumb ?
1357                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1358 }
1359
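/*
 * Worked example for the cost computation above (illustrative only; the
 * 4096-byte CFS_PAGE_SIZE is an assumption, not something fixed by this
 * file):
 *
 *      - a plain (non-extent) lock always has lock_cost = 1;
 *      - an extent lock covering 1 MiB gives page_nr = 1048576 / 4096 = 256,
 *        so lock_cost = 1 + 256 = 257.
 *
 * Whether such a lock is cancelled under memory pressure then depends on
 * how lock_cost compares with ns->ns_shrink_thumb.
 */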
1360 /**
1361  * Callback function for the lru-resize policy. Decides whether to keep
1362  * \a lock in the LRU, given the current LRU size \a unused, the locks
1363  * already added in this scan \a added, and the preferred cancel count \a count.
1364  *
1365  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1366  *
1367  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1368  */
1369 static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1370                                                  struct ldlm_lock *lock,
1371                                                  int unused, int added,
1372                                                  int count)
1373 {
1374         cfs_time_t cur = cfs_time_current();
1375         struct ldlm_pool *pl = &ns->ns_pool;
1376         __u64 slv, lvf, lv;
1377         cfs_time_t la;
1378
1379         /*
1380          * Stop lru processing when we reached passed @count or checked all
1381          * locks in lru.
1382          */
1383         if (count && added >= count)
1384                 return LDLM_POLICY_KEEP_LOCK;
1385
1386         slv = ldlm_pool_get_slv(pl);
1387         lvf = ldlm_pool_get_lvf(pl);
1388         la = cfs_duration_sec(cfs_time_sub(cur,
1389                               lock->l_last_used));
1390
1391         /*
1392          * Stop when the SLV has not yet come from the server, or when lv
1393          * is smaller than it (worked example below).
1394          */
1395         lv = lvf * la * unused;
1396
1397         /*
1398          * Inform the pool about the current CLV, so it can be seen via proc.
1399          */
1400         ldlm_pool_set_clv(pl, lv);
1401         return (slv == 1 || lv < slv) ?
1402                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1403 }
1404
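/*
 * Worked example for the lock volume computation above (numbers are purely
 * illustrative): with lvf = 100, a lock last used la = 30 seconds ago and
 * unused = 1000 locks in the LRU, lv = 100 * 30 * 1000 = 3000000. The lock
 * is cancelled only if this lv is not below the server-supplied slv (and
 * slv != 1, i.e. the SLV has actually been received from the server).
 */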
1405 /**
1406  * Callback function for the passed policy. Decides whether to keep
1407  * \a lock in the LRU, given the current LRU size \a unused, the locks
1408  * already added in this scan \a added, and the preferred cancel count \a count.
1409  *
1410  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1411  *
1412  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1413  */
1414 static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1415                                                    struct ldlm_lock *lock,
1416                                                    int unused, int added,
1417                                                    int count)
1418 {
1419         /*
1420          * Stop lru processing when we reached passed @count or checked all
1421          * locks in lru.
1422          */
1423         return (added >= count) ?
1424                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1425 }
1426
1427 /**
1428  * Callback function for the aged policy. Decides whether to keep
1429  * \a lock in the LRU, given the current LRU size \a unused, the locks
1430  * already added in this scan \a added, and the preferred cancel count \a count.
1431  *
1432  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1433  *
1434  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1435  */
1436 static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1437                                                  struct ldlm_lock *lock,
1438                                                  int unused, int added,
1439                                                  int count)
1440 {
1441         /*
1442          * Stop lru processing if a young lock is found and we have already
1443          * reached the passed-in @count.
1444          */
1445         return ((added >= count) &&
1446                 cfs_time_before(cfs_time_current(),
1447                                 cfs_time_add(lock->l_last_used,
1448                                              ns->ns_max_age))) ?
1449                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1450 }
1451
1452 /**
1453  * Callback function for the default policy. Decides whether to keep
1454  * \a lock in the LRU, given the current LRU size \a unused, the locks
1455  * already added in this scan \a added, and the preferred cancel count \a count.
1456  *
1457  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1458  *
1459  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1460  */
1461 static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1462                                                     struct ldlm_lock *lock,
1463                                                     int unused, int added,
1464                                                     int count)
1465 {
1466         /*
1467          * Stop lru processing when we reached passed @count or checked all
1468          * locks in lru.
1469          */
1470         return (added >= count) ?
1471                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1472 }
1473
1474 typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *,
1475                                                       struct ldlm_lock *, int,
1476                                                       int, int);
1477
1478 static ldlm_cancel_lru_policy_t
1479 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
1480 {
1481         if (ns_connect_lru_resize(ns)) {
1482                 if (flags & LDLM_CANCEL_SHRINK)
1483                         return ldlm_cancel_shrink_policy;
1484                 else if (flags & LDLM_CANCEL_LRUR)
1485                         return ldlm_cancel_lrur_policy;
1486                 else if (flags & LDLM_CANCEL_PASSED)
1487                         return ldlm_cancel_passed_policy;
1488         } else {
1489                 if (flags & LDLM_CANCEL_AGED)
1490                         return ldlm_cancel_aged_policy;
1491         }
1492
1493         return ldlm_cancel_default_policy;
1494 }
1495
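/*
 * Illustrative sketch (not part of the build) of how the selector and the
 * policy callbacks are meant to be combined; this mirrors what
 * ldlm_cancel_lru_local() below actually does:
 *
 *      pf = ldlm_cancel_lru_policy(ns, flags);
 *      if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
 *              break;
 *
 * i.e. stop scanning and keep the lock cached as soon as the policy says so.
 */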
1496 /* - Free space in lru for @count new locks,
1497  *   redundant unused locks are canceled locally;
1498  * - also cancel locally unused aged locks;
1499  * - do not cancel more than @max locks;
1500  * - GET the found locks and add them into the @cancels list.
1501  *
1502  * A client lock can be added to the l_bl_ast list only when it is
1503  * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing CANCEL.
1504  * There are the following use cases: ldlm_cancel_resource_local(),
1505  * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check&set this
1506  * flag properly. As any attempt to cancel a lock relies on this flag,
1507  * the l_bl_ast list is accessed later without any special locking.
1508  *
1509  * Calling policies for enabled lru resize:
1510  * ----------------------------------------
1511  * flags & LDLM_CANCEL_LRUR - use lru resize policy (SLV from server) to
1512  *                            cancel not more than @count locks;
1513  *
1514  * flags & LDLM_CANCEL_PASSED - cancel @count number of old locks (located at
1515  *                              the beginning of lru list);
1516  *
1517  * flags & LDLM_CANCEL_SHRINK - cancel not more than @count locks according to
1518  *                              the memory pressure policy function;
1519  *
1520  * flags & LDLM_CANCEL_AGED -   cancel locks according to the "aged policy".
1521  */
1522 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
1523                           int count, int max, int cancel_flags, int flags)
1524 {
1525         ldlm_cancel_lru_policy_t pf;
1526         struct ldlm_lock *lock, *next;
1527         int added = 0, unused;
1528         ENTRY;
1529
1530         spin_lock(&ns->ns_unused_lock);
1531         unused = ns->ns_nr_unused;
1532
1533         if (!ns_connect_lru_resize(ns))
1534                 count += unused - ns->ns_max_unused;
1535
1536         pf = ldlm_cancel_lru_policy(ns, flags);
1537         LASSERT(pf != NULL);
1538
1539         while (!list_empty(&ns->ns_unused_list)) {
1540                 /* For any flags, stop scanning if @max is reached. */
1541                 if (max && added >= max)
1542                         break;
1543
1544                 list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru){
1545                         /* No locks which got blocking requests. */
1546                         LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));
1547
1548                         /* Somebody is already doing CANCEL; no need to keep
1549                          * this lock in the lru, do not traverse it again. */
1550                         if (!(lock->l_flags & LDLM_FL_CANCELING))
1551                                 break;
1552
1553                         ldlm_lock_remove_from_lru_nolock(lock);
1554                 }
1555                 if (&lock->l_lru == &ns->ns_unused_list)
1556                         break;
1557
1558                 LDLM_LOCK_GET(lock);
1559                 spin_unlock(&ns->ns_unused_lock);
1560                 lu_ref_add(&lock->l_reference, __FUNCTION__, cfs_current());
1561
1562                 /* Pass the lock through the policy filter and see if it
1563                  * should stay in lru.
1564                  *
1565                  * Even for shrinker policy we stop scanning if
1566                  * we find a lock that should stay in the cache.
1567                  * We should take lock age into account anyway,
1568                  * as a new lock is a valuable resource even if
1569                  * its weight is small.
1570                  *
1571                  * That is, for the shrinker policy we drop only
1572                  * old locks, but additionally choose them by
1573                  * their weight. Big extent locks will stay in
1574                  * the cache. */
1575                 if (pf(ns, lock, unused, added, count) ==
1576                     LDLM_POLICY_KEEP_LOCK) {
1577                         lu_ref_del(&lock->l_reference,
1578                                    __FUNCTION__, cfs_current());
1579                         LDLM_LOCK_RELEASE(lock);
1580                         spin_lock(&ns->ns_unused_lock);
1581                         break;
1582                 }
1583
1584                 lock_res_and_lock(lock);
1585                 /* Check flags again under the lock. */
1586                 if ((lock->l_flags & LDLM_FL_CANCELING) ||
1587                     (ldlm_lock_remove_from_lru(lock) == 0)) {
1588                         /* Another thread is removing the lock from the lru,
1589                          * or somebody is already doing CANCEL, or there is
1590                          * a blocking request which will send the cancel by
1591                          * itself, or the lock has been matched and is no
1592                          * longer unused. */
1593                         unlock_res_and_lock(lock);
1594                         lu_ref_del(&lock->l_reference,
1595                                    __FUNCTION__, cfs_current());
1596                         LDLM_LOCK_RELEASE(lock);
1597                         spin_lock(&ns->ns_unused_lock);
1598                         continue;
1599                 }
1600                 LASSERT(!lock->l_readers && !lock->l_writers);
1601
1602                 /* If we have chosen to cancel this lock voluntarily, we had
1603                  * better send a cancel notification to the server, so that
1604                  * it frees the appropriate state. This might lead to a race
1605                  * where, while we are doing the cancel here, the server is
1606                  * also silently cancelling this lock. */
1607                 lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
1608
1609                 /* Setting the CBPENDING flag is a little misleading,
1610                  * but prevents an important race; namely, once
1611                  * CBPENDING is set, the lock can accumulate no more
1612                  * readers/writers. Since readers and writers are
1613                  * already zero here, ldlm_lock_decref() won't see
1614                  * this flag and call l_blocking_ast */
1615                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
1616
1617                 /* We can't re-add to l_lru as it confuses the
1618                  * refcounting in ldlm_lock_remove_from_lru() if an AST
1619                  * arrives after we drop ns_lock below. We use l_bl_ast
1620                  * and can't use l_pending_chain as it is used on both the
1621                  * server and the client, even though bug 5666 says it is
1622                  * used only on the server. */
1623                 LASSERT(list_empty(&lock->l_bl_ast));
1624                 list_add(&lock->l_bl_ast, cancels);
1625                 unlock_res_and_lock(lock);
1626                 lu_ref_del(&lock->l_reference, __FUNCTION__, cfs_current());
1627                 spin_lock(&ns->ns_unused_lock);
1628                 added++;
1629                 unused--;
1630         }
1631         spin_unlock(&ns->ns_unused_lock);
1632         RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
1633 }
1634
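/*
 * Illustrative sketch (not part of the build): a typical caller builds a
 * cancel list and then sends the batched CANCEL RPCs, exactly as
 * ldlm_cancel_lru() does below:
 *
 *      CFS_LIST_HEAD(cancels);
 *      int count;
 *
 *      count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, LDLM_CANCEL_LRUR);
 *      ldlm_cli_cancel_list(&cancels, count, NULL, 0);
 */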
1635 /* Returns the number of locks which could be canceled the next time
1636  * ldlm_cancel_lru() is called. Used from the locks pool shrinker. */
1637 int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns,
1638                              int count, int max, int flags)
1639 {
1640         struct list_head disp = CFS_LIST_HEAD_INIT(disp);
1641         ldlm_cancel_lru_policy_t pf;
1642         struct ldlm_lock *lock;
1643         int added = 0, unused;
1644         int loop_stop = 0;
1645         ENTRY;
1646
1647         pf = ldlm_cancel_lru_policy(ns, flags);
1648         LASSERT(pf != NULL);
1649         spin_lock(&ns->ns_unused_lock);
1650         unused = ns->ns_nr_unused;
1651         list_splice_init(&ns->ns_unused_list, &disp);
1652         while (!list_empty(&disp)) {
1653                 lock = list_entry(disp.next, struct ldlm_lock, l_lru);
1654                 list_move_tail(&lock->l_lru, &ns->ns_unused_list);
1655
1656                 /* For any flags, stop scanning if @max is reached. */
1657                 if (max && added >= max)
1658                         break;
1659
1660                 /* Somebody is already doing CANCEL, or there is a blocking
1661                  * request that will send the cancel. Let's not count this
1662                  * lock. */
1663                 if ((lock->l_flags & LDLM_FL_CANCELING) ||
1664                     (lock->l_flags & LDLM_FL_BL_AST))
1665                         continue;
1666
1667                 LDLM_LOCK_GET(lock);
1668                 spin_unlock(&ns->ns_unused_lock);
1669                 lu_ref_add(&lock->l_reference, __FUNCTION__, cfs_current());
1670
1671                 /* Pass the lock through the policy filter and see if it
1672                  * should stay in lru. */
1673                 if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
1674                         loop_stop = 1;
1675
1676                 lu_ref_del(&lock->l_reference, __FUNCTION__, cfs_current());
1677                 LDLM_LOCK_RELEASE(lock);
1678                 spin_lock(&ns->ns_unused_lock);
1679                 if (loop_stop)
1680                         break;
1681
1682                 added++;
1683                 unused--;
1684         }
1685         list_splice(&disp, ns->ns_unused_list.prev);
1686         spin_unlock(&ns->ns_unused_lock);
1687         RETURN(added);
1688 }
1689
1690 /* When called with LDLM_ASYNC, the blocking callback will be handled
1691  * in a thread and this function will return after the thread has been
1692  * asked to call the callback.  When called with LDLM_SYNC, the blocking
1693  * callback will be performed in this function. */
1694 int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync,
1695                     int flags)
1696 {
1697         CFS_LIST_HEAD(cancels);
1698         int count, rc;
1699         ENTRY;
1700
1701 #ifndef __KERNEL__
1702         sync = LDLM_SYNC; /* force to be sync in user space */
1703 #endif
1704         count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
1705         if (sync == LDLM_ASYNC) {
1706                 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
1707                 if (rc == 0)
1708                         RETURN(count);
1709         }
1710
1711         /* If an error occurred in ASYNC mode, or
1712          * this is SYNC mode, cancel the list. */
1713         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
1714         RETURN(count);
1715 }
1716
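/*
 * Illustrative sketch (not part of the build): ask for up to 128 unused
 * locks to be cancelled, letting the blocking callbacks run in the ldlm_bl
 * thread when possible and falling back to synchronous cancel on error:
 *
 *      int canceled = ldlm_cancel_lru(ns, 128, LDLM_ASYNC, 0);
 */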
1717 /* Find and cancel locally unused locks found on resource, matched to the
1718  * given policy, mode. GET the found locks and add them into the @cancels
1719  * list. */
1720 int ldlm_cancel_resource_local(struct ldlm_resource *res,
1721                                struct list_head *cancels,
1722                                ldlm_policy_data_t *policy,
1723                                ldlm_mode_t mode, int lock_flags,
1724                                int cancel_flags, void *opaque)
1725 {
1726         struct ldlm_lock *lock;
1727         int count = 0;
1728         ENTRY;
1729
1730         lock_res(res);
1731         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
1732                 if (opaque != NULL && lock->l_ast_data != opaque) {
1733                         LDLM_ERROR(lock, "data %p doesn't match opaque %p",
1734                                    lock->l_ast_data, opaque);
1735                         //LBUG();
1736                         continue;
1737                 }
1738
1739                 if (lock->l_readers || lock->l_writers) {
1740                         if (cancel_flags & LDLM_FL_WARN) {
1741                                 LDLM_ERROR(lock, "lock in use");
1742                                 //LBUG();
1743                         }
1744                         continue;
1745                 }
1746
1747                 /* If somebody is already doing CANCEL, or blocking ast came,
1748                  * skip this lock. */
1749                 if (lock->l_flags & LDLM_FL_BL_AST ||
1750                     lock->l_flags & LDLM_FL_CANCELING)
1751                         continue;
1752
1753                 if (lockmode_compat(lock->l_granted_mode, mode))
1754                         continue;
1755
1756                 /* If policy is given and this is IBITS lock, add to list only
1757                  * those locks that match by policy. */
1758                 if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
1759                     !(lock->l_policy_data.l_inodebits.bits &
1760                       policy->l_inodebits.bits))
1761                         continue;
1762
1763                 /* See CBPENDING comment in ldlm_cancel_lru */
1764                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
1765                                  lock_flags;
1766
1767                 LASSERT(list_empty(&lock->l_bl_ast));
1768                 list_add(&lock->l_bl_ast, cancels);
1769                 LDLM_LOCK_GET(lock);
1770                 count++;
1771         }
1772         unlock_res(res);
1773
1774         RETURN(ldlm_cancel_list(cancels, count, cancel_flags));
1775 }
1776
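/*
 * Illustrative sketch (not part of the build): cancel unused IBITS locks on
 * a resource that cover the UPDATE bit only, then send the CANCEL RPCs.
 * MDS_INODELOCK_UPDATE is assumed here as the usual inodebits value; it is
 * not defined in this file:
 *
 *      CFS_LIST_HEAD(cancels);
 *      ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
 *      int count;
 *
 *      count = ldlm_cancel_resource_local(res, &cancels, &policy, LCK_MINMODE,
 *                                         0, 0, NULL);
 *      ldlm_cli_cancel_list(&cancels, count, NULL, 0);
 */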
1777 /* If @req is NULL, send a CANCEL request to the server with the handles of
1778  * the locks in @cancels. If EARLY_CANCEL is not supported, send CANCEL
1779  * requests separately per lock.
1780  * If @req is not NULL, pack the handles of the locks in @cancels into the
1781  * request buffer.
1782  * Destroy @cancels at the end. */
1783 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
1784                          struct ptlrpc_request *req, int flags)
1785 {
1786         struct ldlm_lock *lock;
1787         int res = 0;
1788         ENTRY;
1789
1790         if (list_empty(cancels) || count == 0)
1791                 RETURN(0);
1792
1793         /* XXX: requests (both batched and not) could be sent in parallel.
1794          * Usually it is enough to have just 1 RPC, but it is possible that
1795          * there are too many locks to be cancelled in the LRU or on a
1796          * resource. Parallel sending would also speed up the case when the
1797          * server does not support the feature. */
1798         while (count > 0) {
1799                 LASSERT(!list_empty(cancels));
1800                 lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast);
1801                 LASSERT(lock->l_conn_export);
1802
1803                 if (exp_connect_cancelset(lock->l_conn_export)) {
1804                         res = count;
1805                         if (req)
1806                                 ldlm_cancel_pack(req, cancels, count);
1807                         else
1808                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
1809                                                           cancels, count,
1810                                                           flags);
1811                 } else {
1812                         res = ldlm_cli_cancel_req(lock->l_conn_export,
1813                                                   cancels, 1, flags);
1814                 }
1815
1816                 if (res < 0) {
1817                         CERROR("ldlm_cli_cancel_list: %d\n", res);
1818                         res = count;
1819                 }
1820
1821                 count -= res;
1822                 ldlm_lock_list_put(cancels, l_bl_ast, res);
1823         }
1824         LASSERT(count == 0);
1825         RETURN(0);
1826 }
1827
1828 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
1829                                     const struct ldlm_res_id *res_id,
1830                                     ldlm_policy_data_t *policy,
1831                                     ldlm_mode_t mode, int flags, void *opaque)
1832 {
1833         struct ldlm_resource *res;
1834         CFS_LIST_HEAD(cancels);
1835         int count;
1836         int rc;
1837         ENTRY;
1838
1839         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1840         if (res == NULL) {
1841                 /* This is not a problem. */
1842                 CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
1843                 RETURN(0);
1844         }
1845
1846         LDLM_RESOURCE_ADDREF(res);
1847         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
1848                                            0, flags, opaque);
1849         rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
1850         if (rc != ELDLM_OK)
1851                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
1852
1853         LDLM_RESOURCE_DELREF(res);
1854         ldlm_resource_putref(res);
1855         RETURN(0);
1856 }
1857
1858 static inline int have_no_nsresource(struct ldlm_namespace *ns)
1859 {
1860         int no_resource = 0;
1861
1862         spin_lock(&ns->ns_hash_lock);
1863         if (ns->ns_resources == 0)
1864                 no_resource = 1;
1865         spin_unlock(&ns->ns_hash_lock);
1866
1867         RETURN(no_resource);
1868 }
1869
1870 /* Cancel all locks on a namespace (or a specific resource, if given)
1871  * that have 0 readers/writers.
1872  *
1873  * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
1874  * to notify the server. */
1875 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1876                            const struct ldlm_res_id *res_id,
1877                            int flags, void *opaque)
1878 {
1879         int i;
1880         ENTRY;
1881
1882         if (ns == NULL)
1883                 RETURN(ELDLM_OK);
1884
1885         if (res_id)
1886                 RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
1887                                                        LCK_MINMODE, flags,
1888                                                        opaque));
1889
1890         spin_lock(&ns->ns_hash_lock);
1891         for (i = 0; i < RES_HASH_SIZE; i++) {
1892                 struct list_head *tmp;
1893                 tmp = ns->ns_hash[i].next;
1894                 while (tmp != &(ns->ns_hash[i])) {
1895                         struct ldlm_resource *res;
1896                         int rc;
1897
1898                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
1899                         ldlm_resource_getref(res);
1900                         spin_unlock(&ns->ns_hash_lock);
1901
1902                         LDLM_RESOURCE_ADDREF(res);
1903                         rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name,
1904                                                              NULL, LCK_MINMODE,
1905                                                              flags, opaque);
1906
1907                         if (rc)
1908                                 CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n",
1909                                        res->lr_name.name[0], rc);
1910
1911                         LDLM_RESOURCE_DELREF(res);
1912                         spin_lock(&ns->ns_hash_lock);
1913                         tmp = tmp->next;
1914                         ldlm_resource_putref_locked(res);
1915                 }
1916         }
1917         spin_unlock(&ns->ns_hash_lock);
1918
1919         RETURN(ELDLM_OK);
1920 }
1921
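/*
 * Illustrative sketch (not part of the build): drop every unused lock in a
 * namespace locally, without notifying the server:
 *
 *      ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY, NULL);
 */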
1922 /* Lock iterators. */
1923
1924 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
1925                           void *closure)
1926 {
1927         struct list_head *tmp, *next;
1928         struct ldlm_lock *lock;
1929         int rc = LDLM_ITER_CONTINUE;
1930
1931         ENTRY;
1932
1933         if (!res)
1934                 RETURN(LDLM_ITER_CONTINUE);
1935
1936         lock_res(res);
1937         list_for_each_safe(tmp, next, &res->lr_granted) {
1938                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1939
1940                 if (iter(lock, closure) == LDLM_ITER_STOP)
1941                         GOTO(out, rc = LDLM_ITER_STOP);
1942         }
1943
1944         list_for_each_safe(tmp, next, &res->lr_converting) {
1945                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1946
1947                 if (iter(lock, closure) == LDLM_ITER_STOP)
1948                         GOTO(out, rc = LDLM_ITER_STOP);
1949         }
1950
1951         list_for_each_safe(tmp, next, &res->lr_waiting) {
1952                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1953
1954                 if (iter(lock, closure) == LDLM_ITER_STOP)
1955                         GOTO(out, rc = LDLM_ITER_STOP);
1956         }
1957  out:
1958         unlock_res(res);
1959         RETURN(rc);
1960 }
1961
1962 struct iter_helper_data {
1963         ldlm_iterator_t iter;
1964         void *closure;
1965 };
1966
1967 static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
1968 {
1969         struct iter_helper_data *helper = closure;
1970         return helper->iter(lock, helper->closure);
1971 }
1972
1973 static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
1974 {
1975         return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
1976 }
1977
1978 int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
1979                            void *closure)
1980 {
1981         struct iter_helper_data helper = { iter: iter, closure: closure };
1982         return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
1983 }
1984
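/*
 * Illustrative sketch (not part of the build): a caller-supplied iterator
 * counting fully granted locks in a namespace. "count_granted" and the
 * counter are hypothetical names used only for this example:
 *
 *      static int count_granted(struct ldlm_lock *lock, void *closure)
 *      {
 *              int *nr = closure;
 *
 *              if (lock->l_granted_mode == lock->l_req_mode)
 *                      (*nr)++;
 *              return LDLM_ITER_CONTINUE;
 *      }
 *
 *      int nr = 0;
 *      ldlm_namespace_foreach(ns, count_granted, &nr);
 */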
1985 int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
1986                                ldlm_res_iterator_t iter, void *closure)
1987 {
1988         int i, rc = LDLM_ITER_CONTINUE;
1989         struct ldlm_resource *res;
1990         struct list_head *tmp;
1991
1992         ENTRY;
1993         spin_lock(&ns->ns_hash_lock);
1994         for (i = 0; i < RES_HASH_SIZE; i++) {
1995                 tmp = ns->ns_hash[i].next;
1996                 while (tmp != &(ns->ns_hash[i])) {
1997                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
1998                         ldlm_resource_getref(res);
1999                         spin_unlock(&ns->ns_hash_lock);
2000                         LDLM_RESOURCE_ADDREF(res);
2001
2002                         rc = iter(res, closure);
2003
2004                         LDLM_RESOURCE_DELREF(res);
2005                         spin_lock(&ns->ns_hash_lock);
2006                         tmp = tmp->next;
2007                         ldlm_resource_putref_locked(res);
2008                         if (rc == LDLM_ITER_STOP)
2009                                 GOTO(out, rc);
2010                 }
2011         }
2012  out:
2013         spin_unlock(&ns->ns_hash_lock);
2014         RETURN(rc);
2015 }
2016
2017 /* Non-blocking function to manipulate a lock whose cb_data is being put away. */
2018 void ldlm_resource_iterate(struct ldlm_namespace *ns,
2019                            const struct ldlm_res_id *res_id,
2020                            ldlm_iterator_t iter, void *data)
2021 {
2022         struct ldlm_resource *res;
2023         ENTRY;
2024
2025         if (ns == NULL) {
2026                 CERROR("must pass in namespace\n");
2027                 LBUG();
2028         }
2029
2030         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
2031         if (res == NULL) {
2032                 EXIT;
2033                 return;
2034         }
2035
2036         LDLM_RESOURCE_ADDREF(res);
2037         ldlm_resource_foreach(res, iter, data);
2038         LDLM_RESOURCE_DELREF(res);
2039         ldlm_resource_putref(res);
2040         EXIT;
2041 }
2042
2043 /* Lock replay */
2044
2045 static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
2046 {
2047         struct list_head *list = closure;
2048
2049         /* we use l_pending_chain here, because it's unused on clients. */
2050         LASSERTF(list_empty(&lock->l_pending_chain),"lock %p next %p prev %p\n",
2051                  lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev);
2052         /* bug 9573: don't replay locks left after eviction */
2053         if (!(lock->l_flags & LDLM_FL_FAILED))
2054                 list_add(&lock->l_pending_chain, list);
2055         return LDLM_ITER_CONTINUE;
2056 }
2057
2058 static int replay_lock_interpret(const struct lu_env *env,
2059                                  struct ptlrpc_request *req,
2060                                  struct ldlm_async_args *aa, int rc)
2061 {
2062         struct lustre_handle  old_hash_key;
2063         struct ldlm_lock     *lock;
2064         struct ldlm_reply    *reply;
2065         struct obd_export    *exp;
2066
2067         ENTRY;
2068         atomic_dec(&req->rq_import->imp_replay_inflight);
2069         if (rc != ELDLM_OK)
2070                 GOTO(out, rc);
2071
2072
2073         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2074         if (reply == NULL)
2075                 GOTO(out, rc = -EPROTO);
2076
2077         lock = ldlm_handle2lock(&aa->lock_handle);
2078         if (!lock) {
2079                 CERROR("received replay ack for unknown local cookie "LPX64
2080                        " remote cookie "LPX64 " from server %s id %s\n",
2081                        aa->lock_handle.cookie, reply->lock_handle.cookie,
2082                        req->rq_export->exp_client_uuid.uuid,
2083                        libcfs_id2str(req->rq_peer));
2084                 GOTO(out, rc = -ESTALE);
2085         }
2086
2087         old_hash_key = lock->l_remote_handle;
2088         lock->l_remote_handle = reply->lock_handle;
2089
2090         /* Key change rehash lock in per-export hash with new key */
2091         exp = req->rq_export;
2092         if (exp && exp->exp_lock_hash)
2093                 lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key,
2094                                        &lock->l_remote_handle,
2095                                        &lock->l_exp_hash);
2096
2097         LDLM_DEBUG(lock, "replayed lock:");
2098         ptlrpc_import_recovery_state_machine(req->rq_import);
2099         LDLM_LOCK_PUT(lock);
2100 out:
2101         if (rc != ELDLM_OK)
2102                 ptlrpc_connect_import(req->rq_import, NULL);
2103
2104         RETURN(rc);
2105 }
2106
2107 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
2108 {
2109         struct ptlrpc_request *req;
2110         struct ldlm_async_args *aa;
2111         struct ldlm_request   *body;
2112         int flags;
2113         ENTRY;
2114
2115
2116         /* Bug 11974: Do not replay a lock which is actively being canceled */
2117         if (lock->l_flags & LDLM_FL_CANCELING) {
2118                 LDLM_DEBUG(lock, "Not replaying canceled lock:");
2119                 RETURN(0);
2120         }
2121
2122         /* If this is a reply-less callback lock, we cannot replay it: the
2123          * server might have dropped it long ago, the notification of that event
2124          * was lost by the network, and the server may already have granted a
2125          * conflicting lock. */
2125         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
2126                 LDLM_DEBUG(lock, "Not replaying reply-less lock:");
2127                 ldlm_lock_cancel(lock);
2128                 RETURN(0);
2129         }
2130         /*
2131          * If granted mode matches the requested mode, this lock is granted.
2132          *
2133          * If they differ, but we have a granted mode, then we were granted
2134          * one mode and now want another: ergo, converting.
2135          *
2136          * If we haven't been granted anything and are on a resource list,
2137          * then we're blocked/waiting.
2138          *
2139          * If we haven't been granted anything and we're NOT on a resource list,
2140          * then we haven't got a reply yet and don't have a known disposition.
2141          * This happens whenever a lock enqueue is the request that triggers
2142          * recovery.
2143          */
2144         if (lock->l_granted_mode == lock->l_req_mode)
2145                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
2146         else if (lock->l_granted_mode)
2147                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
2148         else if (!list_empty(&lock->l_res_link))
2149                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
2150         else
2151                 flags = LDLM_FL_REPLAY;
2152
2153         req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
2154                                         LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2155         if (req == NULL)
2156                 RETURN(-ENOMEM);
2157
2158         /* We're part of recovery, so don't wait for it. */
2159         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
2160
2161         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2162         ldlm_lock2desc(lock, &body->lock_desc);
2163         body->lock_flags = flags;
2164
2165         ldlm_lock2handle(lock, &body->lock_handle[0]);
2166         if (lock->l_lvb_len != 0) {
2167                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
2168                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2169                                      lock->l_lvb_len);
2170         }
2171         ptlrpc_request_set_replen(req);
2172         /* Notify the server that we have replayed all requests.
2173          * Also, mark the request to be put on a dedicated
2174          * queue, to be processed after all request replays.
2175          * Bug 6063 */
2176         lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
2177
2178         LDLM_DEBUG(lock, "replaying lock:");
2179
2180         atomic_inc(&req->rq_import->imp_replay_inflight);
2181         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2182         aa = ptlrpc_req_async_args(req);
2183         aa->lock_handle = body->lock_handle[0];
2184         req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
2185         ptlrpcd_add_req(req);
2186
2187         RETURN(0);
2188 }
2189
2190 int ldlm_replay_locks(struct obd_import *imp)
2191 {
2192         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
2193         CFS_LIST_HEAD(list);
2194         struct ldlm_lock *lock, *next;
2195         int rc = 0;
2196
2197         ENTRY;
2198
2199         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
2200
2201         /* ensure this doesn't fall to 0 before all have been queued */
2202         atomic_inc(&imp->imp_replay_inflight);
2203
2204         (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
2205
2206         list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
2207                 list_del_init(&lock->l_pending_chain);
2208                 if (rc)
2209                         continue; /* or try to do the rest? */
2210                 rc = replay_one_lock(imp, lock);
2211         }
2212
2213         atomic_dec(&imp->imp_replay_inflight);
2214
2215         RETURN(rc);
2216 }