lustre/ldlm/ldlm_request.c (fs/lustre-release.git, commit ddedaeb60c8947cb169c65f2f72d3c87008442ae)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LDLM
26 #ifndef __KERNEL__
27 #include <signal.h>
28 #include <liblustre.h>
29 #endif
30
31 #include <lustre_dlm.h>
32 #include <obd_class.h>
33 #include <obd.h>
34
35 #include "ldlm_internal.h"
36
37 int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
38 CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
39                 "lock enqueue timeout minimum");
40
41 static void interrupted_completion_wait(void *data)
42 {
43 }
44
45 struct lock_wait_data {
46         struct ldlm_lock *lwd_lock;
47         __u32             lwd_conn_cnt;
48 };
49
50 struct ldlm_async_args {
51         struct lustre_handle lock_handle;
52 };
53
54 int ldlm_expired_completion_wait(void *data)
55 {
56         struct lock_wait_data *lwd = data;
57         struct ldlm_lock *lock = lwd->lwd_lock;
58         struct obd_import *imp;
59         struct obd_device *obd;
60
61         ENTRY;
62         if (lock->l_conn_export == NULL) {
63                 static cfs_time_t next_dump = 0, last_dump = 0;
64
65                 LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago); "
66                            "not entering recovery in server code, just going "
67                            "back to sleep", lock->l_enqueued_time.tv_sec,
68                            cfs_time_current_sec() -
69                            lock->l_enqueued_time.tv_sec);
70                 if (cfs_time_after(cfs_time_current(), next_dump)) {
71                         last_dump = next_dump;
72                         next_dump = cfs_time_shift(300);
73                         ldlm_namespace_dump(D_DLMTRACE,
74                                             lock->l_resource->lr_namespace);
75                         if (last_dump == 0)
76                                 libcfs_debug_dumplog();
77                 }
78                 RETURN(0);
79         }
80
81         obd = lock->l_conn_export->exp_obd;
82         imp = obd->u.cli.cl_import;
83         ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
84         LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago), entering "
85                    "recovery for %s@%s", lock->l_enqueued_time.tv_sec,
86                    CURRENT_SECONDS - lock->l_enqueued_time.tv_sec,
87                    obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
88
89         RETURN(0);
90 }
91
92 /* We use the same basis for both server side and client side functions
93    from a single node. */
94 int ldlm_get_enq_timeout(struct ldlm_lock *lock)
95 {
96         int timeout = at_get(&lock->l_resource->lr_namespace->ns_at_estimate);
97         if (AT_OFF)
98                 return obd_timeout / 2;
99         /* Since these are non-updating timeouts, we should be conservative.
100            It would be nice to have some kind of "early reply" mechanism for
101            lock callbacks too... */
102         timeout = timeout + (timeout >> 1); /* 150% */
103         return max(timeout, ldlm_enqueue_min);
104 }
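
/* Worked example of the arithmetic above (numbers are illustrative only):
 * with adaptive timeouts enabled and an ns_at_estimate of 10s, the enqueue
 * timeout becomes 10 + (10 >> 1) = 15s, and max(15, ldlm_enqueue_min) is
 * returned; with AT_OFF the function returns obd_timeout / 2 instead. */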
105
106 static int is_granted_or_cancelled(struct ldlm_lock *lock)
107 {
108         int ret = 0;
109
110         lock_res_and_lock(lock);
111         if (((lock->l_req_mode == lock->l_granted_mode) &&
112              !(lock->l_flags & LDLM_FL_CP_REQD)) ||
113             (lock->l_flags & LDLM_FL_FAILED))
114                 ret = 1;
115         unlock_res_and_lock(lock);
116
117         return ret;
118 }
119
120 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
121 {
122         /* XXX ALLOCATE - 160 bytes */
123         struct lock_wait_data lwd;
124         struct obd_device *obd;
125         struct obd_import *imp = NULL;
126         struct l_wait_info lwi;
127         __u32 timeout;
128         int rc = 0;
129         ENTRY;
130
131         if (flags == LDLM_FL_WAIT_NOREPROC) {
132                 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
133                 goto noreproc;
134         }
135
136         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
137                        LDLM_FL_BLOCK_CONV))) {
138                 cfs_waitq_signal(&lock->l_waitq);
139                 RETURN(0);
140         }
141
142         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
143                    "sleeping");
144         ldlm_lock_dump(D_OTHER, lock, 0);
145         ldlm_reprocess_all(lock->l_resource);
146
147 noreproc:
148
149         obd = class_exp2obd(lock->l_conn_export);
150
151         /* if this is a local lock, then there is no import */
152         if (obd != NULL) {
153                 imp = obd->u.cli.cl_import;
154         }
155
156         /* Wait a long time for enqueue - server may have to callback a
157            lock from another client.  Server will evict the other client if it
158            doesn't respond reasonably, and then give us the lock. */
159         timeout = ldlm_get_enq_timeout(lock) * 2;
160
161         lwd.lwd_lock = lock;
162
163         if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
164                 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
165                 lwi = LWI_INTR(interrupted_completion_wait, &lwd);
166         } else {
167                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
168                                        ldlm_expired_completion_wait,
169                                        interrupted_completion_wait, &lwd);
170         }
171
172         if (imp != NULL) {
173                 spin_lock(&imp->imp_lock);
174                 lwd.lwd_conn_cnt = imp->imp_conn_cnt;
175                 spin_unlock(&imp->imp_lock);
176         }
177
178         /* Go to sleep until the lock is granted or cancelled. */
179         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
180
181         if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
182                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
183                 RETURN(-EIO);
184         }
185
186         if (rc) {
187                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
188                            rc);
189                 RETURN(rc);
190         }
191
192         LDLM_DEBUG(lock, "client-side enqueue waking up: granted after %lds",
193                    cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
194
195         /* Update our time estimate */
196         at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
197                cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
198
199         RETURN(0);
200 }
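
/* Note: the at_add() call at the end of ldlm_completion_ast() closes the
 * adaptive-timeout feedback loop for enqueues: ldlm_get_enq_timeout() derives
 * the next wait limit from ns_at_estimate, and every successfully granted
 * lock feeds its observed wait time back into that same estimate. */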
201
202 /*
203  * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
204  */
205 int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
206                       void *data, int flag)
207 {
208         int do_ast;
209         ENTRY;
210
211         if (flag == LDLM_CB_CANCELING) {
212                 /* Don't need to do anything here. */
213                 RETURN(0);
214         }
215
216         lock_res_and_lock(lock);
217         /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
218          * that ldlm_blocking_ast is called just before intent_policy method
219          * takes the ns_lock, then by the time we get the lock, we might not
220          * be the correct blocking function anymore.  So check, and return
221          * early, if so. */
222         if (lock->l_blocking_ast != ldlm_blocking_ast) {
223                 unlock_res_and_lock(lock);
224                 RETURN(0);
225         }
226
227         lock->l_flags |= LDLM_FL_CBPENDING;
228         do_ast = (!lock->l_readers && !lock->l_writers);
229         unlock_res_and_lock(lock);
230
231         if (do_ast) {
232                 struct lustre_handle lockh;
233                 int rc;
234
235                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
236                 ldlm_lock2handle(lock, &lockh);
237                 rc = ldlm_cli_cancel(&lockh);
238                 if (rc < 0)
239                         CERROR("ldlm_cli_cancel: %d\n", rc);
240         } else {
241                 LDLM_DEBUG(lock, "Lock still has references, will be "
242                            "cancelled later");
243         }
244         RETURN(0);
245 }
246
247 /*
248  * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
249  * comment in filter_intent_policy() on why you may need this.
250  */
251 int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
252 {
253         /*
254          * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
255          * that is rather subtle: with OST-side locking, it may so happen that
256          * _all_ extent locks are held by the OST. If client wants to obtain
257          * current file size it calls ll{,u}_glimpse_size(), and (as locks are
258          * on the server), dummy glimpse callback fires and does
259          * nothing. Client still receives correct file size due to the
260          * following fragment in filter_intent_policy():
261          *
262          * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
263          * if (rc != 0 && res->lr_namespace->ns_lvbo &&
264          *     res->lr_namespace->ns_lvbo->lvbo_update) {
265          *         res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
266          * }
267          *
268          * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
269          * returns correct file size to the client.
270          */
271         return -ELDLM_NO_LOCK_DATA;
272 }
273
274 int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
275                            struct ldlm_res_id *res_id,
276                            ldlm_type_t type, ldlm_policy_data_t *policy,
277                            ldlm_mode_t mode, int *flags,
278                            ldlm_blocking_callback blocking,
279                            ldlm_completion_callback completion,
280                            ldlm_glimpse_callback glimpse,
281                            void *data, __u32 lvb_len, void *lvb_swabber,
282                            struct lustre_handle *lockh)
283 {
284         struct ldlm_lock *lock;
285         int err;
286         ENTRY;
287
288         LASSERT(!(*flags & LDLM_FL_REPLAY));
289         if (ns_is_client(ns)) {
290                 CERROR("Trying to enqueue local lock in a shadow namespace\n");
291                 LBUG();
292         }
293
294         lock = ldlm_lock_create(ns, *res_id, type, mode, blocking,
295                                 completion, glimpse, data, lvb_len);
296         if (!lock)
297                 GOTO(out_nolock, err = -ENOMEM);
298         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
299
300         ldlm_lock_addref_internal(lock, mode);
301         ldlm_lock2handle(lock, lockh);
302         lock_res_and_lock(lock);
303         lock->l_flags |= LDLM_FL_LOCAL;
304         if (*flags & LDLM_FL_ATOMIC_CB)
305                 lock->l_flags |= LDLM_FL_ATOMIC_CB;
306         lock->l_lvb_swabber = lvb_swabber;
307         unlock_res_and_lock(lock);
308         if (policy != NULL)
309                 lock->l_policy_data = *policy;
310         if (type == LDLM_EXTENT)
311                 lock->l_req_extent = policy->l_extent;
312
313         err = ldlm_lock_enqueue(ns, &lock, policy, flags);
314         if (err != ELDLM_OK)
315                 GOTO(out, err);
316
317         if (policy != NULL)
318                 *policy = lock->l_policy_data;
319         if ((*flags) & LDLM_FL_LOCK_CHANGED)
320                 *res_id = lock->l_resource->lr_name;
321
322         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
323                           lock);
324
325         if (lock->l_completion_ast)
326                 lock->l_completion_ast(lock, *flags, NULL);
327
328         LDLM_DEBUG(lock, "client-side local enqueue END");
329         EXIT;
330  out:
331         LDLM_LOCK_PUT(lock);
332  out_nolock:
333         return err;
334 }
335
336 static void failed_lock_cleanup(struct ldlm_namespace *ns,
337                                 struct ldlm_lock *lock,
338                                 struct lustre_handle *lockh, int mode)
339 {
340         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
341         lock_res_and_lock(lock);
342         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
343         unlock_res_and_lock(lock);
344         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
345
346         ldlm_lock_decref_and_cancel(lockh, mode);
347
348         /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
349          *       from llite/file.c/ll_file_flock(). */
350         if (lock->l_resource->lr_type == LDLM_FLOCK) {
351                 ldlm_lock_destroy(lock);
352         }
353 }
354
355 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
356                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
357                           int *flags, void *lvb, __u32 lvb_len,
358                           void *lvb_swabber, struct lustre_handle *lockh,int rc)
359 {
360         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
361         int is_replay = *flags & LDLM_FL_REPLAY;
362         struct ldlm_lock *lock;
363         struct ldlm_reply *reply;
364         int cleanup_phase = 1;
365         ENTRY;
366
367         lock = ldlm_handle2lock(lockh);
368         /* ldlm_cli_enqueue is holding a reference on this lock. */
369         if (!lock) {
370                 LASSERT(type == LDLM_FLOCK);
371                 RETURN(-ENOLCK);
372         }
373
374         if (rc != ELDLM_OK) {
375                 LASSERT(!is_replay);
376                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
377                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
378                 if (rc == ELDLM_LOCK_ABORTED) {
379                         /* Before we return, swab the reply */
380                         reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
381                                                    sizeof(*reply),
382                                                    lustre_swab_ldlm_reply);
383                         if (reply == NULL) {
384                                 CERROR("Can't unpack ldlm_reply\n");
385                                 rc = -EPROTO;
386                         }
387                         if (lvb_len) {
388                                 void *tmplvb;
389                                 tmplvb = lustre_swab_repbuf(req,
390                                                             DLM_REPLY_REC_OFF,
391                                                             lvb_len,
392                                                             lvb_swabber);
393                                 if (tmplvb == NULL)
394                                         GOTO(cleanup, rc = -EPROTO);
395                                 if (lvb != NULL)
396                                         memcpy(lvb, tmplvb, lvb_len);
397                         }
398                 }
399                 GOTO(cleanup, rc);
400         }
401
402         reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
403                                    lustre_swab_ldlm_reply);
404         if (reply == NULL) {
405                 CERROR("Can't unpack ldlm_reply\n");
406                 GOTO(cleanup, rc = -EPROTO);
407         }
408
409         /* lock enqueued on the server */
410         cleanup_phase = 0;
411
412         lock_res_and_lock(lock);
413         lock->l_remote_handle = reply->lock_handle;
414         *flags = reply->lock_flags;
415         lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
416         /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
417          * to wait with no timeout as well */
418         lock->l_flags |= reply->lock_flags & LDLM_FL_NO_TIMEOUT;
419         unlock_res_and_lock(lock);
420
421         CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
422                lock, reply->lock_handle.cookie, *flags);
423
424         /* If enqueue returned a blocked lock but the completion handler has
425          * already run, then it fixed up the resource and we don't need to do it
426          * again. */
427         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
428                 int newmode = reply->lock_desc.l_req_mode;
429                 LASSERT(!is_replay);
430                 if (newmode && newmode != lock->l_req_mode) {
431                         LDLM_DEBUG(lock, "server returned different mode %s",
432                                    ldlm_lockname[newmode]);
433                         lock->l_req_mode = newmode;
434                 }
435
436                 if (reply->lock_desc.l_resource.lr_name.name[0] !=
437                     lock->l_resource->lr_name.name[0]) {
438                         CDEBUG(D_INFO, "remote intent success, locking %ld "
439                                "instead of %ld\n",
440                               (long)reply->lock_desc.l_resource.lr_name.name[0],
441                                (long)lock->l_resource->lr_name.name[0]);
442
443                         rc = ldlm_lock_change_resource(ns, lock,
444                                            reply->lock_desc.l_resource.lr_name);
445                         if (rc || lock->l_resource == NULL)
446                                 GOTO(cleanup, rc = -ENOMEM);
447                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
448                 }
449                 if (with_policy)
450                         if (!(type == LDLM_IBITS && !(exp->exp_connect_flags &
451                                                     OBD_CONNECT_IBITS)))
452                                 lock->l_policy_data =
453                                                  reply->lock_desc.l_policy_data;
454                 if (type != LDLM_PLAIN)
455                         LDLM_DEBUG(lock,"client-side enqueue, new policy data");
456         }
457
458         if ((*flags) & LDLM_FL_AST_SENT ||
459             /* Cancel extent locks as soon as possible on a liblustre client,
460              * because it cannot handle asynchronous ASTs robustly (see
461              * bug 7311). */
462             (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
463                 lock_res_and_lock(lock);
464                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
465                 unlock_res_and_lock(lock);
466                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
467         }
468
469         /* If the lock has already been granted by a completion AST, don't
470          * clobber the LVB with an older one. */
471         if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
472                 void *tmplvb;
473                 tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
474                                             lvb_swabber);
475                 if (tmplvb == NULL)
476                         GOTO(cleanup, rc = -EPROTO);
477                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
478         }
479
480         if (!is_replay) {
481                 rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
482                 if (lock->l_completion_ast != NULL) {
483                         int err = lock->l_completion_ast(lock, *flags, NULL);
484                         if (!rc)
485                                 rc = err;
486                         if (rc && type != LDLM_FLOCK) /* bug 9425, bug 10250 */
487                                 cleanup_phase = 1;
488                 }
489         }
490
491         if (lvb_len && lvb != NULL) {
492                 /* Copy the LVB here, and not earlier, because the completion
493                  * AST (if any) can override what we got in the reply */
494                 memcpy(lvb, lock->l_lvb_data, lvb_len);
495         }
496
497         LDLM_DEBUG(lock, "client-side enqueue END");
498         EXIT;
499 cleanup:
500         if (cleanup_phase == 1 && rc)
501                 failed_lock_cleanup(ns, lock, lockh, mode);
502         /* Put the lock twice; the second reference is held by ldlm_cli_enqueue */
503         LDLM_LOCK_PUT(lock);
504         LDLM_LOCK_PUT(lock);
505         return rc;
506 }
507
508 /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
509  * a single page on the send/receive side. XXX: 512 should be changed
510  * to a more adequate value. */
511 static inline int ldlm_req_handles_avail(struct obd_export *exp,
512                                          int *size, int bufcount, int off)
513 {
514         int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
515
516         avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
517                                  bufcount, size);
518         avail /= sizeof(struct lustre_handle);
519         avail += LDLM_LOCKREQ_HANDLES - off;
520
521         return avail;
522 }
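
/* Rough illustration of the computation above (all sizes hypothetical): with
 * a 4096-byte page, avail starts at min(LDLM_MAXREQSIZE, 4096 - 512); the
 * space taken by the lustre_msg header and the fixed buffers described by
 * @size is subtracted, the remainder is divided by
 * sizeof(struct lustre_handle), and the LDLM_LOCKREQ_HANDLES handles already
 * embedded in struct ldlm_request (minus the @off handles reserved by the
 * caller) are added back. */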
523
524 static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
525 {
526         int size[2] = { sizeof(struct ptlrpc_body),
527                         sizeof(struct ldlm_request) };
528         return ldlm_req_handles_avail(exp, size, 2, 0);
529 }
530
531 /* Cancel LRU locks and pack them into the enqueue request, along with the
532  * given @count locks listed in @cancels. */
533 struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
534                                          int opc, int bufcount, int *size,
535                                          int bufoff, int canceloff,
536                                          struct list_head *cancels, int count)
537 {
538         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
539         int flags, avail, to_free, pack = 0;
540         struct ldlm_request *dlm = NULL;
541         struct ptlrpc_request *req;
542         CFS_LIST_HEAD(head);
543         ENTRY;
544
545         if (cancels == NULL)
546                 cancels = &head;
547         if (exp_connect_cancelset(exp)) {
548                 /* Estimate the amount of free space in the request. */
549                 LASSERT(bufoff < bufcount);
550
551                 avail = ldlm_req_handles_avail(exp, size, bufcount, canceloff);
552                 flags = ns_connect_lru_resize(ns) ? 
553                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
554                 to_free = !ns_connect_lru_resize(ns) &&
555                           opc == LDLM_ENQUEUE ? 1 : 0;
556
557                 /* Cancel LRU locks here _only_ if the server supports
558                  * EARLY_CANCEL. Otherwise we have to send an extra CANCEL
559                  * RPC, which will make us slower. */
560                 if (avail > count)
561                         count += ldlm_cancel_lru_local(ns, cancels, to_free,
562                                                        avail - count, 0, flags);
563                 if (avail > count)
564                         pack = count;
565                 else
566                         pack = avail;
567                 size[bufoff] = ldlm_request_bufsize(pack, opc);
568         }
569         req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
570                               opc, bufcount, size, NULL);
571         if (exp_connect_cancelset(exp) && req) {
572                 if (canceloff) {
573                         dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
574                                              sizeof(*dlm));
575                         /* Skip the first lock handle in ldlm_request_pack();
576                          * this method will increment @lock_count according
577                          * to the number of lock handles actually written to
578                          * the buffer. */
579                         dlm->lock_count = canceloff;
580                 }
581                 /* Pack @pack lock handles into the request. */
582                 ldlm_cli_cancel_list(cancels, pack, req, bufoff);
583                 /* Prepare and send a separate CANCEL RPC for the rest. */
584                 ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
585         } else {
586                 ldlm_lock_list_put(cancels, l_bl_ast, count);
587         }
588         RETURN(req);
589 }
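
/* Illustrative walk through the early-lock-cancel path above, with made-up
 * numbers: if the request has room for avail = 20 handles and the caller
 * passed count = 5 cancels, up to 15 more unused locks are pulled from the
 * LRU via ldlm_cancel_lru_local(); pack = min(count, avail) handles are then
 * packed into this request, and any excess is sent as a separate CANCEL RPC
 * via ldlm_cli_cancel_list(cancels, count - pack, NULL, 0). */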
590
591 struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
592                                              int bufcount, int *size,
593                                              struct list_head *cancels,
594                                              int count)
595 {
596         return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
597                                  bufcount, size, DLM_LOCKREQ_OFF,
598                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
599 }
600
601 /* If a request needs specific initialisation, it is passed in via @reqp;
602  * otherwise it is created in ldlm_cli_enqueue.
603  *
604  * Supports both sync and async requests; pass the @async flag accordingly. If
605  * the request was created in ldlm_cli_enqueue and it is an async request, it
606  * is passed back to the caller in @reqp. */
607 int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
608                      struct ldlm_enqueue_info *einfo, struct ldlm_res_id res_id,
609                      ldlm_policy_data_t *policy, int *flags,
610                      void *lvb, __u32 lvb_len, void *lvb_swabber,
611                      struct lustre_handle *lockh, int async)
612 {
613         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
614         struct ldlm_lock *lock;
615         struct ldlm_request *body;
616         struct ldlm_reply *reply;
617         int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
618                         [DLM_LOCKREQ_OFF]     = sizeof(*body),
619                         [DLM_REPLY_REC_OFF]   = lvb_len };
620         int is_replay = *flags & LDLM_FL_REPLAY;
621         int req_passed_in = 1, rc, err;
622         struct ptlrpc_request *req;
623         ENTRY;
624
625         LASSERT(exp != NULL);
626
627         /* If we're replaying this lock, just check some invariants.
628          * If we're creating a new lock, get everything set up nicely. */
629         if (is_replay) {
630                 lock = ldlm_handle2lock(lockh);
631                 LASSERT(lock != NULL);
632                 LDLM_DEBUG(lock, "client-side enqueue START");
633                 LASSERT(exp == lock->l_conn_export);
634         } else {
635                 lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
636                                         einfo->ei_mode, einfo->ei_cb_bl,
637                                         einfo->ei_cb_cp, einfo->ei_cb_gl,
638                                         einfo->ei_cbdata, lvb_len);
639                 if (lock == NULL)
640                         RETURN(-ENOMEM);
641                 /* for the local lock, add the reference */
642                 ldlm_lock_addref_internal(lock, einfo->ei_mode);
643                 ldlm_lock2handle(lock, lockh);
644                 lock->l_lvb_swabber = lvb_swabber;
645                 if (policy != NULL) {
646                         /* INODEBITS_INTEROP: If the server does not support
647                          * inodebits, we will request a plain lock in the
648                          * descriptor (ldlm_lock2desc() below) but use an
649                          * inodebits lock internally with both bits set.
650                          */
651                         if (einfo->ei_type == LDLM_IBITS &&
652                             !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
653                                 lock->l_policy_data.l_inodebits.bits =
654                                         MDS_INODELOCK_LOOKUP |
655                                         MDS_INODELOCK_UPDATE;
656                         else
657                                 lock->l_policy_data = *policy;
658                 }
659
660                 if (einfo->ei_type == LDLM_EXTENT)
661                         lock->l_req_extent = policy->l_extent;
662                 LDLM_DEBUG(lock, "client-side enqueue START");
663         }
664
665         /* lock not sent to server yet */
666
667         if (reqp == NULL || *reqp == NULL) {
668                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
669                 if (req == NULL) {
670                         failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
671                         LDLM_LOCK_PUT(lock);
672                         RETURN(-ENOMEM);
673                 }
674                 req_passed_in = 0;
675                 if (reqp)
676                         *reqp = req;
677         } else {
678                 req = *reqp;
679                 LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
680                          sizeof(*body), "buflen[%d] = %d, not %d\n",
681                          DLM_LOCKREQ_OFF,
682                          lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
683                          (int)sizeof(*body));
684         }
685
686         lock->l_conn_export = exp;
687         lock->l_export = NULL;
688         lock->l_blocking_ast = einfo->ei_cb_bl;
689
690         /* Dump lock data into the request buffer */
691         body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
692         ldlm_lock2desc(lock, &body->lock_desc);
693         body->lock_flags = *flags;
694         body->lock_handle[0] = *lockh;
695
696         /* Continue as normal. */
697         if (!req_passed_in) {
698                 size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
699                 ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
700         }
701
702         /*
703          * Liblustre client doesn't get extent locks, except for O_APPEND case
704          * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
705          * [i_size, OBD_OBJECT_EOF] lock is taken.
706          */
707         LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
708                      policy->l_extent.end == OBD_OBJECT_EOF));
709
710         if (async) {
711                 LASSERT(reqp != NULL);
712                 RETURN(0);
713         }
714
715         LDLM_DEBUG(lock, "sending request");
716         rc = ptlrpc_queue_wait(req);
717         err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
718                                     einfo->ei_mode, flags, lvb, lvb_len,
719                                     lvb_swabber, lockh, rc);
720
721         /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
722          * one reference that we took */
723         if (err == -ENOLCK)
724                 LDLM_LOCK_PUT(lock);
725         else
726                 rc = err;
727
728         if (!req_passed_in && req != NULL) {
729                 ptlrpc_req_finished(req);
730                 if (reqp)
731                         *reqp = NULL;
732         }
733
734         RETURN(rc);
735 }
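
/* Minimal sketch of a synchronous caller (the field values are hypothetical
 * and only show the calling convention, not a real call site):
 *
 *      struct ldlm_enqueue_info einfo = { .ei_type = LDLM_EXTENT,
 *                                         .ei_mode = LCK_PR,
 *                                         .ei_cb_bl = ldlm_blocking_ast,
 *                                         .ei_cb_cp = ldlm_completion_ast };
 *      int flags = 0;
 *
 *      rc = ldlm_cli_enqueue(exp, NULL, &einfo, res_id, &policy, &flags,
 *                            NULL, 0, NULL, &lockh, 0);
 *
 * With reqp == NULL and async == 0, the request is allocated, sent, waited
 * for and finished entirely inside ldlm_cli_enqueue(). */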
736
737 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
738                                   __u32 *flags)
739 {
740         struct ldlm_resource *res;
741         int rc;
742         ENTRY;
743         if (ns_is_client(lock->l_resource->lr_namespace)) {
744                 CERROR("Trying to cancel local lock\n");
745                 LBUG();
746         }
747         LDLM_DEBUG(lock, "client-side local convert");
748
749         res = ldlm_lock_convert(lock, new_mode, flags);
750         if (res) {
751                 ldlm_reprocess_all(res);
752                 rc = 0;
753         } else {
754                 rc = EDEADLOCK;
755         }
756         LDLM_DEBUG(lock, "client-side local convert handler END");
757         LDLM_LOCK_PUT(lock);
758         RETURN(rc);
759 }
760
761 /* FIXME: one of ldlm_cli_convert or the server side should reject attempted
762  * conversion of locks which are on the waiting or converting queue */
763 /* Caller of this code is supposed to take care of lock readers/writers
764    accounting */
765 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
766 {
767         struct ldlm_request *body;
768         struct ldlm_reply *reply;
769         struct ldlm_lock *lock;
770         struct ldlm_resource *res;
771         struct ptlrpc_request *req;
772         int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
773                         [DLM_LOCKREQ_OFF]     = sizeof(*body) };
774         int rc;
775         ENTRY;
776
777         lock = ldlm_handle2lock(lockh);
778         if (!lock) {
779                 LBUG();
780                 RETURN(-EINVAL);
781         }
782         *flags = 0;
783
784         if (lock->l_conn_export == NULL)
785                 RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
786
787         LDLM_DEBUG(lock, "client-side convert");
788
789         req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
790                               LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL);
791         if (!req)
792                 GOTO(out, rc = -ENOMEM);
793
794         body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
795         body->lock_handle[0] = lock->l_remote_handle;
796
797         body->lock_desc.l_req_mode = new_mode;
798         body->lock_flags = *flags;
799
800         size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
801         ptlrpc_req_set_repsize(req, 2, size);
802
803         rc = ptlrpc_queue_wait(req);
804         if (rc != ELDLM_OK)
805                 GOTO(out, rc);
806
807         reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
808                                    lustre_swab_ldlm_reply);
809         if (reply == NULL) {
810                 CERROR ("Can't unpack ldlm_reply\n");
811                 GOTO (out, rc = -EPROTO);
812         }
813
814         if (req->rq_status)
815                 GOTO(out, rc = req->rq_status);
816
817         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
818         if (res != NULL) {
819                 ldlm_reprocess_all(res);
820                 /* Go to sleep until the lock is granted. */
821                 /* FIXME: or cancelled. */
822                 if (lock->l_completion_ast) {
823                         rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
824                                                     NULL);
825                         if (rc)
826                                 GOTO(out, rc);
827                 }
828         } else {
829                 rc = EDEADLOCK;
830         }
831         EXIT;
832  out:
833         LDLM_LOCK_PUT(lock);
834         ptlrpc_req_finished(req);
835         return rc;
836 }
837
838 /* Cancel locks locally.
839  * Returns:
840  * LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server;
841  * LDLM_FL_CANCELING otherwise;
842  * LDLM_FL_BL_AST if a separate CANCEL RPC is needed. */
843 static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
844 {
845         int rc = LDLM_FL_LOCAL_ONLY;
846         ENTRY;
847
848         if (lock->l_conn_export) {
849                 int local_only;
850
851                 LDLM_DEBUG(lock, "client-side cancel");
852                 /* Set this flag to prevent others from getting new references */
853                 lock_res_and_lock(lock);
854                 lock->l_flags |= LDLM_FL_CBPENDING;
855                 local_only = (lock->l_flags &
856                               (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
857                 ldlm_cancel_callback(lock);
858                 rc = (lock->l_flags & LDLM_FL_BL_AST) ?
859                         LDLM_FL_BL_AST : LDLM_FL_CANCELING;
860                 unlock_res_and_lock(lock);
861
862                 if (local_only) {
863                         CDEBUG(D_DLMTRACE, "not sending request (at caller's "
864                                "instruction)\n");
865                         rc = LDLM_FL_LOCAL_ONLY;
866                 }
867                 ldlm_lock_cancel(lock);
868         } else {
869                 if (ns_is_client(lock->l_resource->lr_namespace)) {
870                         LDLM_ERROR(lock, "Trying to cancel local lock");
871                         LBUG();
872                 }
873                 LDLM_DEBUG(lock, "server-side local cancel");
874                 ldlm_lock_cancel(lock);
875                 ldlm_reprocess_all(lock->l_resource);
876                 LDLM_DEBUG(lock, "server-side local cancel handler END");
877         }
878
879         RETURN(rc);
880 }
881
882 /* Pack @count locks from @head into the ldlm_request buffer at offset @off
883    of the request @req. */
884 static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
885                              struct list_head *head, int count)
886 {
887         struct ldlm_request *dlm;
888         struct ldlm_lock *lock;
889         int max, packed = 0;
890         ENTRY;
891
892         dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
893         LASSERT(dlm != NULL);
894
895         /* Check the room in the request buffer. */
896         max = lustre_msg_buflen(req->rq_reqmsg, off) -
897                 sizeof(struct ldlm_request);
898         max /= sizeof(struct lustre_handle);
899         max += LDLM_LOCKREQ_HANDLES;
900         LASSERT(max >= dlm->lock_count + count);
901
902         /* XXX: it would be better to pack lock handles grouped by resource,
903          * so that the server cancel would call filter_lvbo_update() less
904          * frequently. */
905         list_for_each_entry(lock, head, l_bl_ast) {
906                 if (!count--)
907                         break;
908                 LASSERT(lock->l_conn_export);
909                 /* Pack the lock handle to the given request buffer. */
910                 LDLM_DEBUG(lock, "packing");
911                 dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
912                 packed++;
913         }
914         CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
915         EXIT;
916 }
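
/* Example of the room check above (sizes are illustrative): for a 1024-byte
 * ldlm_request buffer, max = (1024 - sizeof(struct ldlm_request)) /
 * sizeof(struct lustre_handle) + LDLM_LOCKREQ_HANDLES, i.e. the handles that
 * fit in the buffer tail plus those already embedded in the structure; the
 * LASSERT guarantees the @count new handles fit next to the dlm->lock_count
 * handles already packed. */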
917
918 /* Prepare and send a batched cancel RPC; it will include @count lock handles
919  * of the locks given in @cancels. */
920 int ldlm_cli_cancel_req(struct obd_export *exp,
921                         struct list_head *cancels, int count)
922 {
923         struct ptlrpc_request *req = NULL;
924         struct ldlm_request *body;
925         int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
926                         [DLM_LOCKREQ_OFF]     = sizeof(*body) };
927         struct obd_import *imp;
928         int free, sent = 0;
929         int rc = 0;
930         ENTRY;
931
932         LASSERT(exp != NULL);
933         LASSERT(count > 0);
934
935         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, obd_fail_val);
936
937         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
938                 RETURN(count);
939
940         free = ldlm_req_handles_avail(exp, size, 2, 0);
941         if (count > free)
942                 count = free;
943
944         size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
945         while (1) {
946                 imp = class_exp2cliimp(exp);
947                 if (imp == NULL || imp->imp_invalid) {
948                         CDEBUG(D_DLMTRACE,
949                                "skipping cancel on invalid import %p\n", imp);
950                         RETURN(count);
951                 }
952
953                 req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
954                                       size, NULL);
955                 if (!req)
956                         GOTO(out, rc = -ENOMEM);
957
958                 req->rq_no_resend = 1;
959                 req->rq_no_delay = 1;
960
961                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
962                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
963                 ptlrpc_at_set_req_timeout(req);
964
965                 body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
966                                       sizeof(*body));
967                 ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);
968
969                 ptlrpc_req_set_repsize(req, 1, NULL);
970                 rc = ptlrpc_queue_wait(req);
971
972                 if (rc == ESTALE) {
973                         CDEBUG(D_DLMTRACE, "client/server (nid %s) "
974                                "out of sync -- not fatal\n",
975                                libcfs_nid2str(req->rq_import->
976                                               imp_connection->c_peer.nid));
977                         rc = 0;
978                 } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
979                            req->rq_import_generation == imp->imp_generation) {
980                         ptlrpc_req_finished(req);
981                         continue;
982                 } else if (rc != ELDLM_OK) {
983                         CERROR("Got rc %d from cancel RPC: canceling "
984                                "anyway\n", rc);
985                         break;
986                 }
987                 sent = count;
988                 break;
989         }
990
991         ptlrpc_req_finished(req);
992         EXIT;
993 out:
994         return sent ? sent : rc;
995 }
996
997 static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
998 {
999         LASSERT(imp != NULL);
1000         return &imp->imp_obd->obd_namespace->ns_pool;
1001 }
1002
1003 int ldlm_cli_update_pool(struct ptlrpc_request *req)
1004 {
1005         struct obd_device *obd;
1006         __u64 old_slv, new_slv;
1007         __u32 new_limit;
1008         ENTRY;
1009     
1010         if (unlikely(!req->rq_import || !req->rq_import->imp_obd || 
1011                      !imp_connect_lru_resize(req->rq_import)))
1012         {
1013                 /* 
1014                  * Do nothing for corner cases. 
1015                  */
1016                 RETURN(0);
1017         }
1018
1019         /* 
1020          * In some cases the RPC may contain the SLV and limit zeroed out. This
1021          * is the case when the server does not support the lru resize feature.
1022          * It is also possible in some recovery cases, when server-side requests
1023          * have no reference to the obd export and thus access to the server-side
1024          * namespace is not possible.
1025          */
1026         if (lustre_msg_get_slv(req->rq_repmsg) == 0 || 
1027             lustre_msg_get_limit(req->rq_repmsg) == 0) {
1028                 DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
1029                           "(SLV: "LPU64", Limit: %u)", 
1030                           lustre_msg_get_slv(req->rq_repmsg), 
1031                           lustre_msg_get_limit(req->rq_repmsg));
1032                 RETURN(0);
1033         }
1034
1035         new_limit = lustre_msg_get_limit(req->rq_repmsg);
1036         new_slv = lustre_msg_get_slv(req->rq_repmsg);
1037         obd = req->rq_import->imp_obd;
1038
1039         /* 
1040          * Store the new SLV and limit in obd fields to make them accessible to
1041          * the pool thread. We do not access obd_namespace and the pool directly
1042          * here, as there is no reliable way to make sure that they are still
1043          * alive at cleanup time. Evil races are possible which may cause an
1044          * oops at that time.
1045          */
1046         write_lock(&obd->obd_pool_lock);
1047         old_slv = obd->obd_pool_slv;
1048         obd->obd_pool_slv = new_slv;
1049         obd->obd_pool_limit = new_limit;
1050         write_unlock(&obd->obd_pool_lock);
1051
1052         /* 
1053          * Check if we need to wake up the pools thread for a fast SLV change.
1054          * This is only done when the thread period is noticeably long, i.e.
1055          * 10s or more.
1056          */
1057 #if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10)
1058         if (old_slv > 0) {
1059                 __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
1060                 do_div(fast_change, 100);
1061
1062                 /* 
1063                  * Wake up the pools thread only if the SLV has changed by more
1064                  * than 50% since the last update. In this case we want to react
1065                  * asap. Otherwise there is no sense in waking up the pools, as
1066                  * they are re-calculated every LDLM_POOLS_THREAD_PERIOD anyway.
1067                  */
1068                 if (old_slv > new_slv && old_slv - new_slv > fast_change)
1069                         ldlm_pools_wakeup();
1070         }
1071 #endif
1072         RETURN(0);
1073 }
1074 EXPORT_SYMBOL(ldlm_cli_update_pool);
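
/* Numeric illustration of the wakeup test above (LDLM_POOLS_FAST_SLV_CHANGE
 * is defined elsewhere; 50, i.e. a 50% change, is assumed here): with
 * old_slv = 1000, fast_change = 1000 * 50 / 100 = 500, so the pools thread
 * is only woken up if the new SLV has dropped below 500. */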
1075
1076 int ldlm_cli_cancel(struct lustre_handle *lockh)
1077 {
1078         struct ldlm_namespace *ns;
1079         int avail, flags, count = 1, rc = 0;
1080         struct ldlm_lock *lock;
1081         CFS_LIST_HEAD(cancels);
1082         ENTRY;
1083
1084         /* concurrent cancels on the same handle can happen */
1085         lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
1086         if (lock == NULL) {
1087                 LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
1088                 RETURN(0);
1089         }
1090         
1091         rc = ldlm_cli_cancel_local(lock);
1092         if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {
1093                 LDLM_LOCK_PUT(lock);
1094                 RETURN(rc < 0 ? rc : 0);
1095         }
1096         /* Even if the lock is marked as LDLM_FL_BL_AST, this is an LDLM_CANCEL
1097          * RPC which goes to the canceld portal, so we can cancel other LRU locks
1098          * here and send them all as one LDLM_CANCEL RPC. */
1099         LASSERT(list_empty(&lock->l_bl_ast));
1100         list_add(&lock->l_bl_ast, &cancels);
1101
1102         if (exp_connect_cancelset(lock->l_conn_export)) {
1103                 avail = ldlm_cancel_handles_avail(lock->l_conn_export);
1104                 LASSERT(avail > 0);
1105
1106                 ns = lock->l_resource->lr_namespace;
1107                 flags = ns_connect_lru_resize(ns) ?
1108                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
1109                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
1110                                                LDLM_FL_BL_AST, flags);
1111         }
1112         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
1113         RETURN(0);
1114 }
1115
1116 /* XXX until we have compound requests and can cut cancels from a generic RPC,
1117  * we need to send cancels with the LDLM_FL_BL_AST flag as a separate RPC */
1118 static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
1119 {
1120         CFS_LIST_HEAD(head);
1121         struct ldlm_lock *lock, *next;
1122         int left = 0, bl_ast = 0, rc;
1123
1124         left = count;
1125         list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
1126                 if (left-- == 0)
1127                         break;
1128
1129                 if (flags & LDLM_FL_LOCAL_ONLY) {
1130                         rc = LDLM_FL_LOCAL_ONLY;
1131                         ldlm_lock_cancel(lock);
1132                 } else {
1133                         rc = ldlm_cli_cancel_local(lock);
1134                 }
1135                 if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) {
1136                         LDLM_DEBUG(lock, "Cancel lock separately");
1137                         list_del_init(&lock->l_bl_ast);
1138                         list_add(&lock->l_bl_ast, &head);
1139                         bl_ast ++;
1140                         continue;
1141                 }
1142                 if (rc == LDLM_FL_LOCAL_ONLY) {
1143                         /* CANCEL RPC should not be sent to server. */
1144                         list_del_init(&lock->l_bl_ast);
1145                         LDLM_LOCK_PUT(lock);
1146                         count--;
1147                 }
1148
1149         }
1150         if (bl_ast > 0) {
1151                 count -= bl_ast;
1152                 ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
1153         }
1154
1155         RETURN(count);
1156 }
1157
1158 /* Return 1 to stop lru processing and keep current lock cached. Return zero 
1159  * otherwise. */
1160 static ldlm_policy_res_t ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
1161                                                    struct ldlm_lock *lock,
1162                                                    int unused, int added, 
1163                                                    int count)
1164 {
1165         int lock_cost;
1166         __u64 page_nr;
1167
1168         /* Stop LRU processing when we have reached the passed @count or have
1169          * checked all locks in the LRU. */
1170         if (count && added >= count)
1171                 return LDLM_POLICY_KEEP_LOCK;
1172
1173         if (lock->l_resource->lr_type == LDLM_EXTENT) {
1174                 struct ldlm_extent *l_extent;
1175
1176                 /* For all extent locks cost is 1 + number of pages in
1177                  * their extent. */
1178                 l_extent = &lock->l_policy_data.l_extent;
1179                 page_nr = (l_extent->end - l_extent->start);
1180                 do_div(page_nr, CFS_PAGE_SIZE);
1181
1182 #ifdef __KERNEL__
1183                 /* XXX: In fact this is an evil hack, we can't access the inode
1184                  * here. To do it right we somehow need to know the number of
1185                  * pages covered by the lock. This should be fixed later when
1186                  * 10718 is landed. */
1187                 if (lock->l_ast_data != NULL) {
1188                         struct inode *inode = lock->l_ast_data;
1189                         if (page_nr > inode->i_mapping->nrpages)
1190                                 page_nr = inode->i_mapping->nrpages;
1191                 }
1192 #endif
1193                 lock_cost = 1 + page_nr;
1194         } else {
1195                 /* For all locks which are not extent ones cost is 1 */
1196                 lock_cost = 1;
1197         }
1198
1199         /* Keep all expensive locks in the LRU for the memory-pressure
1200          * cancel policy. They may still be canceled by the lru resize
1201          * policy if their CLV is not small enough. */
1202         return lock_cost > ns->ns_shrink_thumb ? 
1203                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1204 }
1205
1206 /* Return 1 to stop lru processing and keep current lock cached. Return zero 
1207  * otherwise. */
1208 static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1209                                                  struct ldlm_lock *lock, 
1210                                                  int unused, int added, 
1211                                                  int count)
1212 {
1213         cfs_time_t cur = cfs_time_current();
1214         struct ldlm_pool *pl = &ns->ns_pool;
1215         __u64 slv, lvf, lv;
1216         cfs_time_t la;
1217
1218         /* Stop LRU processing when we have reached the passed @count or have
1219          * checked all locks in the LRU. */
1220         if (count && added >= count)
1221                 return LDLM_POLICY_KEEP_LOCK;
1222
1223         slv = ldlm_pool_get_slv(pl);
1224         lvf = ldlm_pool_get_lvf(pl);
1225
1226         la = cfs_duration_sec(cfs_time_sub(cur, 
1227                               lock->l_last_used));
1228
1229         /* Stop (keep the lock) when the SLV has not yet come from the
1230          * server, or when the lock volume lv is smaller than the SLV. */
1231         lv = lvf * la * unused;
1232
1233         /* Inform pool about current CLV to see it via proc. */
1234         ldlm_pool_set_clv(pl, lv);
1235         return (slv == 1 || lv < slv) ? 
1236                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1237 }
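
/* Worked example of the comparison above (numbers are illustrative): with a
 * lock volume factor lvf = 100, a lock last used la = 30 seconds ago and
 * unused = 200 locks in the LRU, lv = 100 * 30 * 200 = 600000; the lock is
 * cancelled only when slv != 1 (i.e. an SLV has been received from the
 * server) and lv >= slv. */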
1238
1239 /* Return 1 to stop lru processing and keep current lock cached. Return zero 
1240  * otherwise. */
1241 static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1242                                                    struct ldlm_lock *lock, 
1243                                                    int unused, int added,
1244                                                    int count)
1245 {
1246         /* Stop LRU processing when we have reached the passed @count or have
1247          * checked all locks in the LRU. */
1248         return (added >= count) ? 
1249                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1250 }
1251
1252 /* Return 1 to stop lru processing and keep current lock cached. Return zero 
1253  * otherwise. */
1254 static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1255                                                  struct ldlm_lock *lock, 
1256                                                  int unused, int added,
1257                                                  int count)
1258 {
1259         /* Stop LRU processing if a young lock is found and we have reached
1260          * the passed @count. */
1261         return ((added >= count) && 
1262                 cfs_time_before(cfs_time_current(),
1263                                 cfs_time_add(lock->l_last_used,
1264                                              ns->ns_max_age))) ? 
1265                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1266 }
1267
1268 /* Return 1 to stop lru processing and keep current lock cached. Return zero 
1269  * otherwise. */
1270 static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1271                                                     struct ldlm_lock *lock, 
1272                                                     int unused, int added,
1273                                                     int count)
1274 {
1275         /* Stop LRU processing when we have reached the passed @count or have
1276          * checked all locks in the LRU. */
1277         return (added >= count) ? 
1278                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1279 }
1280
1281 typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *, 
1282                                                       struct ldlm_lock *, int, 
1283                                                       int, int);
1284
1285 static ldlm_cancel_lru_policy_t
1286 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
1287 {
1288         if (ns_connect_lru_resize(ns)) {
1289                 if (flags & LDLM_CANCEL_SHRINK)
1290                         return ldlm_cancel_shrink_policy;
1291                 else if (flags & LDLM_CANCEL_LRUR)
1292                         return ldlm_cancel_lrur_policy;
1293                 else if (flags & LDLM_CANCEL_PASSED)
1294                         return ldlm_cancel_passed_policy;
1295         } else {
1296                 if (flags & LDLM_CANCEL_AGED)
1297                         return ldlm_cancel_aged_policy;
1298         }
1299         
1300         return ldlm_cancel_default_policy;
1301 }
1302  
1303 /* - Free space in lru for @count new locks,
1304  *   redundant unused locks are canceled locally;
1305  * - also cancel locally unused aged locks;
1306  * - do not cancel more than @max locks;
1307  * - GET the found locks and add them into the @cancels list.
1308  *
1309  * A client lock can be added to the l_bl_ast list only when it is
1310  * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing CANCEL.
1311  * There are the following use cases: ldlm_cancel_resource_local(),
1312  * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check and set this
1313  * flag properly. As any attempt to cancel a lock relies on this flag,
1314  * the l_bl_ast list is accessed later without any special locking.
1315  *
1316  * Calling policies for enabled lru resize:
1317  * ----------------------------------------
1318  * flags & LDLM_CANCEL_LRUR - use lru resize policy (SLV from server) to
1319  *                            cancel not more than @count locks;
1320  *
1321  * flags & LDLM_CANCEL_PASSED - cancel @count old locks (located at the
1322  *                              beginning of the lru list);
1323  *
1324  * flags & LDLM_CANCEL_SHRINK - cancel not more than @count locks according to
1325  *                              memory pressure policy function;
1326  *
1327  * flags & LDLM_CANCEL_AGED -   cancel locks according to "aged policy".
1328  */
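/* Illustrative sketch only (not part of the original code): a caller that
 * wants to trim the lru typically collects the victims with this function and
 * then cancels the batch, much as ldlm_cancel_lru() below does.  Variable
 * names here are hypothetical:
 *
 *      CFS_LIST_HEAD(cancels);
 *      int count;
 *
 *      count = ldlm_cancel_lru_local(ns, &cancels, 1, 0, 0,
 *                                    LDLM_CANCEL_LRUR);
 *      ldlm_cli_cancel_list(&cancels, count, NULL, 0);
 */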
1329 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
1330                           int count, int max, int cancel_flags, int flags)
1331 {
1332         ldlm_cancel_lru_policy_t pf;
1333         struct ldlm_lock *lock, *next;
1334         int added = 0, unused;
1335         ENTRY;
1336
1337         spin_lock(&ns->ns_unused_lock);
1338         unused = ns->ns_nr_unused;
1339
1340         if (!ns_connect_lru_resize(ns))
1341                 count += unused - ns->ns_max_unused;
1342
1343         pf = ldlm_cancel_lru_policy(ns, flags);
1344         LASSERT(pf != NULL);
1345         
1346         while (!list_empty(&ns->ns_unused_list)) {
1347                 /* For any flags, stop scanning if @max is reached. */
1348                 if (max && added >= max)
1349                         break;
1350
1351                 list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru){
1352                         /* No locks which got blocking requests. */
1353                         LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));
1354
1355                         /* Somebody is already doing CANCEL. No need to keep
1356                          * this lock in the lru, do not traverse it again. */
1357                         if (!(lock->l_flags & LDLM_FL_CANCELING))
1358                                 break;
1359
1360                         ldlm_lock_remove_from_lru_nolock(lock);
1361                 }
1362                 if (&lock->l_lru == &ns->ns_unused_list)
1363                         break;
1364
1365                 /* Pass the lock through the policy filter and see if it
1366                  * should stay in lru.
1367                  *
1368                  * Even for shrinker policy we stop scanning if
1369                  * we find a lock that should stay in the cache.
1370                  * We should take lock age into account anyway,
1371                  * as a new lock is a valuable resource even if
1372                  * its weight is small.
1373                  *
1374                  * That is, for the shrinker policy we drop only
1375                  * old locks, but additionally choose them by
1376                  * their weight. Big extent locks will stay in
1377                  * the cache. */
1378                 if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
1379                         break;
1380
1381                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
1382                 spin_unlock(&ns->ns_unused_lock);
1383
1384                 lock_res_and_lock(lock);
1385                 /* Check flags again under the lock. */
1386                 if ((lock->l_flags & LDLM_FL_CANCELING) ||
1387                     (ldlm_lock_remove_from_lru(lock) == 0)) {
1388                         /* Another thread is removing the lock from the
1389                          * lru, or somebody is already doing CANCEL, or
1390                          * there is a blocking request which will send
1391                          * the cancel by itself, or the lock was matched
1392                          * and is no longer unused. */
1393                         unlock_res_and_lock(lock);
1394                         LDLM_LOCK_PUT(lock);
1395                         spin_lock(&ns->ns_unused_lock);
1396                         continue;
1397                 }
1398                 LASSERT(!lock->l_readers && !lock->l_writers);
1399
1400                 /* If we have chosen to cancel this lock voluntarily, we
1401                  * had better send a cancel notification to the server so
1402                  * that it frees the appropriate state. This might lead to
1403                  * a race where, while we are doing the cancel here, the
1404                  * server is also silently cancelling this lock. */
1405                 lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
1406
1407                 /* Setting the CBPENDING flag is a little misleading,
1408                  * but prevents an important race; namely, once
1409                  * CBPENDING is set, the lock can accumulate no more
1410                  * readers/writers. Since readers and writers are
1411                  * already zero here, ldlm_lock_decref() won't see
1412                  * this flag and call l_blocking_ast */
1413                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
1414
1415                 /* We can't re-add to l_lru as it confuses the
1416                  * refcounting in ldlm_lock_remove_from_lru() if an AST
1417                  * arrives after we drop ns_lock below. We use l_bl_ast
1418                  * and can't use l_pending_chain as it is used on both
1419                  * the server and the client, even though bug 5666 says
1420                  * it is used only on the server. */
1421                 LASSERT(list_empty(&lock->l_bl_ast));
1422                 list_add(&lock->l_bl_ast, cancels);
1423                 unlock_res_and_lock(lock);
1424                 spin_lock(&ns->ns_unused_lock);
1425                 added++;
1426                 unused--;
1427         }
1428         spin_unlock(&ns->ns_unused_lock);
1429         RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
1430 }
1431
1432 /* Returns the number of locks which could be canceled the next time
1433  * ldlm_cancel_lru() is called. Used from the locks pool shrinker. */
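/* Illustrative sketch only: a pool shrinker might first ask how many locks
 * are cancelable before deciding to cancel anything (the nr_to_scan variable
 * is hypothetical):
 *
 *      int cached = ldlm_cancel_lru_estimate(ns, 0, nr_to_scan,
 *                                            LDLM_CANCEL_SHRINK);
 */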
1434 int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns,
1435                              int count, int max, int flags)
1436 {
1437         ldlm_cancel_lru_policy_t pf;
1438         struct ldlm_lock *lock;
1439         int added = 0, unused;
1440         ENTRY;
1441
1442         pf = ldlm_cancel_lru_policy(ns, flags);
1443         LASSERT(pf != NULL);
1444         spin_lock(&ns->ns_unused_lock);
1445         unused = ns->ns_nr_unused;
1446
1447         list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
1448                 /* For any flags, stop scanning if @max is reached. */
1449                 if (max && added >= max)
1450                         break;
1451
1452                 /* Somebody is already doing CANCEL or there is a
1453                  * blocking request which will send a cancel. Let's not count
1454                  * this lock. */
1455                 if ((lock->l_flags & LDLM_FL_CANCELING) ||
1456                     (lock->l_flags & LDLM_FL_BL_AST)) 
1457                         continue;
1458
1459                 /* Pass the lock through the policy filter and see if it
1460                  * should stay in lru. */
1461                 if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
1462                         break;
1463
1464                 added++;
1465                 unused--;
1466         }
1467         spin_unlock(&ns->ns_unused_lock);
1468         RETURN(added);
1469 }
1470
1471 /* When called with LDLM_ASYNC, the blocking callbacks will be handled by a
1472  * separate thread and this function will return after that thread has been
1473  * asked to run them.  When called with LDLM_SYNC, the blocking callbacks are
1474  * performed directly in this function. */
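/* Illustrative sketch only (hypothetical call site): ask for up to @nr unused
 * locks to be canceled, handing the blocking callbacks off to the bl thread:
 *
 *      ldlm_cancel_lru(ns, nr, LDLM_ASYNC, 0);
 */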
1475 int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, 
1476                     int flags)
1477 {
1478         CFS_LIST_HEAD(cancels);
1479         int count, rc;
1480         ENTRY;
1481
1482 #ifndef __KERNEL__
1483         sync = LDLM_SYNC; /* force to be sync in user space */
1484 #endif
1485         count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
1486         if (sync == LDLM_ASYNC) {
1487                 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
1488                 if (rc == 0)
1489                         RETURN(count);
1490         }
1491
1492         /* If an error occurred in ASYNC mode, or
1493          * this is SYNC mode, cancel the list. */
1494         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
1495         RETURN(count);
1496 }
1497
1498 /* Find and cancel locally unused locks found on the resource that match the
1499  * given policy and mode. GET the found locks and add them to the @cancels
1500  * list. */
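/* Illustrative sketch only: ldlm_cli_cancel_unused_resource() below uses this
 * to collect every unused lock on a resource and then cancel the whole batch:
 *
 *      count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE,
 *                                         0, flags, opaque);
 *      rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0);
 */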
1501 int ldlm_cancel_resource_local(struct ldlm_resource *res,
1502                                struct list_head *cancels,
1503                                ldlm_policy_data_t *policy,
1504                                ldlm_mode_t mode, int lock_flags,
1505                                int cancel_flags, void *opaque)
1506 {
1507         struct ldlm_lock *lock;
1508         int count = 0;
1509         ENTRY;
1510
1511         lock_res(res);
1512         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
1513                 if (opaque != NULL && lock->l_ast_data != opaque) {
1514                         LDLM_ERROR(lock, "data %p doesn't match opaque %p",
1515                                    lock->l_ast_data, opaque);
1516                         //LBUG();
1517                         continue;
1518                 }
1519
1520                 if (lock->l_readers || lock->l_writers) {
1521                         if (cancel_flags & LDLM_FL_WARN) {
1522                                 LDLM_ERROR(lock, "lock in use");
1523                                 //LBUG();
1524                         }
1525                         continue;
1526                 }
1527
1528                 /* If somebody is already doing CANCEL, or a blocking AST
1529                  * has arrived, skip this lock. */
1530                 if (lock->l_flags & LDLM_FL_BL_AST || 
1531                     lock->l_flags & LDLM_FL_CANCELING)
1532                         continue;
1533
1534                 if (lockmode_compat(lock->l_granted_mode, mode))
1535                         continue;
1536
1537                 /* If a policy is given and this is an IBITS lock, add to the
1538                  * list only those locks that match the policy. */
1539                 if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
1540                     !(lock->l_policy_data.l_inodebits.bits &
1541                       policy->l_inodebits.bits))
1542                         continue;
1543
1544                 /* See CBPENDING comment in ldlm_cancel_lru */
1545                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
1546                         lock_flags;
1547
1548                 LASSERT(list_empty(&lock->l_bl_ast));
1549                 list_add(&lock->l_bl_ast, cancels);
1550                 LDLM_LOCK_GET(lock);
1551                 count++;
1552         }
1553         unlock_res(res);
1554
1555         /* Handle only @count inserted locks. */
1556         RETURN(ldlm_cancel_list(cancels, count, cancel_flags));
1557 }
1558
1559 /* If @req is NULL, send a CANCEL request to the server with the handles of
1560  * the locks in @cancels. If EARLY_CANCEL is not supported, send CANCEL
1561  * requests separately for each lock.
1562  * If @req is not NULL, pack the handles of the locks in @cancels into the
1563  * request buffer at offset @off.
1564  * Destroy @cancels at the end. */
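/* Illustrative sketch only: with no request to piggy-back on, pass NULL and
 * the handles are sent in dedicated CANCEL RPCs, as ldlm_cancel_lru() does:
 *
 *      ldlm_cli_cancel_list(&cancels, count, NULL, 0);
 *
 * When a request buffer is available, the same call packs the handles into it
 * at the given offset instead (req and off are assumed to have been prepared
 * by the caller):
 *
 *      ldlm_cli_cancel_list(&cancels, count, req, off);
 */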
1565 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
1566                          struct ptlrpc_request *req, int off)
1567 {
1568         struct ldlm_lock *lock;
1569         int res = 0;
1570         ENTRY;
1571
1572         if (list_empty(cancels) || count == 0)
1573                 RETURN(0);
1574
1575         while (count) {
1576                 LASSERT(!list_empty(cancels));
1577                 lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast);
1578                 LASSERT(lock->l_conn_export);
1579
1580                 if (exp_connect_cancelset(lock->l_conn_export)) {
1581                         res = count;
1582                         if (req)
1583                                 ldlm_cancel_pack(req, off, cancels, count);
1584                         else
1585                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
1586                                                           cancels, count);
1587                 } else {
1588                         res = ldlm_cli_cancel_req(lock->l_conn_export,
1589                                                   cancels, 1);
1590                 }
1591
1592                 if (res < 0) {
1593                         CERROR("ldlm_cli_cancel_list: %d\n", res);
1594                         res = count;
1595                 }
1596
1597                 count -= res;
1598                 ldlm_lock_list_put(cancels, l_bl_ast, res);
1599         }
1600         RETURN(0);
1601 }
1602
1603 static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
1604                                            struct ldlm_res_id res_id,
1605                                            int flags, void *opaque)
1606 {
1607         struct ldlm_resource *res;
1608         CFS_LIST_HEAD(cancels);
1609         int count;
1610         int rc;
1611         ENTRY;
1612
1613         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1614         if (res == NULL) {
1615                 /* This is not a problem. */
1616                 CDEBUG(D_INFO, "No resource "LPU64"\n", res_id.name[0]);
1617                 RETURN(0);
1618         }
1619
1620         count = ldlm_cancel_resource_local(res, &cancels, NULL, LCK_MINMODE,
1621                                            0, flags, opaque);
1622         rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0);
1623         if (rc != ELDLM_OK)
1624                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
1625
1626         ldlm_resource_putref(res);
1627         RETURN(0);
1628 }
1629
1630 static inline int have_no_nsresource(struct ldlm_namespace *ns)
1631 {
1632         int no_resource = 0;
1633
1634         spin_lock(&ns->ns_hash_lock);
1635         if (ns->ns_resources == 0)
1636                 no_resource = 1;
1637         spin_unlock(&ns->ns_hash_lock);
1638
1639         RETURN(no_resource);
1640 }
1641
1642 /* Cancel all locks on a namespace (or a specific resource, if given)
1643  * that have 0 readers/writers.
1644  *
1645  * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
1646  * to notify the server. */
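/* Illustrative sketch only (hypothetical call site): drop every unused lock
 * in a namespace without notifying the server:
 *
 *      ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY, NULL);
 */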
1647 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1648                            struct ldlm_res_id *res_id, int flags, void *opaque)
1649 {
1650         int i;
1651         ENTRY;
1652
1653         if (ns == NULL)
1654                 RETURN(ELDLM_OK);
1655
1656         if (res_id)
1657                 RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
1658                                                        opaque));
1659
1660         spin_lock(&ns->ns_hash_lock);
1661         for (i = 0; i < RES_HASH_SIZE; i++) {
1662                 struct list_head *tmp;
1663                 tmp = ns->ns_hash[i].next;
1664                 while (tmp != &(ns->ns_hash[i])) {
1665                         struct ldlm_resource *res;
1666                         int rc;
1667
1668                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
1669                         ldlm_resource_getref(res);
1670                         spin_unlock(&ns->ns_hash_lock);
1671
1672                         rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
1673                                                              flags, opaque);
1674
1675                         if (rc)
1676                                 CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n",
1677                                        res->lr_name.name[0], rc);
1678
1679                         spin_lock(&ns->ns_hash_lock);
1680                         tmp = tmp->next;
1681                         ldlm_resource_putref_locked(res);
1682                 }
1683         }
1684         spin_unlock(&ns->ns_hash_lock);
1685
1686         RETURN(ELDLM_OK);
1687 }
1688
1689 /* join/split resource locks to/from lru list */
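/* Illustrative sketch only (hypothetical call sites): @join selects the
 * direction, so a caller moves a resource's locks into the lru with
 *
 *      ldlm_cli_join_lru(ns, &res_id, 1);
 *
 * and pulls them back out again with
 *
 *      ldlm_cli_join_lru(ns, &res_id, 0);
 */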
1690 int ldlm_cli_join_lru(struct ldlm_namespace *ns,
1691                       struct ldlm_res_id *res_id, int join)
1692 {
1693         struct ldlm_resource *res;
1694         struct ldlm_lock *lock, *n;
1695         int count = 0;
1696         ENTRY;
1697
1698         LASSERT(ns_is_client(ns));
1699
1700         res = ldlm_resource_get(ns, NULL, *res_id, LDLM_EXTENT, 0);
1701         if (res == NULL)
1702                 RETURN(count);
1703         LASSERT(res->lr_type == LDLM_EXTENT);
1704
1705         lock_res(res);
1706         if (!join)
1707                 goto split;
1708
1709         list_for_each_entry_safe (lock, n, &res->lr_granted, l_res_link) {
1710                 if (list_empty(&lock->l_lru) &&
1711                     !lock->l_readers && !lock->l_writers &&
1712                     !(lock->l_flags & LDLM_FL_LOCAL) &&
1713                     !(lock->l_flags & LDLM_FL_CBPENDING) &&
1714                     !(lock->l_flags & LDLM_FL_BL_AST)) {
1715                         ldlm_lock_add_to_lru(lock);
1716                         lock->l_flags &= ~LDLM_FL_NO_LRU;
1717                         LDLM_DEBUG(lock, "join lock to lru");
1718                         count++;
1719                 }
1720         }
1721         goto unlock;
1722 split:
1723         spin_lock(&ns->ns_unused_lock);
1724         list_for_each_entry_safe (lock, n, &ns->ns_unused_list, l_lru) {
1725                 if (lock->l_resource == res) {
1726                         ldlm_lock_remove_from_lru_nolock(lock);
1727                         lock->l_flags |= LDLM_FL_NO_LRU;
1728                         LDLM_DEBUG(lock, "split lock from lru");
1729                         count++;
1730                 }
1731         }
1732         spin_unlock(&ns->ns_unused_lock);
1733 unlock:
1734         unlock_res(res);
1735         ldlm_resource_putref(res);
1736         RETURN(count);
1737 }
1738
1739 /* Lock iterators. */
1740
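/* Illustrative sketch only: an iterator callback receives each lock in turn
 * and returns LDLM_ITER_CONTINUE to keep going or LDLM_ITER_STOP to abort the
 * walk (the callback name and its closure are hypothetical):
 *
 *      static int count_locks_iter(struct ldlm_lock *lock, void *closure)
 *      {
 *              (*(int *)closure)++;
 *              return LDLM_ITER_CONTINUE;
 *      }
 *
 *      int count = 0;
 *      ldlm_resource_foreach(res, count_locks_iter, &count);
 */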
1741 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
1742                           void *closure)
1743 {
1744         struct list_head *tmp, *next;
1745         struct ldlm_lock *lock;
1746         int rc = LDLM_ITER_CONTINUE;
1747
1748         ENTRY;
1749
1750         if (!res)
1751                 RETURN(LDLM_ITER_CONTINUE);
1752
1753         lock_res(res);
1754         list_for_each_safe(tmp, next, &res->lr_granted) {
1755                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1756
1757                 if (iter(lock, closure) == LDLM_ITER_STOP)
1758                         GOTO(out, rc = LDLM_ITER_STOP);
1759         }
1760
1761         list_for_each_safe(tmp, next, &res->lr_converting) {
1762                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1763
1764                 if (iter(lock, closure) == LDLM_ITER_STOP)
1765                         GOTO(out, rc = LDLM_ITER_STOP);
1766         }
1767
1768         list_for_each_safe(tmp, next, &res->lr_waiting) {
1769                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1770
1771                 if (iter(lock, closure) == LDLM_ITER_STOP)
1772                         GOTO(out, rc = LDLM_ITER_STOP);
1773         }
1774  out:
1775         unlock_res(res);
1776         RETURN(rc);
1777 }
1778
1779 struct iter_helper_data {
1780         ldlm_iterator_t iter;
1781         void *closure;
1782 };
1783
1784 static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
1785 {
1786         struct iter_helper_data *helper = closure;
1787         return helper->iter(lock, helper->closure);
1788 }
1789
1790 static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
1791 {
1792         return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
1793 }
1794
1795 int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
1796                            void *closure)
1797 {
1798         struct iter_helper_data helper = { iter: iter, closure: closure };
1799         return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
1800 }
1801
1802 int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
1803                                ldlm_res_iterator_t iter, void *closure)
1804 {
1805         int i, rc = LDLM_ITER_CONTINUE;
1806         struct ldlm_resource *res;
1807         struct list_head *tmp;
1808
1809         ENTRY;
1810         spin_lock(&ns->ns_hash_lock);
1811         for (i = 0; i < RES_HASH_SIZE; i++) {
1812                 tmp = ns->ns_hash[i].next;
1813                 while (tmp != &(ns->ns_hash[i])) {
1814                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
1815                         ldlm_resource_getref(res);
1816                         spin_unlock(&ns->ns_hash_lock);
1817
1818                         rc = iter(res, closure);
1819
1820                         spin_lock(&ns->ns_hash_lock);
1821                         tmp = tmp->next;
1822                         ldlm_resource_putref_locked(res);
1823                         if (rc == LDLM_ITER_STOP)
1824                                 GOTO(out, rc);
1825                 }
1826         }
1827  out:
1828         spin_unlock(&ns->ns_hash_lock);
1829         RETURN(rc);
1830 }
1831
1832 /* Non-blocking function to manipulate a lock whose cb_data is being put away. */
1833 void ldlm_resource_iterate(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
1834                            ldlm_iterator_t iter, void *data)
1835 {
1836         struct ldlm_resource *res;
1837         ENTRY;
1838
1839         if (ns == NULL) {
1840                 CERROR("must pass in namespace\n");
1841                 LBUG();
1842         }
1843
1844         res = ldlm_resource_get(ns, NULL, *res_id, 0, 0);
1845         if (res == NULL) {
1846                 EXIT;
1847                 return;
1848         }
1849
1850         ldlm_resource_foreach(res, iter, data);
1851         ldlm_resource_putref(res);
1852         EXIT;
1853 }
1854
1855 /* Lock replay */
1856
1857 static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
1858 {
1859         struct list_head *list = closure;
1860
1861         /* we use l_pending_chain here, because it's unused on clients. */
1862         LASSERTF(list_empty(&lock->l_pending_chain),"lock %p next %p prev %p\n",
1863                  lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev);
1864         /* bug 9573: don't replay locks left after eviction */
1865         if (!(lock->l_flags & LDLM_FL_FAILED))
1866                 list_add(&lock->l_pending_chain, list);
1867         return LDLM_ITER_CONTINUE;
1868 }
1869
1870 static int replay_lock_interpret(struct ptlrpc_request *req,
1871                                  struct ldlm_async_args *aa, int rc)
1872 {
1873         struct ldlm_lock *lock;
1874         struct ldlm_reply *reply;
1875
1876         ENTRY;
1877         atomic_dec(&req->rq_import->imp_replay_inflight);
1878         if (rc != ELDLM_OK)
1879                 GOTO(out, rc);
1880
1881
1882         reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
1883                                    lustre_swab_ldlm_reply);
1884         if (reply == NULL) {
1885                 CERROR("Can't unpack ldlm_reply\n");
1886                 GOTO (out, rc = -EPROTO);
1887         }
1888
1889         lock = ldlm_handle2lock(&aa->lock_handle);
1890         if (!lock) {
1891                 CERROR("received replay ack for unknown local cookie "LPX64
1892                        " remote cookie "LPX64 " from server %s id %s\n",
1893                        aa->lock_handle.cookie, reply->lock_handle.cookie,
1894                        req->rq_export->exp_client_uuid.uuid,
1895                        libcfs_id2str(req->rq_peer));
1896                 GOTO(out, rc = -ESTALE);
1897         }
1898
1899         lock->l_remote_handle = reply->lock_handle;
1900         LDLM_DEBUG(lock, "replayed lock:");
1901         ptlrpc_import_recovery_state_machine(req->rq_import);
1902         LDLM_LOCK_PUT(lock);
1903 out:
1904         if (rc != ELDLM_OK)
1905                 ptlrpc_connect_import(req->rq_import, NULL);
1906
1907
1908         RETURN(rc);
1909 }
1910
1911 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
1912 {
1913         struct ptlrpc_request *req;
1914         struct ldlm_request *body;
1915         struct ldlm_reply *reply;
1916         struct ldlm_async_args *aa;
1917         int buffers = 2;
1918         int size[3] = { sizeof(struct ptlrpc_body) };
1919         int flags;
1920         ENTRY;
1921
1922
1923         /* Bug 11974: Do not replay a lock which is actively being canceled */
1924         if (lock->l_flags & LDLM_FL_CANCELING) {
1925                 LDLM_DEBUG(lock, "Not replaying canceled lock:");
1926                 RETURN(0);
1927         }
1928
1929         /* If this is a reply-less callback lock, we cannot replay it: the
1930          * server might have dropped it long ago, but the notification of that
1931          * event was lost by the network (and a conflicting lock granted). */
1932         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
1933                 LDLM_DEBUG(lock, "Not replaying reply-less lock:");
1934                 ldlm_lock_cancel(lock);
1935                 RETURN(0);
1936         }
1937         /*
1938          * If granted mode matches the requested mode, this lock is granted.
1939          *
1940          * If they differ, but we have a granted mode, then we were granted
1941          * one mode and now want another: ergo, converting.
1942          *
1943          * If we haven't been granted anything and are on a resource list,
1944          * then we're blocked/waiting.
1945          *
1946          * If we haven't been granted anything and we're NOT on a resource list,
1947          * then we haven't got a reply yet and don't have a known disposition.
1948          * This happens whenever a lock enqueue is the request that triggers
1949          * recovery.
1950          */
1951         if (lock->l_granted_mode == lock->l_req_mode)
1952                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
1953         else if (lock->l_granted_mode)
1954                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
1955         else if (!list_empty(&lock->l_res_link))
1956                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
1957         else
1958                 flags = LDLM_FL_REPLAY;
1959
1960         size[DLM_LOCKREQ_OFF] = sizeof(*body);
1961         req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 2, size,
1962                               NULL);
1963         if (!req)
1964                 RETURN(-ENOMEM);
1965
1966         /* We're part of recovery, so don't wait for it. */
1967         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
1968
1969         body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
1970         ldlm_lock2desc(lock, &body->lock_desc);
1971         body->lock_flags = flags;
1972
1973         ldlm_lock2handle(lock, &body->lock_handle[0]);
1974         size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
1975         if (lock->l_lvb_len != 0) {
1976                 buffers = 3;
1977                 size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
1978         }
1979         ptlrpc_req_set_repsize(req, buffers, size);
1980
1981         LDLM_DEBUG(lock, "replaying lock:");
1982
1983         atomic_inc(&req->rq_import->imp_replay_inflight);
1984         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1985         aa = (struct ldlm_async_args *)&req->rq_async_args;
1986         aa->lock_handle = body->lock_handle[0];
1987         req->rq_interpret_reply = replay_lock_interpret;
1988         ptlrpcd_add_req(req);
1989
1990         RETURN(0);
1991 }
1992
1993 int ldlm_replay_locks(struct obd_import *imp)
1994 {
1995         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
1996         struct list_head list;
1997         struct ldlm_lock *lock, *next;
1998         int rc = 0;
1999
2000         ENTRY;
2001         CFS_INIT_LIST_HEAD(&list);
2002
2003         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
2004
2005         /* ensure this doesn't fall to 0 before all have been queued */
2006         atomic_inc(&imp->imp_replay_inflight);
2007
2008         (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
2009
2010         list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
2011                 list_del_init(&lock->l_pending_chain);
2012                 if (rc)
2013                         continue; /* or try to do the rest? */
2014                 rc = replay_one_lock(imp, lock);
2015         }
2016
2017         atomic_dec(&imp->imp_replay_inflight);
2018
2019         RETURN(rc);
2020 }