Cleanup compiler warnings
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LDLM
26 #ifndef __KERNEL__
27 #include <signal.h>
28 #include <liblustre.h>
29 #endif
30
31 #include <lustre_dlm.h>
32 #include <obd_class.h>
33 #include <obd.h>
34
35 #include "ldlm_internal.h"
36
37 int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
38 CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
39                 "lock enqueue timeout minimum");
40
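/* Interrupt callback for l_wait_event(): nothing to do here, the waiter
 * itself notices the signal and decides how to proceed. */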
41 static void interrupted_completion_wait(void *data)
42 {
43 }
44
45 struct lock_wait_data {
46         struct ldlm_lock *lwd_lock;
47         __u32             lwd_conn_cnt;
48 };
49
50 struct ldlm_async_args {
51         struct lustre_handle lock_handle;
52 };
53
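/* Timeout callback for the enqueue wait in ldlm_completion_ast().  For a
 * local (server-side) lock just log the timeout and keep waiting; for a
 * client lock start recovery by failing the import the enqueue used. */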
54 int ldlm_expired_completion_wait(void *data)
55 {
56         struct lock_wait_data *lwd = data;
57         struct ldlm_lock *lock = lwd->lwd_lock;
58         struct obd_import *imp;
59         struct obd_device *obd;
60
61         ENTRY;
62         if (lock->l_conn_export == NULL) {
63                 static cfs_time_t next_dump = 0, last_dump = 0;
64
65                 if (ptlrpc_check_suspend())
66                         RETURN(0);
67
68                 LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
69                            CFS_DURATION_T"s ago); not entering recovery in "
70                            "server code, just going back to sleep",
71                            lock->l_enqueued_time.tv_sec,
72                            cfs_time_current_sec() -
73                            lock->l_enqueued_time.tv_sec);
74                 if (cfs_time_after(cfs_time_current(), next_dump)) {
75                         last_dump = next_dump;
76                         next_dump = cfs_time_shift(300);
77                         ldlm_namespace_dump(D_DLMTRACE,
78                                             lock->l_resource->lr_namespace);
79                         if (last_dump == 0)
80                                 libcfs_debug_dumplog();
81                 }
82                 RETURN(0);
83         }
84
85         obd = lock->l_conn_export->exp_obd;
86         imp = obd->u.cli.cl_import;
87         ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
88         LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", "
89                   CFS_DURATION_T"s ago), entering recovery for %s@%s",
90                   lock->l_enqueued_time.tv_sec,
91                   cfs_time_current_sec() - lock->l_enqueued_time.tv_sec,
92                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
93
94         RETURN(0);
95 }
96
97 /* We use the same basis for both server side and client side functions
98    from a single node. */
99 int ldlm_get_enq_timeout(struct ldlm_lock *lock)
100 {
101         int timeout = at_get(&lock->l_resource->lr_namespace->ns_at_estimate);
102         if (AT_OFF)
103                 return obd_timeout / 2;
104         /* Since these are non-updating timeouts, we should be conservative.
105            It would be nice to have some kind of "early reply" mechanism for
106            lock callbacks too... */
107         timeout = timeout + (timeout >> 1); /* 150% */
108         return max(timeout, ldlm_enqueue_min);
109 }
110
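/* Wait condition for ldlm_completion_ast(): the wait is over once the lock
 * has been granted (with no completion AST still pending) or has failed. */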
111 static int is_granted_or_cancelled(struct ldlm_lock *lock)
112 {
113         int ret = 0;
114
115         lock_res_and_lock(lock);
116         if (((lock->l_req_mode == lock->l_granted_mode) &&
117              !(lock->l_flags & LDLM_FL_CP_REQD)) ||
118             (lock->l_flags & LDLM_FL_FAILED))
119                 ret = 1;
120         unlock_res_and_lock(lock);
121
122         return ret;
123 }
124
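/* Generic ->l_completion_ast() callback: sleep until the lock is granted or
 * cancelled, arming an expiry handler unless LDLM_FL_NO_TIMEOUT is set. */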
125 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
126 {
127         /* XXX ALLOCATE - 160 bytes */
128         struct lock_wait_data lwd;
129         struct obd_device *obd;
130         struct obd_import *imp = NULL;
131         struct l_wait_info lwi;
132         __u32 timeout;
133         int rc = 0;
134         ENTRY;
135
136         if (flags == LDLM_FL_WAIT_NOREPROC) {
137                 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
138                 goto noreproc;
139         }
140
141         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
142                        LDLM_FL_BLOCK_CONV))) {
143                 cfs_waitq_signal(&lock->l_waitq);
144                 RETURN(0);
145         }
146
147         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
148                    "sleeping");
149         ldlm_lock_dump(D_OTHER, lock, 0);
150         ldlm_reprocess_all(lock->l_resource);
151
152 noreproc:
153
154         obd = class_exp2obd(lock->l_conn_export);
155
156         /* if this is a local lock, then there is no import */
157         if (obd != NULL) {
158                 imp = obd->u.cli.cl_import;
159         }
160
161         /* Wait a long time for enqueue - server may have to callback a
162            lock from another client.  Server will evict the other client if it
163            doesn't respond reasonably, and then give us the lock. */
164         timeout = ldlm_get_enq_timeout(lock) * 2;
165
166         lwd.lwd_lock = lock;
167
168         if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
169                 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
170                 lwi = LWI_INTR(interrupted_completion_wait, &lwd);
171         } else {
172                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
173                                        ldlm_expired_completion_wait,
174                                        interrupted_completion_wait, &lwd);
175         }
176
177         if (imp != NULL) {
178                 spin_lock(&imp->imp_lock);
179                 lwd.lwd_conn_cnt = imp->imp_conn_cnt;
180                 spin_unlock(&imp->imp_lock);
181         }
182
183         /* Go to sleep until the lock is granted or cancelled. */
184         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
185
186         if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
187                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
188                 RETURN(-EIO);
189         }
190
191         if (rc) {
192                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
193                            rc);
194                 RETURN(rc);
195         }
196
197         LDLM_DEBUG(lock, "client-side enqueue waking up: granted after %lds",
198                    cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
199
200         /* Update our time estimate */
201         at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
202                cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);
203
204         RETURN(0);
205 }
206
207 /*
208  * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
209  */
210 int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
211                       void *data, int flag)
212 {
213         int do_ast;
214         ENTRY;
215
216         if (flag == LDLM_CB_CANCELING) {
217                 /* Don't need to do anything here. */
218                 RETURN(0);
219         }
220
221         lock_res_and_lock(lock);
222         /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
223          * that ldlm_blocking_ast is called just before intent_policy method
224          * takes the ns_lock, then by the time we get the lock, we might not
225          * be the correct blocking function anymore.  So check, and return
226          * early, if so. */
227         if (lock->l_blocking_ast != ldlm_blocking_ast) {
228                 unlock_res_and_lock(lock);
229                 RETURN(0);
230         }
231
232         lock->l_flags |= LDLM_FL_CBPENDING;
233         do_ast = (!lock->l_readers && !lock->l_writers);
234         unlock_res_and_lock(lock);
235
236         if (do_ast) {
237                 struct lustre_handle lockh;
238                 int rc;
239
240                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
241                 ldlm_lock2handle(lock, &lockh);
242                 rc = ldlm_cli_cancel(&lockh);
243                 if (rc < 0)
244                         CERROR("ldlm_cli_cancel: %d\n", rc);
245         } else {
246                 LDLM_DEBUG(lock, "Lock still has references, will be "
247                            "cancelled later");
248         }
249         RETURN(0);
250 }
251
252 /*
253  * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
254  * comment in filter_intent_policy() on why you may need this.
255  */
256 int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
257 {
258         /*
259          * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
260          * that is rather subtle: with OST-side locking, it may so happen that
261          * _all_ extent locks are held by the OST. If a client wants to obtain
262          * the current file size it calls ll{,u}_glimpse_size(), and (as the
263          * locks are on the server) the dummy glimpse callback fires and does
264          * nothing. The client still receives the correct file size due to the
265          * following fragment in filter_intent_policy():
266          *
267          * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
268          * if (rc != 0 && res->lr_namespace->ns_lvbo &&
269          *     res->lr_namespace->ns_lvbo->lvbo_update) {
270          *         res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
271          * }
272          *
273          * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
274          * returns correct file size to the client.
275          */
276         return -ELDLM_NO_LOCK_DATA;
277 }
278
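/* Enqueue a lock entirely locally, without an RPC to a server: used for
 * locks taken by server-side OBDs on their own namespaces. */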
279 int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
280                            const struct ldlm_res_id *res_id,
281                            ldlm_type_t type, ldlm_policy_data_t *policy,
282                            ldlm_mode_t mode, int *flags,
283                            ldlm_blocking_callback blocking,
284                            ldlm_completion_callback completion,
285                            ldlm_glimpse_callback glimpse,
286                            void *data, __u32 lvb_len, void *lvb_swabber,
287                            struct lustre_handle *lockh)
288 {
289         struct ldlm_lock *lock;
290         int err;
291         ENTRY;
292
293         LASSERT(!(*flags & LDLM_FL_REPLAY));
294         if (unlikely(ns_is_client(ns))) {
295                 CERROR("Trying to enqueue local lock in a shadow namespace\n");
296                 LBUG();
297         }
298
299         lock = ldlm_lock_create(ns, res_id, type, mode, blocking,
300                                 completion, glimpse, data, lvb_len);
301         if (unlikely(!lock))
302                 GOTO(out_nolock, err = -ENOMEM);
303         LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
304
305         ldlm_lock_addref_internal(lock, mode);
306         ldlm_lock2handle(lock, lockh);
307         lock_res_and_lock(lock);
308         lock->l_flags |= LDLM_FL_LOCAL;
309         if (*flags & LDLM_FL_ATOMIC_CB)
310                 lock->l_flags |= LDLM_FL_ATOMIC_CB;
311         lock->l_lvb_swabber = lvb_swabber;
312         unlock_res_and_lock(lock);
313         if (policy != NULL)
314                 lock->l_policy_data = *policy;
315         if (type == LDLM_EXTENT)
316                 lock->l_req_extent = policy->l_extent;
317
318         err = ldlm_lock_enqueue(ns, &lock, policy, flags);
319         if (unlikely(err != ELDLM_OK))
320                 GOTO(out, err);
321
322         if (policy != NULL)
323                 *policy = lock->l_policy_data;
324
325         LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
326                           lock);
327
328         if (lock->l_completion_ast)
329                 lock->l_completion_ast(lock, *flags, NULL);
330
331         LDLM_DEBUG(lock, "client-side local enqueue END");
332         EXIT;
333  out:
334         LDLM_LOCK_PUT(lock);
335  out_nolock:
336         return err;
337 }
338
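/* Clean up a lock after a failed enqueue: mark it LOCAL_ONLY so that no
 * CANCEL RPC is sent to the server, then drop the reference and cancel it. */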
339 static void failed_lock_cleanup(struct ldlm_namespace *ns,
340                                 struct ldlm_lock *lock,
341                                 struct lustre_handle *lockh, int mode)
342 {
343         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
344         lock_res_and_lock(lock);
345         lock->l_flags |= LDLM_FL_LOCAL_ONLY;
346         unlock_res_and_lock(lock);
347         LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
348
349         ldlm_lock_decref_and_cancel(lockh, mode);
350
351         /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
352          *       from llite/file.c/ll_file_flock(). */
353         if (lock->l_resource->lr_type == LDLM_FLOCK) {
354                 ldlm_lock_destroy(lock);
355         }
356 }
357
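/* Finish a client-side enqueue once the server reply (or an error) is in:
 * check the reply, update the local lock from it (mode, resource, policy
 * data, LVB, flags) and complete or clean up the lock as appropriate. */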
358 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
359                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
360                           int *flags, void *lvb, __u32 lvb_len,
361                           void *lvb_swabber, struct lustre_handle *lockh,int rc)
362 {
363         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
364         int is_replay = *flags & LDLM_FL_REPLAY;
365         struct ldlm_lock *lock;
366         struct ldlm_reply *reply;
367         int cleanup_phase = 1;
368         ENTRY;
369
370         lock = ldlm_handle2lock(lockh);
371         /* ldlm_cli_enqueue is holding a reference on this lock. */
372         if (!lock) {
373                 LASSERT(type == LDLM_FLOCK);
374                 RETURN(-ENOLCK);
375         }
376
377         if (rc != ELDLM_OK) {
378                 LASSERT(!is_replay);
379                 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
380                            rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
381                 if (rc == ELDLM_LOCK_ABORTED) {
382                         /* Before we return, swab the reply */
383                         reply = req_capsule_server_get(&req->rq_pill,
384                                                        &RMF_DLM_REP);
385                         if (reply == NULL)
386                                 rc = -EPROTO;
387                         if (lvb_len) {
388                                 struct ost_lvb *tmplvb;
389
390                                 req_capsule_set_size(&req->rq_pill,
391                                                      &RMF_DLM_LVB, RCL_SERVER,
392                                                      lvb_len);
393                             tmplvb = req_capsule_server_swab_get(&req->rq_pill,
394                                                                  &RMF_DLM_LVB,
395                                                                  lvb_swabber);
396                                 if (tmplvb == NULL)
397                                         GOTO(cleanup, rc = -EPROTO);
398                                 if (lvb != NULL)
399                                         memcpy(lvb, tmplvb, lvb_len);
400                         }
401                 }
402                 GOTO(cleanup, rc);
403         }
404
405         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
406         if (reply == NULL)
407                 GOTO(cleanup, rc = -EPROTO);
408
409         /* lock enqueued on the server */
410         cleanup_phase = 0;
411
412         lock_res_and_lock(lock);
413         lock->l_remote_handle = reply->lock_handle;
414         *flags = reply->lock_flags;
415         lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
416         /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
417          * to wait with no timeout as well */
418         lock->l_flags |= reply->lock_flags & LDLM_FL_NO_TIMEOUT;
419         unlock_res_and_lock(lock);
420
421         CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
422                lock, reply->lock_handle.cookie, *flags);
423
424         /* If enqueue returned a blocked lock but the completion handler has
425          * already run, then it fixed up the resource and we don't need to do it
426          * again. */
427         if ((*flags) & LDLM_FL_LOCK_CHANGED) {
428                 int newmode = reply->lock_desc.l_req_mode;
429                 LASSERT(!is_replay);
430                 if (newmode && newmode != lock->l_req_mode) {
431                         LDLM_DEBUG(lock, "server returned different mode %s",
432                                    ldlm_lockname[newmode]);
433                         lock->l_req_mode = newmode;
434                 }
435
436                 if (memcmp(reply->lock_desc.l_resource.lr_name.name,
437                           lock->l_resource->lr_name.name,
438                           sizeof(struct ldlm_res_id))) {
439                         CDEBUG(D_INFO, "remote intent success, locking "
440                                         "(%ld,%ld,%ld) instead of "
441                                         "(%ld,%ld,%ld)\n",
442                               (long)reply->lock_desc.l_resource.lr_name.name[0],
443                               (long)reply->lock_desc.l_resource.lr_name.name[1],
444                               (long)reply->lock_desc.l_resource.lr_name.name[2],
445                               (long)lock->l_resource->lr_name.name[0],
446                               (long)lock->l_resource->lr_name.name[1],
447                               (long)lock->l_resource->lr_name.name[2]);
448
449                         rc = ldlm_lock_change_resource(ns, lock,
450                                         &reply->lock_desc.l_resource.lr_name);
451                         if (rc || lock->l_resource == NULL)
452                                 GOTO(cleanup, rc = -ENOMEM);
453                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
454                 }
455                 if (with_policy)
456                         if (!(type == LDLM_IBITS && !(exp->exp_connect_flags &
457                                                     OBD_CONNECT_IBITS)))
458                                 lock->l_policy_data =
459                                                  reply->lock_desc.l_policy_data;
460                 if (type != LDLM_PLAIN)
461                         LDLM_DEBUG(lock,"client-side enqueue, new policy data");
462         }
463
464         if ((*flags) & LDLM_FL_AST_SENT ||
465             /* Cancel extent locks as soon as possible on a liblustre client,
466              * because it cannot handle asynchronous ASTs robustly (see
467              * bug 7311). */
468             (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
469                 lock_res_and_lock(lock);
470                 lock->l_flags |= LDLM_FL_CBPENDING |  LDLM_FL_BL_AST;
471                 unlock_res_and_lock(lock);
472                 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
473         }
474
475         /* If the lock has already been granted by a completion AST, don't
476          * clobber the LVB with an older one. */
477         if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
478                 void *tmplvb;
479
480                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
481                                      lvb_len);
482                 tmplvb = req_capsule_server_swab_get(&req->rq_pill,
483                                                      &RMF_DLM_LVB,
484                                                      lvb_swabber);
485                 if (tmplvb == NULL)
486                         GOTO(cleanup, rc = -EPROTO);
487                 memcpy(lock->l_lvb_data, tmplvb, lvb_len);
488         }
489
490         if (!is_replay) {
491                 rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
492                 if (lock->l_completion_ast != NULL) {
493                         int err = lock->l_completion_ast(lock, *flags, NULL);
494                         if (!rc)
495                                 rc = err;
496                         if (rc && type != LDLM_FLOCK) /* bug 9425, bug 10250 */
497                                 cleanup_phase = 1;
498                 }
499         }
500
501         if (lvb_len && lvb != NULL) {
502                 /* Copy the LVB here, and not earlier, because the completion
503                  * AST (if any) can override what we got in the reply */
504                 memcpy(lvb, lock->l_lvb_data, lvb_len);
505         }
506
507         LDLM_DEBUG(lock, "client-side enqueue END");
508         EXIT;
509 cleanup:
510         if (cleanup_phase == 1 && rc)
511                 failed_lock_cleanup(ns, lock, lockh, mode);
512         /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
513         LDLM_LOCK_PUT(lock);
514         LDLM_LOCK_PUT(lock);
515         return rc;
516 }
517
518 /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
519  * a single page on the send/receive side. XXX: 512 should be changed
520  * to a more adequate value. */
521 static inline int ldlm_req_handles_avail(int req_size, int off)
522 {
523         int avail;
524
525         avail = min_t(int, LDLM_MAXREQSIZE, CFS_PAGE_SIZE - 512) - req_size;
526         avail /= sizeof(struct lustre_handle);
527         avail += LDLM_LOCKREQ_HANDLES - off;
528
529         return avail;
530 }
531
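/* Same as above, but compute the request size from an already-initialized
 * request capsule. */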
532 static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
533                                              enum req_location loc,
534                                              int off)
535 {
536         int size = req_capsule_msg_size(pill, loc);
537         return ldlm_req_handles_avail(size, off);
538 }
539
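/* Same as above, but compute the request size from a request format,
 * before any request has been built. */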
540 static inline int ldlm_format_handles_avail(struct obd_import *imp,
541                                             const struct req_format *fmt,
542                                             enum req_location loc, int off)
543 {
544         int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
545         return ldlm_req_handles_avail(size, off);
546 }
547
548 /* Cancel LRU locks and pack them into the enqueue request, along with the
549  * given @count locks from the @cancels list. */
550 int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
551                       int version, int opc, int canceloff,
552                       struct list_head *cancels, int count)
553 {
554         struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
555         struct req_capsule      *pill = &req->rq_pill;
556         struct ldlm_request     *dlm = NULL;
557         int flags, avail, to_free, bufcount, pack = 0;
558         CFS_LIST_HEAD(head);
559         int rc;
560         ENTRY;
561
562         if (cancels == NULL)
563                 cancels = &head;
564         if (exp_connect_cancelset(exp)) {
565                 /* Estimate the amount of available space in the request. */
566                 bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
567                 avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
568
569                 flags = ns_connect_lru_resize(ns) ? 
570                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
571                 to_free = !ns_connect_lru_resize(ns) &&
572                           opc == LDLM_ENQUEUE ? 1 : 0;
573
574                 /* Cancel LRU locks here _only_ if the server supports
575                  * EARLY_CANCEL. Otherwise we would have to send an extra
576                  * CANCEL RPC, which would make us slower. */
577                 if (avail > count)
578                         count += ldlm_cancel_lru_local(ns, cancels, to_free,
579                                                        avail - count, 0, flags);
580                 if (avail > count)
581                         pack = count;
582                 else
583                         pack = avail;
584                 req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
585                                      ldlm_request_bufsize(pack, opc));
586         }
587
588         rc = ptlrpc_request_pack(req, version, opc);
589         if (rc) {
590                 ldlm_lock_list_put(cancels, l_bl_ast, count);
591                 RETURN(rc);
592         }
593
594         if (exp_connect_cancelset(exp)) {
595                 if (canceloff) {
596                         dlm = req_capsule_client_get(pill, &RMF_DLM_REQ);
597                         LASSERT(dlm);
598                         /* Skip the first lock handle in ldlm_request_pack();
599                          * this method will increment @lock_count according
600                          * to the number of lock handles actually written to
601                          * the buffer. */
602                         dlm->lock_count = canceloff;
603                 }
604                 /* Pack @pack lock handles into the request. */
605                 ldlm_cli_cancel_list(cancels, pack, req, 0);
606                 /* Prepare and send a separate CANCEL RPC for the rest. */
607                 ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
608         } else {
609                 ldlm_lock_list_put(cancels, l_bl_ast, count);
610         }
611         RETURN(0);
612 }
613
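/* Convenience wrapper around ldlm_prep_elc_req() for LDLM_ENQUEUE requests. */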
614 int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
615                           struct list_head *cancels, int count)
616 {
617         return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
618                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
619 }
620
621 /* If the request needs some specific initialisation it is passed in @reqp,
622  * otherwise it is created in ldlm_cli_enqueue.
623  *
624  * Supports sync and async requests; pass the @async flag accordingly. If a
625  * request was created in ldlm_cli_enqueue and this is an async request, it
626  * is passed back to the caller in @reqp. */
627 int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
628                      struct ldlm_enqueue_info *einfo,
629                      const struct ldlm_res_id *res_id,
630                      ldlm_policy_data_t *policy, int *flags,
631                      void *lvb, __u32 lvb_len, void *lvb_swabber,
632                      struct lustre_handle *lockh, int async)
633 {
634         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
635         struct ldlm_lock      *lock;
636         struct ldlm_request   *body;
637         int                    is_replay = *flags & LDLM_FL_REPLAY;
638         int                    req_passed_in = 1;
639         int                    rc, err;
640         struct ptlrpc_request *req;
641         ENTRY;
642
643         LASSERT(exp != NULL);
644
645         /* If we're replaying this lock, just check some invariants.
646          * If we're creating a new lock, set everything up nicely. */
647         if (is_replay) {
648                 lock = ldlm_handle2lock(lockh);
649                 LASSERT(lock != NULL);
650                 LDLM_DEBUG(lock, "client-side enqueue START");
651                 LASSERT(exp == lock->l_conn_export);
652         } else {
653                 lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
654                                         einfo->ei_mode, einfo->ei_cb_bl,
655                                         einfo->ei_cb_cp, einfo->ei_cb_gl,
656                                         einfo->ei_cbdata, lvb_len);
657                 if (lock == NULL)
658                         RETURN(-ENOMEM);
659                 /* for the local lock, add the reference */
660                 ldlm_lock_addref_internal(lock, einfo->ei_mode);
661                 ldlm_lock2handle(lock, lockh);
662                 lock->l_lvb_swabber = lvb_swabber;
663                 if (policy != NULL) {
664                         /* INODEBITS_INTEROP: If the server does not support
665                          * inodebits, we will request a plain lock in the
666                          * descriptor (ldlm_lock2desc() below) but use an
667                          * inodebits lock internally with both bits set.
668                          */
669                         if (einfo->ei_type == LDLM_IBITS &&
670                             !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
671                                 lock->l_policy_data.l_inodebits.bits =
672                                         MDS_INODELOCK_LOOKUP |
673                                         MDS_INODELOCK_UPDATE;
674                         else
675                                 lock->l_policy_data = *policy;
676                 }
677
678                 if (einfo->ei_type == LDLM_EXTENT)
679                         lock->l_req_extent = policy->l_extent;
680                 LDLM_DEBUG(lock, "client-side enqueue START");
681         }
682
683         /* lock not sent to server yet */
684
685         if (reqp == NULL || *reqp == NULL) {
686                 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
687                                                 &RQF_LDLM_ENQUEUE,
688                                                 LUSTRE_DLM_VERSION,
689                                                 LDLM_ENQUEUE);
690                 if (req == NULL) {
691                         failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
692                         LDLM_LOCK_PUT(lock);
693                         RETURN(-ENOMEM);
694                 }
695                 req_passed_in = 0;
696                 if (reqp)
697                         *reqp = req;
698         } else {
699                 int len;
700
701                 req = *reqp;
702                 len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ,
703                                            RCL_CLIENT);
704                 LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n",
705                          DLM_LOCKREQ_OFF, len, (int)sizeof(*body));
706         }
707
708         lock->l_conn_export = exp;
709         lock->l_export = NULL;
710         lock->l_blocking_ast = einfo->ei_cb_bl;
711
712         /* Dump lock data into the request buffer */
713         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
714         ldlm_lock2desc(lock, &body->lock_desc);
715         body->lock_flags = *flags;
716         body->lock_handle[0] = *lockh;
717
718         /* Continue as normal. */
719         if (!req_passed_in) {
720                 if (lvb_len > 0) {
721                         req_capsule_extend(&req->rq_pill,
722                                            &RQF_LDLM_ENQUEUE_LVB);
723                         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
724                                              RCL_SERVER, lvb_len);
725                 }
726                 ptlrpc_request_set_replen(req);
727         }
728
729         /*
730          * Liblustre client doesn't get extent locks, except for O_APPEND case
731          * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
732          * [i_size, OBD_OBJECT_EOF] lock is taken.
733          */
734         LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
735                      policy->l_extent.end == OBD_OBJECT_EOF));
736
737         if (async) {
738                 LASSERT(reqp != NULL);
739                 RETURN(0);
740         }
741
742         LDLM_DEBUG(lock, "sending request");
743         rc = ptlrpc_queue_wait(req);
744         err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
745                                     einfo->ei_mode, flags, lvb, lvb_len,
746                                     lvb_swabber, lockh, rc);
747
748         /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
749          * one reference that we took */
750         if (err == -ENOLCK)
751                 LDLM_LOCK_PUT(lock);
752         else
753                 rc = err;
754
755         if (!req_passed_in && req != NULL) {
756                 ptlrpc_req_finished(req);
757                 if (reqp)
758                         *reqp = NULL;
759         }
760
761         RETURN(rc);
762 }
763
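/* Convert a lock locally, with no RPC: only legal on the server side, where
 * the namespace is authoritative for the resource. */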
764 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
765                                   __u32 *flags)
766 {
767         struct ldlm_resource *res;
768         int rc;
769         ENTRY;
770         if (ns_is_client(lock->l_resource->lr_namespace)) {
771                 CERROR("Trying to convert local lock\n");
772                 LBUG();
773         }
774         LDLM_DEBUG(lock, "client-side local convert");
775
776         res = ldlm_lock_convert(lock, new_mode, flags);
777         if (res) {
778                 ldlm_reprocess_all(res);
779                 rc = 0;
780         } else {
781                 rc = EDEADLOCK;
782         }
783         LDLM_DEBUG(lock, "client-side local convert handler END");
784         LDLM_LOCK_PUT(lock);
785         RETURN(rc);
786 }
787
788 /* FIXME: one of ldlm_cli_convert or the server side should reject attempted
789  * conversion of locks which are on the waiting or converting queue */
790 /* Caller of this code is supposed to take care of lock readers/writers
791    accounting */
792 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
793 {
794         struct ldlm_request   *body;
795         struct ldlm_reply     *reply;
796         struct ldlm_lock      *lock;
797         struct ldlm_resource  *res;
798         struct ptlrpc_request *req;
799         int                    rc;
800         ENTRY;
801
802         lock = ldlm_handle2lock(lockh);
803         if (!lock) {
804                 LBUG();
805                 RETURN(-EINVAL);
806         }
807         *flags = 0;
808
809         if (lock->l_conn_export == NULL)
810                 RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
811
812         LDLM_DEBUG(lock, "client-side convert");
813
814         req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export),
815                                         &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION,
816                                         LDLM_CONVERT);
817         if (req == NULL) {
818                 LDLM_LOCK_PUT(lock);
819                 RETURN(-ENOMEM);
820         }
821
822         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
823         body->lock_handle[0] = lock->l_remote_handle;
824
825         body->lock_desc.l_req_mode = new_mode;
826         body->lock_flags = *flags;
827
828
829         ptlrpc_request_set_replen(req);
830         rc = ptlrpc_queue_wait(req);
831         if (rc != ELDLM_OK)
832                 GOTO(out, rc);
833
834         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
835         if (reply == NULL)
836                 GOTO(out, rc = -EPROTO);
837
838         if (req->rq_status)
839                 GOTO(out, rc = req->rq_status);
840
841         res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
842         if (res != NULL) {
843                 ldlm_reprocess_all(res);
844                 /* Go to sleep until the lock is granted. */
845                 /* FIXME: or cancelled. */
846                 if (lock->l_completion_ast) {
847                         rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
848                                                     NULL);
849                         if (rc)
850                                 GOTO(out, rc);
851                 }
852         } else {
853                 rc = EDEADLOCK;
854         }
855         EXIT;
856  out:
857         LDLM_LOCK_PUT(lock);
858         ptlrpc_req_finished(req);
859         return rc;
860 }
861
862 /* Cancel locks locally.
863  * Returns:
864  * LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server;
865  * LDLM_FL_CANCELING otherwise;
866  * LDLM_FL_BL_AST if a separate CANCEL RPC is needed. */
867 static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
868 {
869         int rc = LDLM_FL_LOCAL_ONLY;
870         ENTRY;
871         
872         if (lock->l_conn_export) {
873                 int local_only;
874
875                 LDLM_DEBUG(lock, "client-side cancel");
876                 /* Set this flag to prevent others from getting new references */
877                 lock_res_and_lock(lock);
878                 lock->l_flags |= LDLM_FL_CBPENDING;
879                 local_only = (lock->l_flags &
880                               (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
881                 ldlm_cancel_callback(lock);
882                 rc = (lock->l_flags & LDLM_FL_BL_AST) ?
883                         LDLM_FL_BL_AST : LDLM_FL_CANCELING;
884                 unlock_res_and_lock(lock);
885
886                 if (local_only) {
887                         CDEBUG(D_DLMTRACE, "not sending request (at caller's "
888                                "instruction)\n");
889                         rc = LDLM_FL_LOCAL_ONLY;
890                 }
891                 ldlm_lock_cancel(lock);
892         } else {
893                 if (ns_is_client(lock->l_resource->lr_namespace)) {
894                         LDLM_ERROR(lock, "Trying to cancel local lock");
895                         LBUG();
896                 }
897                 LDLM_DEBUG(lock, "server-side local cancel");
898                 ldlm_lock_cancel(lock);
899                 ldlm_reprocess_all(lock->l_resource);
900                 LDLM_DEBUG(lock, "server-side local cancel handler END");
901         }
902
903         RETURN(rc);
904 }
905
906 /* Pack @count locks from the list @head into the ldlm_request buffer of
907    the request @req. */
908 static void ldlm_cancel_pack(struct ptlrpc_request *req,
909                              struct list_head *head, int count)
910 {
911         struct ldlm_request *dlm;
912         struct ldlm_lock *lock;
913         int max, packed = 0;
914         ENTRY;
915
916         dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
917         LASSERT(dlm != NULL);
918
919         /* Check the room in the request buffer. */
920         max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) - 
921                 sizeof(struct ldlm_request);
922         max /= sizeof(struct lustre_handle);
923         max += LDLM_LOCKREQ_HANDLES;
924         LASSERT(max >= dlm->lock_count + count);
925
926         /* XXX: it would be better to pack lock handles grouped by resource,
927          * so that the server cancel would call filter_lvbo_update() less
928          * frequently. */
929         list_for_each_entry(lock, head, l_bl_ast) {
930                 if (!count--)
931                         break;
932                 LASSERT(lock->l_conn_export);
933                 /* Pack the lock handle to the given request buffer. */
934                 LDLM_DEBUG(lock, "packing");
935                 dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
936                 packed++;
937         }
938         CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
939         EXIT;
940 }
941
942 /* Prepare and send a batched cancel RPC; it will include @count lock handles
943  * of the locks given in @cancels. */
944 int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
945                         int count, int flags)
946 {
947         struct ptlrpc_request *req = NULL;
948         struct obd_import *imp;
949         int free, sent = 0;
950         int rc = 0;
951         ENTRY;
952
953         LASSERT(exp != NULL);
954         LASSERT(count > 0);
955
956         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, obd_fail_val);
957
958         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
959                 RETURN(count);
960
961         free = ldlm_format_handles_avail(class_exp2cliimp(exp),
962                                          &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
963         if (count > free)
964                 count = free;
965
966         while (1) {
967                 int bufcount;
968
969                 imp = class_exp2cliimp(exp);
970                 if (imp == NULL || imp->imp_invalid) {
971                         CDEBUG(D_DLMTRACE,
972                                "skipping cancel on invalid import %p\n", imp);
973                         RETURN(count);
974                 }
975
976                 req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL);
977                 if (req == NULL)
978                         GOTO(out, rc = -ENOMEM);
979
980                 bufcount = req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
981                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
982                                      ldlm_request_bufsize(count, LDLM_CANCEL));
983
984                 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL);
985                 if (rc) {
986                         ptlrpc_request_free(req);
987                         GOTO(out, rc);
988                 }
989                 req->rq_no_resend = 1;
990                 req->rq_no_delay = 1;
991
992                 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
993                 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
994                 ptlrpc_at_set_req_timeout(req);
995
996                 ldlm_cancel_pack(req, cancels, count);
997
998                 ptlrpc_request_set_replen(req);
999                 if (flags & LDLM_FL_ASYNC) {
1000                         ptlrpcd_add_req(req);
1001                         sent = count;
1002                         GOTO(out, 0);
1003                 } else {
1004                         rc = ptlrpc_queue_wait(req);
1005                 }
1006                 if (rc == ESTALE) {
1007                         CDEBUG(D_DLMTRACE, "client/server (nid %s) "
1008                                "out of sync -- not fatal\n",
1009                                libcfs_nid2str(req->rq_import->
1010                                               imp_connection->c_peer.nid));
1011                         rc = 0;
1012                 } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
1013                            req->rq_import_generation == imp->imp_generation) {
1014                         ptlrpc_req_finished(req);
1015                         continue;
1016                 } else if (rc != ELDLM_OK) {
1017                         CERROR("Got rc %d from cancel RPC: canceling "
1018                                "anyway\n", rc);
1019                         break;
1020                 }
1021                 sent = count;
1022                 break;
1023         }
1024
1025         ptlrpc_req_finished(req);
1026         EXIT;
1027 out:
1028         return sent ? sent : rc;
1029 }
1030
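/* Map an import to the LDLM pool of its obd's namespace. */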
1031 static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
1032 {
1033         LASSERT(imp != NULL);
1034         return &imp->imp_obd->obd_namespace->ns_pool;
1035 }
1036
1037 /**
1038  * Update client's obd pool related fields with new SLV and Limit from \a req.
1039  */
1040 int ldlm_cli_update_pool(struct ptlrpc_request *req)
1041 {
1042         struct obd_device *obd;
1043         __u64 old_slv, new_slv;
1044         __u32 new_limit;
1045         ENTRY;
1046     
1047         if (unlikely(!req->rq_import || !req->rq_import->imp_obd || 
1048                      !imp_connect_lru_resize(req->rq_import)))
1049         {
1050                 /* 
1051                  * Do nothing for corner cases. 
1052                  */
1053                 RETURN(0);
1054         }
1055
1056         /*
1057          * In some cases the RPC may contain an SLV and limit zeroed out. This
1058          * is the case when the server does not support the LRU resize feature.
1059          * It is also possible in some recovery cases when server-side requests
1060          * have no reference to the obd export, and thus access to the
1061          * server-side namespace is not possible.
1062          */
1063         if (lustre_msg_get_slv(req->rq_repmsg) == 0 || 
1064             lustre_msg_get_limit(req->rq_repmsg) == 0) {
1065                 DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
1066                           "(SLV: "LPU64", Limit: %u)", 
1067                           lustre_msg_get_slv(req->rq_repmsg), 
1068                           lustre_msg_get_limit(req->rq_repmsg));
1069                 RETURN(0);
1070         }
1071
1072         new_limit = lustre_msg_get_limit(req->rq_repmsg);
1073         new_slv = lustre_msg_get_slv(req->rq_repmsg);
1074         obd = req->rq_import->imp_obd;
1075
1076         /*
1077          * Set the new SLV and limit in the obd fields to make them accessible
1078          * to the pools thread. We do not access obd_namespace and the pool
1079          * directly here, as there is no reliable way to make sure that they
1080          * are still alive at cleanup time. Evil races are possible which may
1081          * cause an oops at that time.
1082          */
1083         write_lock(&obd->obd_pool_lock);
1084         old_slv = obd->obd_pool_slv;
1085         obd->obd_pool_slv = new_slv;
1086         obd->obd_pool_limit = new_limit;
1087         write_unlock(&obd->obd_pool_lock);
1088
1089         /*
1090          * Check if we need to wake up the pools thread for a fast SLV change.
1091          * This is only done when the threads' period is noticeably long,
1092          * like 10s or more.
1093          */
1094 #if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10)
1095         if (old_slv > 0) {
1096                 __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
1097                 do_div(fast_change, 100);
1098
1099                 /*
1100                  * Wake up the pools thread only if the SLV has changed by
1101                  * more than 50% since the last update; in that case we want
1102                  * to react asap. Otherwise there is no sense in waking the
1103                  * pools; they are recalculated each LDLM_POOLS_THREAD_PERIOD.
1104                  */
1105                 if (old_slv > new_slv && old_slv - new_slv > fast_change)
1106                         ldlm_pools_wakeup();
1107         }
1108 #endif
1109         RETURN(0);
1110 }
1111 EXPORT_SYMBOL(ldlm_cli_update_pool);
1112
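/* Client-side cancel of a single lock by handle: cancel locally first; if a
 * CANCEL RPC is needed and the server supports cancel sets, batch as many
 * other unused LRU locks as fit into the same request. */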
1113 int ldlm_cli_cancel(struct lustre_handle *lockh)
1114 {
1115         struct obd_export *exp;
1116         int avail, flags, count = 1, rc = 0;
1117         struct ldlm_namespace *ns;
1118         struct ldlm_lock *lock;
1119         CFS_LIST_HEAD(cancels);
1120         ENTRY;
1121
1122         /* concurrent cancels on the same handle can happen */
1123         lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
1124         if (lock == NULL) {
1125                 LDLM_DEBUG_NOLOCK("lock is already being destroyed\n");
1126                 RETURN(0);
1127         }
1128
1129         rc = ldlm_cli_cancel_local(lock);
1130         if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY) {
1131                 LDLM_LOCK_PUT(lock);
1132                 RETURN(rc < 0 ? rc : 0);
1133         }
1134         /* Even if the lock is marked as LDLM_FL_BL_AST, this is an LDLM_CANCEL
1135          * RPC which goes to the canceld portal, so we can cancel other LRU
1136          * locks here and send them all in one LDLM_CANCEL RPC. */
1137         LASSERT(list_empty(&lock->l_bl_ast));
1138         list_add(&lock->l_bl_ast, &cancels);
1139
1140         exp = lock->l_conn_export;
1141         if (exp_connect_cancelset(exp)) {
1142                 avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
1143                                                   &RQF_LDLM_CANCEL,
1144                                                   RCL_CLIENT, 0);
1145                 LASSERT(avail > 0);
1146
1147                 ns = lock->l_resource->lr_namespace;
1148                 flags = ns_connect_lru_resize(ns) ?
1149                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
1150                 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
1151                                                LDLM_FL_BL_AST, flags);
1152         }
1153         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
1154         RETURN(0);
1155 }
1156
1157 /* XXX: until we have compound requests and can cut cancels from a generic RPC,
1158  * we need to send cancels with the LDLM_FL_BL_AST flag as a separate RPC. */
1159 static int ldlm_cancel_list(struct list_head *cancels, int count, int flags)
1160 {
1161         CFS_LIST_HEAD(head);
1162         struct ldlm_lock *lock, *next;
1163         int left = 0, bl_ast = 0, rc;
1164
1165         left = count;
1166         list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
1167                 if (left-- == 0)
1168                         break;
1169
1170                 if (flags & LDLM_FL_LOCAL_ONLY) {
1171                         rc = LDLM_FL_LOCAL_ONLY;
1172                         ldlm_lock_cancel(lock);
1173                 } else {
1174                         rc = ldlm_cli_cancel_local(lock);
1175                 }
1176                 if (!(flags & LDLM_FL_BL_AST) && (rc == LDLM_FL_BL_AST)) {
1177                         LDLM_DEBUG(lock, "Cancel lock separately");
1178                         list_del_init(&lock->l_bl_ast);
1179                         list_add(&lock->l_bl_ast, &head);
1180                         bl_ast ++;
1181                         continue;
1182                 }
1183                 if (rc == LDLM_FL_LOCAL_ONLY) {
1184                         /* CANCEL RPC should not be sent to server. */
1185                         list_del_init(&lock->l_bl_ast);
1186                         LDLM_LOCK_PUT(lock);
1187                         count--;
1188                 }
1189
1190         }
1191         if (bl_ast > 0) {
1192                 count -= bl_ast;
1193                 ldlm_cli_cancel_list(&head, bl_ast, NULL, 0);
1194         }
1195
1196         RETURN(count);
1197 }
1198
1199 /**
1200  * Callback function for the shrink policy. Decides whether to keep \a lock
1201  * in the LRU, given the current LRU size \a unused, the number added so
1202  * far in this scan \a added, and the preferred number of cancels \a count.
1203  *
1204  * \retval LDLM_POLICY_KEEP_LOCK keep the lock in the LRU and stop scanning
1205  *
1206  * \retval LDLM_POLICY_CANCEL_LOCK cancel the lock from the LRU
1207  */
1208 static ldlm_policy_res_t ldlm_cancel_shrink_policy(struct ldlm_namespace *ns,
1209                                                    struct ldlm_lock *lock,
1210                                                    int unused, int added, 
1211                                                    int count)
1212 {
1213         int lock_cost;
1214         __u64 page_nr;
1215
1216         /*
1217          * Stop LRU processing once we have reached the passed @count or
1218          * have checked all locks in the LRU.
1219          */
1220         if (count && added >= count)
1221                 return LDLM_POLICY_KEEP_LOCK;
1222
1223         if (lock->l_resource->lr_type == LDLM_EXTENT) {
1224                 struct ldlm_extent *l_extent;
1225
1226                 /* 
1227                  * For all extent locks cost is 1 + number of pages in
1228                  * their extent. 
1229                  */
1230                 l_extent = &lock->l_policy_data.l_extent;
1231                 page_nr = (l_extent->end - l_extent->start);
1232                 do_div(page_nr, CFS_PAGE_SIZE);
1233
1234 #ifdef __KERNEL__
1235                 /*
1236                  * XXX: In fact this is an evil hack; we can't access the
1237                  * inode here. To do this right we would somehow need the
1238                  * number of pages covered by the lock. This should be fixed
1239                  * later when 10718 is landed.
1240                  */
1241                 if (lock->l_ast_data != NULL) {
1242                         struct inode *inode = lock->l_ast_data;
1243                         if (page_nr > inode->i_mapping->nrpages)
1244                                 page_nr = inode->i_mapping->nrpages;
1245                 }
1246 #endif
1247                 lock_cost = 1 + page_nr;
1248         } else {
1249                 /* 
1250                  * For all locks which are not extent ones cost is 1 
1251                  */
1252                 lock_cost = 1;
1253         }
1254
1255         /*
1256          * Keep all expensive locks in the LRU for the memory-pressure
1257          * cancel policy. They may anyway be canceled by the LRU resize
1258          * policy if their CLV is not small enough.
1259          */
1260         return lock_cost > ns->ns_shrink_thumb ? 
1261                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1262 }
1263
1264 /**
1265  * Callback function for the lru-resize policy. Decides whether to keep
1266  * \a lock in the LRU, given the current LRU size \a unused, the number
1267  * added so far \a added, and the preferred number of cancels \a count.
1268  *
1269  * \retval LDLM_POLICY_KEEP_LOCK keep the lock in the LRU and stop scanning
1270  *
1271  * \retval LDLM_POLICY_CANCEL_LOCK cancel the lock from the LRU
1272  */
1273 static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
1274                                                  struct ldlm_lock *lock, 
1275                                                  int unused, int added, 
1276                                                  int count)
1277 {
1278         cfs_time_t cur = cfs_time_current();
1279         struct ldlm_pool *pl = &ns->ns_pool;
1280         __u64 slv, lvf, lv;
1281         cfs_time_t la;
1282
1283         /*
1284          * Stop LRU processing once we have reached the passed @count or
1285          * have checked all locks in the LRU.
1286          */
1287         if (count && added >= count)
1288                 return LDLM_POLICY_KEEP_LOCK;
1289
1290         slv = ldlm_pool_get_slv(pl);
1291         lvf = ldlm_pool_get_lvf(pl);
1292         la = cfs_duration_sec(cfs_time_sub(cur, 
1293                               lock->l_last_used));
1294
1295         /*
1296          * Keep the lock while the SLV has not yet come from the server or
1297          * while the lock volume is still smaller than the SLV.
1298          */
1299         lv = lvf * la * unused;
1300         
1301         /*
1302          * Inform the pool about the current CLV so it can be seen via /proc.
1303          */
1304         ldlm_pool_set_clv(pl, lv);
1305         return (slv == 1 || lv < slv) ? 
1306                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1307 }
1308
1309 /**
1310  * Callback function for the passed-count (/proc) policy. Decides whether
1311  * to keep \a lock in the LRU, given the current LRU size \a unused, the
1312  * number added so far \a added, and the preferred cancel count \a count.
1313  *
1314  * \retval LDLM_POLICY_KEEP_LOCK keep the lock in the LRU and stop scanning
1315  *
1316  * \retval LDLM_POLICY_CANCEL_LOCK cancel the lock from the LRU
1317  */
1318 static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns,
1319                                                    struct ldlm_lock *lock, 
1320                                                    int unused, int added,
1321                                                    int count)
1322 {
1323         /*
1324          * Stop LRU processing once we have reached the passed @count or
1325          * have checked all locks in the LRU.
1326          */
1327         return (added >= count) ? 
1328                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1329 }
1330
1331 /**
1332  * Callback function for the aged policy. Decides whether to keep \a lock
1333  * in the LRU, given the current LRU size \a unused, the number added so
1334  * far in this scan \a added, and the preferred number of cancels \a count.
1335  *
1336  * \retval LDLM_POLICY_KEEP_LOCK keep the lock in the LRU and stop scanning
1337  *
1338  * \retval LDLM_POLICY_CANCEL_LOCK cancel the lock from the LRU
1339  */
1340 static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns,
1341                                                  struct ldlm_lock *lock, 
1342                                                  int unused, int added,
1343                                                  int count)
1344 {
1345         /* 
1346          * Stop lru processing if a young lock is found and we have
1347          * reached the passed @count.
1348          */
1349         return ((added >= count) && 
1350                 cfs_time_before(cfs_time_current(),
1351                                 cfs_time_add(lock->l_last_used,
1352                                              ns->ns_max_age))) ? 
1353                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1354 }
1355
1356 /**
1357  * Callback function for default policy. Decides whether to keep
1358  * \a lock in LRU given the current LRU size \a unused, the number
1359  * added in the current scan \a added and the preferred cancel count \a count.
1360  *
1361  * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU and stop scanning
1362  *
1363  * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU
1364  */
1365 static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns,
1366                                                     struct ldlm_lock *lock, 
1367                                                     int unused, int added,
1368                                                     int count)
1369 {
1370         /* 
1371          * Stop lru processing when we have reached the passed @count or
1372          * checked all locks in lru.
1373          */
1374         return (added >= count) ? 
1375                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
1376 }
1377
1378 typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *, 
1379                                                       struct ldlm_lock *, int, 
1380                                                       int, int);
1381
1382 static ldlm_cancel_lru_policy_t
1383 ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags)
1384 {
1385         if (ns_connect_lru_resize(ns)) {
1386                 if (flags & LDLM_CANCEL_SHRINK)
1387                         return ldlm_cancel_shrink_policy;
1388                 else if (flags & LDLM_CANCEL_LRUR)
1389                         return ldlm_cancel_lrur_policy;
1390                 else if (flags & LDLM_CANCEL_PASSED)
1391                         return ldlm_cancel_passed_policy;
1392         } else {
1393                 if (flags & LDLM_CANCEL_AGED)
1394                         return ldlm_cancel_aged_policy;
1395         }
1396         
1397         return ldlm_cancel_default_policy;
1398 }
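
/*
 * For example (assuming the server granted LRU resize on connect),
 * ldlm_cancel_lru_policy(ns, LDLM_CANCEL_LRUR) returns
 * ldlm_cancel_lrur_policy, while without LRU resize
 * ldlm_cancel_lru_policy(ns, LDLM_CANCEL_AGED) returns
 * ldlm_cancel_aged_policy; any other combination falls back to
 * ldlm_cancel_default_policy.
 */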
1399  
1400 /* - Free space in lru for @count new locks,
1401  *   redundant unused locks are canceled locally;
1402  * - also cancel locally unused aged locks;
1403  * - do not cancel more than @max locks;
1404  * - GET the found locks and add them into the @cancels list.
1405  *
1406  * A client lock can be added to the l_bl_ast list only when it is
1407  * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing CANCEL.
1408  * There are the following use cases: ldlm_cancel_resource_local(),
1409  * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check&set this
1410  * flag properly. As any attempt to cancel a lock relies on this flag,
1411  * l_bl_ast list is accessed later without any special locking.
1412  *
1413  * Calling policies for enabled lru resize:
1414  * ----------------------------------------
1415  * flags & LDLM_CANCEL_LRUR - use lru resize policy (SLV from server) to
1416  *                            cancel not more than @count locks;
1417  *
1418  * flags & LDLM_CANCEL_PASSED - cancel @count number of old locks (located at
1419  *                              the beginning of lru list);
1420  *
1421  * flags & LDLM_CANCEL_SHRINK - cancel not more than @count locks according to
1422  *                              memory pressure policy function;
1423  *
1424  * flags & LDLM_CANCEL_AGED -   cancel locks according to the "aged policy".
1425  */
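
/*
 * Illustrative usage sketch (not a verbatim caller): a client path that
 * wants room for one new lock could collect victims and cancel them on
 * the server in one batch:
 *
 *      CFS_LIST_HEAD(cancels);
 *      int count;
 *
 *      count = ldlm_cancel_lru_local(ns, &cancels, 1, 0, 0,
 *                                    LDLM_CANCEL_LRUR);
 *      ldlm_cli_cancel_list(&cancels, count, NULL, 0);
 *
 * ldlm_cancel_lru() below follows the same pattern.
 */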
1426 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
1427                           int count, int max, int cancel_flags, int flags)
1428 {
1429         ldlm_cancel_lru_policy_t pf;
1430         struct ldlm_lock *lock, *next;
1431         int added = 0, unused;
1432         ENTRY;
1433
1434         spin_lock(&ns->ns_unused_lock);
1435         unused = ns->ns_nr_unused;
1436
1437         if (!ns_connect_lru_resize(ns))
1438                 count += unused - ns->ns_max_unused;
1439
1440         pf = ldlm_cancel_lru_policy(ns, flags);
1441         LASSERT(pf != NULL);
1442         
1443         while (!list_empty(&ns->ns_unused_list)) {
1444                 /* For any flags, stop scanning if @max is reached. */
1445                 if (max && added >= max)
1446                         break;
1447
1448                 list_for_each_entry_safe(lock, next, &ns->ns_unused_list, l_lru){
1449                         /* No locks which got blocking requests. */
1450                         LASSERT(!(lock->l_flags & LDLM_FL_BL_AST));
1451
1452                         /* Somebody is already doing CANCEL; this lock is
1453                          * not needed in the lru, do not traverse it again. */
1454                         if (!(lock->l_flags & LDLM_FL_CANCELING))
1455                                 break;
1456
1457                         ldlm_lock_remove_from_lru_nolock(lock);
1458                 }
1459                 if (&lock->l_lru == &ns->ns_unused_list)
1460                         break;
1461
1462                 /* Pass the lock through the policy filter and see if it
1463                  * should stay in lru.
1464                  *
1465                  * Even for shrinker policy we stop scanning if
1466                  * we find a lock that should stay in the cache.
1467                  * We should take lock age into account anyway,
1468                  * as a new lock is a valuable resource even if
1469                  * its weight is small.
1470                  *
1471                  * That is, for the shrinker policy we drop only
1472                  * old locks, but additionally choose them by
1473                  * their weight. Big extent locks will stay in
1474                  * the cache. */
1475                 if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
1476                         break;
1477
1478                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
1479                 spin_unlock(&ns->ns_unused_lock);
1480
1481                 lock_res_and_lock(lock);
1482                 /* Check flags again under the lock. */
1483                 if ((lock->l_flags & LDLM_FL_CANCELING) ||
1484                     (ldlm_lock_remove_from_lru(lock) == 0)) {
1485                         /* other thread is removing lock from lru or
1486                          * somebody is already doing CANCEL or
1487                          * there is a blocking request which will send
1488                          * cancel by itself, or the lock has been matched
1489                          * and is no longer unused. */
1490                         unlock_res_and_lock(lock);
1491                         LDLM_LOCK_PUT(lock);
1492                         spin_lock(&ns->ns_unused_lock);
1493                         continue;
1494                 }
1495                 LASSERT(!lock->l_readers && !lock->l_writers);
1496
1497                 /* If we have chosen to cancel this lock voluntarily, we had
1498                  * better send a cancel notification to the server, so that it
1499                  * frees the appropriate state. This might lead to a race
1500                  * where, while we are doing cancel here, the server is also
1501                  * silently cancelling this lock. */
1502                 lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK;
1503
1504                 /* Setting the CBPENDING flag is a little misleading,
1505                  * but prevents an important race; namely, once
1506                  * CBPENDING is set, the lock can accumulate no more
1507                  * readers/writers. Since readers and writers are
1508                  * already zero here, ldlm_lock_decref() won't see
1509                  * this flag and call l_blocking_ast */
1510                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
1511
1512                 /* We can't re-add to l_lru as it confuses the
1513                  * refcounting in ldlm_lock_remove_from_lru() if an AST
1514                  * arrives after we drop ns_lock below. We use l_bl_ast
1515                  * and can't use l_pending_chain as it is used on both
1516                  * server and client, even though bug 5666 says it is
1517                  * used only on the server. */
1518                 LASSERT(list_empty(&lock->l_bl_ast));
1519                 list_add(&lock->l_bl_ast, cancels);
1520                 unlock_res_and_lock(lock);
1521                 spin_lock(&ns->ns_unused_lock);
1522                 added++;
1523                 unused--;
1524         }
1525         spin_unlock(&ns->ns_unused_lock);
1526         RETURN(ldlm_cancel_list(cancels, added, cancel_flags));
1527 }
1528
1529 /* Returns the number of locks which could be canceled the next time
1530  * ldlm_cancel_lru() is called. Used from the locks pool shrinker. */
1531 int ldlm_cancel_lru_estimate(struct ldlm_namespace *ns,
1532                              int count, int max, int flags)
1533 {
1534         ldlm_cancel_lru_policy_t pf;
1535         struct ldlm_lock *lock;
1536         int added = 0, unused;
1537         ENTRY;
1538
1539         pf = ldlm_cancel_lru_policy(ns, flags);
1540         LASSERT(pf != NULL);
1541         spin_lock(&ns->ns_unused_lock);
1542         unused = ns->ns_nr_unused;
1543
1544         list_for_each_entry(lock, &ns->ns_unused_list, l_lru) {
1545                 /* For any flags, stop scanning if @max is reached. */
1546                 if (max && added >= max)
1547                         break;
1548
1549                 /* Somebody is already doing CANCEL or there is a
1550                  * blocking request that will send cancel. Let's not count
1551                  * this lock. */
1552                 if ((lock->l_flags & LDLM_FL_CANCELING) ||
1553                     (lock->l_flags & LDLM_FL_BL_AST)) 
1554                         continue;
1555
1556                 /* Pass the lock through the policy filter and see if it
1557                  * should stay in lru. */
1558                 if (pf(ns, lock, unused, added, count) == LDLM_POLICY_KEEP_LOCK)
1559                         break;
1560
1561                 added++;
1562                 unused--;
1563         }
1564         spin_unlock(&ns->ns_unused_lock);
1565         RETURN(added);
1566 }
1567
1568 /* When called with LDLM_ASYNC, the blocking callback will be handled
1569  * in a thread and this function will return after the thread has been
1570  * asked to call the callback.  When called with LDLM_SYNC, the blocking
1571  * callback will be performed in this function. */
1572 int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync, 
1573                     int flags)
1574 {
1575         CFS_LIST_HEAD(cancels);
1576         int count, rc;
1577         ENTRY;
1578
1579 #ifndef __KERNEL__
1580         sync = LDLM_SYNC; /* force to be sync in user space */
1581 #endif
1582         count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0, flags);
1583         if (sync == LDLM_ASYNC) {
1584                 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
1585                 if (rc == 0)
1586                         RETURN(count);
1587         }
1588
1589         /* If an error occurred in ASYNC mode, or
1590          * this is SYNC mode, cancel the list. */
1591         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
1592         RETURN(count);
1593 }
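
/*
 * Hypothetical call (sketch only): a client-side cleanup that wants up
 * to 64 unused locks canceled without blocking could use
 *
 *      ldlm_cancel_lru(ns, 64, LDLM_ASYNC, 0);
 *
 * in which case the blocking callbacks run in the blocking-AST thread.
 */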
1594
1595 /* Find and cancel locally unused locks found on resource, matched to the
1596  * given policy, mode. GET the found locks and add them into the @cancels
1597  * list. */
1598 int ldlm_cancel_resource_local(struct ldlm_resource *res,
1599                                struct list_head *cancels,
1600                                ldlm_policy_data_t *policy,
1601                                ldlm_mode_t mode, int lock_flags,
1602                                int cancel_flags, void *opaque)
1603 {
1604         struct ldlm_lock *lock;
1605         int count = 0;
1606         ENTRY;
1607
1608         lock_res(res);
1609         list_for_each_entry(lock, &res->lr_granted, l_res_link) {
1610                 if (opaque != NULL && lock->l_ast_data != opaque) {
1611                         LDLM_ERROR(lock, "data %p doesn't match opaque %p",
1612                                    lock->l_ast_data, opaque);
1613                         //LBUG();
1614                         continue;
1615                 }
1616
1617                 if (lock->l_readers || lock->l_writers) {
1618                         if (cancel_flags & LDLM_FL_WARN) {
1619                                 LDLM_ERROR(lock, "lock in use");
1620                                 //LBUG();
1621                         }
1622                         continue;
1623                 }
1624
1625                 /* If somebody is already doing CANCEL, or a blocking AST
1626                  * came, skip this lock. */
1627                 if (lock->l_flags & LDLM_FL_BL_AST || 
1628                     lock->l_flags & LDLM_FL_CANCELING)
1629                         continue;
1630
1631                 if (lockmode_compat(lock->l_granted_mode, mode))
1632                         continue;
1633
1634                 /* If a policy is given and this is an IBITS lock, add to
1635                  * the list only those locks that match the policy. */
1636                 if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
1637                     !(lock->l_policy_data.l_inodebits.bits &
1638                       policy->l_inodebits.bits))
1639                         continue;
1640
1641                 /* See CBPENDING comment in ldlm_cancel_lru */
1642                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
1643                                  lock_flags;
1644
1645                 LASSERT(list_empty(&lock->l_bl_ast));
1646                 list_add(&lock->l_bl_ast, cancels);
1647                 LDLM_LOCK_GET(lock);
1648                 count++;
1649         }
1650         unlock_res(res);
1651
1652         RETURN(ldlm_cancel_list(cancels, count, cancel_flags));
1653 }
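
/*
 * Note on the filtering above: when a policy is passed, only granted
 * inodebits locks whose l_policy_data.l_inodebits.bits overlap
 * policy->l_inodebits.bits are collected, and locks still holding
 * readers or writers are skipped (reported only when LDLM_FL_WARN is
 * set in cancel_flags).  ldlm_cli_cancel_unused_resource() below is a
 * typical caller.
 */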
1654
1655 /* If @req is NULL, send a CANCEL request to the server with the handles of
1656  * the locks in @cancels. If EARLY_CANCEL is not supported, send CANCEL requests
1657  * separately per lock.
1658  * If @req is not NULL, put handles of locks in @cancels into the request 
1659  * buffer at the offset @off.
1660  * Destroy @cancels at the end. */
1661 int ldlm_cli_cancel_list(struct list_head *cancels, int count,
1662                          struct ptlrpc_request *req, int flags)
1663 {
1664         struct ldlm_lock *lock;
1665         int res = 0;
1666         ENTRY;
1667
1668         if (list_empty(cancels) || count == 0)
1669                 RETURN(0);
1670         
1671         /* XXX: requests (both batched and not) could be sent in parallel. 
1672          * Usually it is enough to have just 1 RPC, but it is possible that
1673          * there are too many locks to be cancelled in LRU or on a resource.
1674          * It would also speed up the case when the server does not support
1675          * the feature. */
1676         while (count > 0) {
1677                 LASSERT(!list_empty(cancels));
1678                 lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast);
1679                 LASSERT(lock->l_conn_export);
1680
1681                 if (exp_connect_cancelset(lock->l_conn_export)) {
1682                         res = count;
1683                         if (req)
1684                                 ldlm_cancel_pack(req, cancels, count);
1685                         else
1686                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
1687                                                           cancels, count,
1688                                                           flags);
1689                 } else {
1690                         res = ldlm_cli_cancel_req(lock->l_conn_export,
1691                                                   cancels, 1, flags);
1692                 }
1693
1694                 if (res < 0) {
1695                         CERROR("ldlm_cli_cancel_list: %d\n", res);
1696                         res = count;
1697                 }
1698
1699                 count -= res;
1700                 ldlm_lock_list_put(cancels, l_bl_ast, res);
1701         }
1702         LASSERT(count == 0);
1703         RETURN(0);
1704 }
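
/*
 * Example flow (see ldlm_cli_cancel_unused_resource() just below): locks
 * collected by ldlm_cancel_resource_local() are handed to
 * ldlm_cli_cancel_list() with req == NULL, so one batched CANCEL RPC is
 * sent when the export supports EARLY_CANCEL (exp_connect_cancelset()),
 * otherwise one CANCEL RPC is sent per lock.
 */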
1705
1706 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
1707                                     const struct ldlm_res_id *res_id,
1708                                     ldlm_policy_data_t *policy,
1709                                     ldlm_mode_t mode, int flags, void *opaque)
1710 {
1711         struct ldlm_resource *res;
1712         CFS_LIST_HEAD(cancels);
1713         int count;
1714         int rc;
1715         ENTRY;
1716
1717         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1718         if (res == NULL) {
1719                 /* This is not a problem. */
1720                 CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
1721                 RETURN(0);
1722         }
1723
1724         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
1725                                            0, flags, opaque);
1726         rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
1727         if (rc != ELDLM_OK)
1728                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
1729
1730         ldlm_resource_putref(res);
1731         RETURN(0);
1732 }
1733
1734 static inline int have_no_nsresource(struct ldlm_namespace *ns)
1735 {
1736         int no_resource = 0;
1737
1738         spin_lock(&ns->ns_hash_lock);
1739         if (ns->ns_resources == 0)
1740                 no_resource = 1;
1741         spin_unlock(&ns->ns_hash_lock);
1742
1743         RETURN(no_resource);
1744 }
1745
1746 /* Cancel all locks on a namespace (or a specific resource, if given)
1747  * that have 0 readers/writers.
1748  *
1749  * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
1750  * to notify the server. */
1751 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
1752                            const struct ldlm_res_id *res_id,
1753                            int flags, void *opaque)
1754 {
1755         int i;
1756         ENTRY;
1757
1758         if (ns == NULL)
1759                 RETURN(ELDLM_OK);
1760
1761         if (res_id)
1762                 RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
1763                                                        LCK_MINMODE, flags,
1764                                                        opaque));
1765
1766         spin_lock(&ns->ns_hash_lock);
1767         for (i = 0; i < RES_HASH_SIZE; i++) {
1768                 struct list_head *tmp;
1769                 tmp = ns->ns_hash[i].next;
1770                 while (tmp != &(ns->ns_hash[i])) {
1771                         struct ldlm_resource *res;
1772                         int rc;
1773
1774                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
1775                         ldlm_resource_getref(res);
1776                         spin_unlock(&ns->ns_hash_lock);
1777
1778                         rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name,
1779                                                              NULL, LCK_MINMODE,
1780                                                              flags, opaque);
1781
1782                         if (rc)
1783                                 CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n",
1784                                        res->lr_name.name[0], rc);
1785
1786                         spin_lock(&ns->ns_hash_lock);
1787                         tmp = tmp->next;
1788                         ldlm_resource_putref_locked(res);
1789                 }
1790         }
1791         spin_unlock(&ns->ns_hash_lock);
1792
1793         RETURN(ELDLM_OK);
1794 }
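
/*
 * Usage sketch (assumed, not a verbatim caller): during client cleanup,
 * all unused locks of a namespace can be dropped without notifying the
 * server:
 *
 *      ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY, NULL);
 *
 * Passing a non-NULL res_id restricts the cancellation to that resource.
 */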
1795
1796 /* join/split resource locks to/from lru list */
1797 int ldlm_cli_join_lru(struct ldlm_namespace *ns,
1798                       const struct ldlm_res_id *res_id, int join)
1799 {
1800         struct ldlm_resource *res;
1801         struct ldlm_lock *lock, *n;
1802         int count = 0;
1803         ENTRY;
1804
1805         LASSERT(ns_is_client(ns));
1806
1807         res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
1808         if (res == NULL)
1809                 RETURN(count);
1810         LASSERT(res->lr_type == LDLM_EXTENT);
1811
1812         lock_res(res);
1813         if (!join)
1814                 goto split;
1815
1816         list_for_each_entry_safe (lock, n, &res->lr_granted, l_res_link) {
1817                 if (list_empty(&lock->l_lru) &&
1818                     !lock->l_readers && !lock->l_writers &&
1819                     !(lock->l_flags & LDLM_FL_LOCAL) &&
1820                     !(lock->l_flags & LDLM_FL_CBPENDING) &&
1821                     !(lock->l_flags & LDLM_FL_BL_AST)) {
1822                         ldlm_lock_add_to_lru(lock);
1823                         lock->l_flags &= ~LDLM_FL_NO_LRU;
1824                         LDLM_DEBUG(lock, "join lock to lru");
1825                         count++;
1826                 }
1827         }
1828         goto unlock;
1829 split:
1830         spin_lock(&ns->ns_unused_lock);
1831         list_for_each_entry_safe (lock, n, &ns->ns_unused_list, l_lru) {
1832                 if (lock->l_resource == res) {
1833                         ldlm_lock_remove_from_lru_nolock(lock);
1834                         lock->l_flags |= LDLM_FL_NO_LRU;
1835                         LDLM_DEBUG(lock, "split lock from lru");
1836                         count++;
1837                 }
1838         }
1839         spin_unlock(&ns->ns_unused_lock);
1840 unlock:
1841         unlock_res(res);
1842         ldlm_resource_putref(res);
1843         RETURN(count);
1844 }
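
/*
 * Summary of the two directions above: join != 0 re-attaches unused
 * granted extent locks of the resource to the namespace LRU (clearing
 * LDLM_FL_NO_LRU) so they become cancel candidates again; join == 0
 * removes them from the LRU and sets LDLM_FL_NO_LRU.  The return value
 * is the number of locks moved either way.
 */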
1845
1846 /* Lock iterators. */
1847
1848 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
1849                           void *closure)
1850 {
1851         struct list_head *tmp, *next;
1852         struct ldlm_lock *lock;
1853         int rc = LDLM_ITER_CONTINUE;
1854
1855         ENTRY;
1856
1857         if (!res)
1858                 RETURN(LDLM_ITER_CONTINUE);
1859
1860         lock_res(res);
1861         list_for_each_safe(tmp, next, &res->lr_granted) {
1862                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1863
1864                 if (iter(lock, closure) == LDLM_ITER_STOP)
1865                         GOTO(out, rc = LDLM_ITER_STOP);
1866         }
1867
1868         list_for_each_safe(tmp, next, &res->lr_converting) {
1869                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1870
1871                 if (iter(lock, closure) == LDLM_ITER_STOP)
1872                         GOTO(out, rc = LDLM_ITER_STOP);
1873         }
1874
1875         list_for_each_safe(tmp, next, &res->lr_waiting) {
1876                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
1877
1878                 if (iter(lock, closure) == LDLM_ITER_STOP)
1879                         GOTO(out, rc = LDLM_ITER_STOP);
1880         }
1881  out:
1882         unlock_res(res);
1883         RETURN(rc);
1884 }
1885
1886 struct iter_helper_data {
1887         ldlm_iterator_t iter;
1888         void *closure;
1889 };
1890
1891 static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
1892 {
1893         struct iter_helper_data *helper = closure;
1894         return helper->iter(lock, helper->closure);
1895 }
1896
1897 static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
1898 {
1899         return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
1900 }
1901
1902 int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
1903                            void *closure)
1904 {
1905         struct iter_helper_data helper = { iter: iter, closure: closure };
1906         return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
1907 }
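
/*
 * Illustrative iterator sketch (hypothetical helper, not part of this
 * file): counting all locks in a namespace.
 *
 *      static int count_lock_cb(struct ldlm_lock *lock, void *closure)
 *      {
 *              (*(int *)closure)++;
 *              return LDLM_ITER_CONTINUE;
 *      }
 *
 *      int count = 0;
 *      ldlm_namespace_foreach(ns, count_lock_cb, &count);
 *
 * ldlm_chain_lock_for_replay() below is a real user of this interface.
 */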
1908
1909 int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
1910                                ldlm_res_iterator_t iter, void *closure)
1911 {
1912         int i, rc = LDLM_ITER_CONTINUE;
1913         struct ldlm_resource *res;
1914         struct list_head *tmp;
1915
1916         ENTRY;
1917         spin_lock(&ns->ns_hash_lock);
1918         for (i = 0; i < RES_HASH_SIZE; i++) {
1919                 tmp = ns->ns_hash[i].next;
1920                 while (tmp != &(ns->ns_hash[i])) {
1921                         res = list_entry(tmp, struct ldlm_resource, lr_hash);
1922                         ldlm_resource_getref(res);
1923                         spin_unlock(&ns->ns_hash_lock);
1924
1925                         rc = iter(res, closure);
1926
1927                         spin_lock(&ns->ns_hash_lock);
1928                         tmp = tmp->next;
1929                         ldlm_resource_putref_locked(res);
1930                         if (rc == LDLM_ITER_STOP)
1931                                 GOTO(out, rc);
1932                 }
1933         }
1934  out:
1935         spin_unlock(&ns->ns_hash_lock);
1936         RETURN(rc);
1937 }
1938
1939 /* Non-blocking function to manipulate a lock whose cb_data is being put away. */
1940 void ldlm_resource_iterate(struct ldlm_namespace *ns,
1941                            const struct ldlm_res_id *res_id,
1942                            ldlm_iterator_t iter, void *data)
1943 {
1944         struct ldlm_resource *res;
1945         ENTRY;
1946
1947         if (ns == NULL) {
1948                 CERROR("must pass in namespace\n");
1949                 LBUG();
1950         }
1951
1952         res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
1953         if (res == NULL) {
1954                 EXIT;
1955                 return;
1956         }
1957
1958         ldlm_resource_foreach(res, iter, data);
1959         ldlm_resource_putref(res);
1960         EXIT;
1961 }
1962
1963 /* Lock replay */
1964
1965 static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
1966 {
1967         struct list_head *list = closure;
1968
1969         /* we use l_pending_chain here, because it's unused on clients. */
1970         LASSERTF(list_empty(&lock->l_pending_chain),"lock %p next %p prev %p\n",
1971                  lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev);
1972         /* bug 9573: don't replay locks left after eviction */
1973         if (!(lock->l_flags & LDLM_FL_FAILED))
1974                 list_add(&lock->l_pending_chain, list);
1975         return LDLM_ITER_CONTINUE;
1976 }
1977
1978 static int replay_lock_interpret(struct ptlrpc_request *req,
1979                                  struct ldlm_async_args *aa, int rc)
1980 {
1981         struct ldlm_lock  *lock;
1982         struct ldlm_reply *reply;
1983
1984         ENTRY;
1985         atomic_dec(&req->rq_import->imp_replay_inflight);
1986         if (rc != ELDLM_OK)
1987                 GOTO(out, rc);
1988
1989
1990         reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1991         if (reply == NULL)
1992                 GOTO(out, rc = -EPROTO);
1993
1994         lock = ldlm_handle2lock(&aa->lock_handle);
1995         if (!lock) {
1996                 CERROR("received replay ack for unknown local cookie "LPX64
1997                        " remote cookie "LPX64 " from server %s id %s\n",
1998                        aa->lock_handle.cookie, reply->lock_handle.cookie,
1999                        req->rq_export->exp_client_uuid.uuid,
2000                        libcfs_id2str(req->rq_peer));
2001                 GOTO(out, rc = -ESTALE);
2002         }
2003
2004         lock->l_remote_handle = reply->lock_handle;
2005         LDLM_DEBUG(lock, "replayed lock:");
2006         ptlrpc_import_recovery_state_machine(req->rq_import);
2007         LDLM_LOCK_PUT(lock);
2008 out:
2009         if (rc != ELDLM_OK)
2010                 ptlrpc_connect_import(req->rq_import, NULL);
2011
2012
2013         RETURN(rc);
2014 }
2015
2016 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
2017 {
2018         struct ptlrpc_request *req;
2019         struct ldlm_async_args *aa;
2020         struct ldlm_request   *body;
2021         int flags;
2022         ENTRY;
2023
2024
2025         /* Bug 11974: Do not replay a lock which is actively being canceled */
2026         if (lock->l_flags & LDLM_FL_CANCELING) {
2027                 LDLM_DEBUG(lock, "Not replaying canceled lock:");
2028                 RETURN(0);
2029         }
2030
2031         /* If this is a reply-less callback lock, we cannot replay it: the server
2032          * might have long since dropped it, but the notification of that event was
2033          * lost over the network (and the server already granted a conflicting lock). */
2034         if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
2035                 LDLM_DEBUG(lock, "Not replaying reply-less lock:");
2036                 ldlm_lock_cancel(lock);
2037                 RETURN(0);
2038         }
2039         /*
2040          * If granted mode matches the requested mode, this lock is granted.
2041          *
2042          * If they differ, but we have a granted mode, then we were granted
2043          * one mode and now want another: ergo, converting.
2044          *
2045          * If we haven't been granted anything and are on a resource list,
2046          * then we're blocked/waiting.
2047          *
2048          * If we haven't been granted anything and we're NOT on a resource list,
2049          * then we haven't got a reply yet and don't have a known disposition.
2050          * This happens whenever a lock enqueue is the request that triggers
2051          * recovery.
2052          */
2053         if (lock->l_granted_mode == lock->l_req_mode)
2054                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
2055         else if (lock->l_granted_mode)
2056                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
2057         else if (!list_empty(&lock->l_res_link))
2058                 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
2059         else
2060                 flags = LDLM_FL_REPLAY;
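        /* For example (illustrative): a lock that was fully granted as LCK_PR
         * before the disconnect (l_granted_mode == l_req_mode) is replayed
         * with LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED, while one still
         * waiting on a resource list with nothing granted is replayed with
         * LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT. */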
2061
2062         req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
2063                                         LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
2064         if (req == NULL)
2065                 RETURN(-ENOMEM);
2066
2067         /* We're part of recovery, so don't wait for it. */
2068         req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
2069
2070         body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
2071         ldlm_lock2desc(lock, &body->lock_desc);
2072         body->lock_flags = flags;
2073
2074         ldlm_lock2handle(lock, &body->lock_handle[0]);
2075         if (lock->l_lvb_len != 0) {
2076                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
2077                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2078                                      lock->l_lvb_len);
2079         }
2080         ptlrpc_request_set_replen(req);
2081         /* Notify the server we've replayed all requests.
2082          * Also, we mark the request to be put on a dedicated
2083          * queue to be processed after all request replays.
2084          * bug 6063 */
2085         lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
2086
2087         LDLM_DEBUG(lock, "replaying lock:");
2088
2089         atomic_inc(&req->rq_import->imp_replay_inflight);
2090         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2091         aa = ptlrpc_req_async_args(req);
2092         aa->lock_handle = body->lock_handle[0];
2093         req->rq_interpret_reply = replay_lock_interpret;
2094         ptlrpcd_add_req(req);
2095
2096         RETURN(0);
2097 }
2098
2099 int ldlm_replay_locks(struct obd_import *imp)
2100 {
2101         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
2102         CFS_LIST_HEAD(list);
2103         struct ldlm_lock *lock, *next;
2104         int rc = 0;
2105
2106         ENTRY;
2107
2108         LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
2109
2110         /* ensure this doesn't fall to 0 before all have been queued */
2111         atomic_inc(&imp->imp_replay_inflight);
2112
2113         (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
2114
2115         list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
2116                 list_del_init(&lock->l_pending_chain);
2117                 if (rc)
2118                         continue; /* or try to do the rest? */
2119                 rc = replay_one_lock(imp, lock);
2120         }
2121
2122         atomic_dec(&imp->imp_replay_inflight);
2123
2124         RETURN(rc);
2125 }