LU-3030 build: Update Master Copyrights pre 2.4 split
[fs/lustre-release.git] / lustre / osc / osc_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_lock for OSC layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_OSC
42
43 #ifdef __KERNEL__
44 # include <libcfs/libcfs.h>
45 #else
46 # include <liblustre.h>
47 #endif
48 /* fid_build_reg_res_name() */
49 #include <lustre_fid.h>
50
51 #include "osc_cl_internal.h"
52
53 /** \addtogroup osc 
54  *  @{ 
55  */
56
57 #define _PAGEREF_MAGIC  (-10000000)
58
59 /*****************************************************************************
60  *
61  * Type conversions.
62  *
63  */
64
65 static const struct cl_lock_operations osc_lock_ops;
66 static const struct cl_lock_operations osc_lock_lockless_ops;
67 static void osc_lock_to_lockless(const struct lu_env *env,
68                                  struct osc_lock *ols, int force);
69 static int osc_lock_has_pages(struct osc_lock *olck);
70
71 int osc_lock_is_lockless(const struct osc_lock *olck)
72 {
73         return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
74 }
75
76 /**
77  * Returns a weak pointer to the ldlm lock identified by a handle. Returned
78  * pointer cannot be dereferenced, as lock is not protected from concurrent
79  * reclaim. This function is a helper for osc_lock_invariant().
80  */
81 static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
82 {
83         struct ldlm_lock *lock;
84
85         lock = ldlm_handle2lock(handle);
86         if (lock != NULL)
87                 LDLM_LOCK_PUT(lock);
88         return lock;
89 }
90
91 /**
92  * Invariant that has to be true all of the time.
93  */
94 static int osc_lock_invariant(struct osc_lock *ols)
95 {
96         struct ldlm_lock *lock        = osc_handle_ptr(&ols->ols_handle);
97         struct ldlm_lock *olock       = ols->ols_lock;
98         int               handle_used = lustre_handle_is_used(&ols->ols_handle);
99
100         return
101                 ergo(osc_lock_is_lockless(ols),
102                      ols->ols_locklessable && ols->ols_lock == NULL)  ||
103                 (ergo(olock != NULL, handle_used) &&
104                  ergo(olock != NULL,
105                       olock->l_handle.h_cookie == ols->ols_handle.cookie) &&
106                  /*
107                   * Check that ->ols_handle and ->ols_lock are consistent, but
108                   * take into account that they are set at different times.
109                   */
110                  ergo(handle_used,
111                       ergo(lock != NULL && olock != NULL, lock == olock) &&
112                       ergo(lock == NULL, olock == NULL)) &&
113                  ergo(ols->ols_state == OLS_CANCELLED,
114                       olock == NULL && !handle_used) &&
115                  /*
116                   * DLM lock is destroyed only after we have seen cancellation
117                   * ast.
118                   */
119                  ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
120                       !olock->l_destroyed) &&
121                  ergo(ols->ols_state == OLS_GRANTED,
122                       olock != NULL &&
123                       olock->l_req_mode == olock->l_granted_mode &&
124                       ols->ols_hold));
125 }
126
127 /*****************************************************************************
128  *
129  * Lock operations.
130  *
131  */
132
133 /**
134  * Breaks a link between osc_lock and dlm_lock.
135  */
136 static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
137 {
138         struct ldlm_lock *dlmlock;
139
140         spin_lock(&osc_ast_guard);
141         dlmlock = olck->ols_lock;
142         if (dlmlock == NULL) {
143                 spin_unlock(&osc_ast_guard);
144                 return;
145         }
146
147         olck->ols_lock = NULL;
148         /* wb(); --- for all who check (ols->ols_lock != NULL) before
149          * calling osc_lock_detach() */
150         dlmlock->l_ast_data = NULL;
151         olck->ols_handle.cookie = 0ULL;
152         spin_unlock(&osc_ast_guard);
153
154         lock_res_and_lock(dlmlock);
155         if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
156                 struct cl_object *obj = olck->ols_cl.cls_obj;
157                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
158                 __u64 old_kms;
159
160                 cl_object_attr_lock(obj);
161                 /* Must get the value under the lock to avoid possible races. */
162                 old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
163                 /* Update the kms. Need to loop through all granted locks;
164                  * not a problem for the client. */
165                 attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
166
167                 cl_object_attr_set(env, obj, attr, CAT_KMS);
168                 cl_object_attr_unlock(obj);
169         }
170         unlock_res_and_lock(dlmlock);
171
172         /* release a reference taken in osc_lock_upcall0(). */
173         LASSERT(olck->ols_has_ref);
174         lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
175         LDLM_LOCK_RELEASE(dlmlock);
176         olck->ols_has_ref = 0;
177 }
178
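/* Release the hold taken in osc_lock_upcall0() or osc_lock_use() by calling
 * osc_cancel_base(); a no-op when ols_hold is not set. */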
179 static int osc_lock_unhold(struct osc_lock *ols)
180 {
181         int result = 0;
182
183         if (ols->ols_hold) {
184                 ols->ols_hold = 0;
185                 result = osc_cancel_base(&ols->ols_handle,
186                                          ols->ols_einfo.ei_mode);
187         }
188         return result;
189 }
190
191 static int osc_lock_unuse(const struct lu_env *env,
192                           const struct cl_lock_slice *slice)
193 {
194         struct osc_lock *ols = cl2osc_lock(slice);
195
196         LINVRNT(osc_lock_invariant(ols));
197
198         switch (ols->ols_state) {
199         case OLS_NEW:
200                 LASSERT(!ols->ols_hold);
201                 LASSERT(ols->ols_agl);
202                 return 0;
203         case OLS_UPCALL_RECEIVED:
204                 osc_lock_unhold(ols);
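                /* fall through: detach the dlm lock as in the OLS_ENQUEUED case */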
205         case OLS_ENQUEUED:
206                 LASSERT(!ols->ols_hold);
207                 osc_lock_detach(env, ols);
208                 ols->ols_state = OLS_NEW;
209                 return 0;
210         case OLS_GRANTED:
211                 LASSERT(!ols->ols_glimpse);
212                 LASSERT(ols->ols_hold);
213                 /*
214                  * Move lock into OLS_RELEASED state before calling
215                  * osc_cancel_base() so that possible synchronous cancellation
216                  * (that always happens e.g., for liblustre) sees that lock is
217                  * released.
218                  */
219                 ols->ols_state = OLS_RELEASED;
220                 return osc_lock_unhold(ols);
221         default:
222                 CERROR("Impossible state: %d\n", ols->ols_state);
223                 LBUG();
224         }
225 }
226
227 static void osc_lock_fini(const struct lu_env *env,
228                           struct cl_lock_slice *slice)
229 {
230         struct osc_lock  *ols = cl2osc_lock(slice);
231
232         LINVRNT(osc_lock_invariant(ols));
233         /*
234          * ->ols_hold can still be true at this point if, for example, a
235          * thread that requested a lock was killed (and released a reference
236          * to the lock) before a reply from the server was received. In this case
237          * lock is destroyed immediately after upcall.
238          */
239         osc_lock_unhold(ols);
240         LASSERT(ols->ols_lock == NULL);
241         LASSERT(cfs_atomic_read(&ols->ols_pageref) == 0 ||
242                 cfs_atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
243
244         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
245 }
246
247 static void osc_lock_build_policy(const struct lu_env *env,
248                                   const struct cl_lock *lock,
249                                   ldlm_policy_data_t *policy)
250 {
251         const struct cl_lock_descr *d = &lock->cll_descr;
252
253         osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
254         policy->l_extent.gid = d->cld_gid;
255 }
256
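/* Translate cl_lock enqueue flags (CEF_*) into DLM flags. */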
257 static __u64 osc_enq2ldlm_flags(__u32 enqflags)
258 {
259         __u64 result = 0;
260
261         LASSERT((enqflags & ~CEF_MASK) == 0);
262
263         if (enqflags & CEF_NONBLOCK)
264                 result |= LDLM_FL_BLOCK_NOWAIT;
265         if (enqflags & CEF_ASYNC)
266                 result |= LDLM_FL_HAS_INTENT;
267         if (enqflags & CEF_DISCARD_DATA)
268                 result |= LDLM_AST_DISCARD_DATA;
269         return result;
270 }
271
272 /**
273  * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
274  * pointers. Initialized in osc_init().
275  */
276 spinlock_t osc_ast_guard;
277
278 static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
279 {
280         struct osc_lock *olck;
281
282         lock_res_and_lock(dlm_lock);
283         spin_lock(&osc_ast_guard);
284         olck = dlm_lock->l_ast_data;
285         if (olck != NULL) {
286                 struct cl_lock *lock = olck->ols_cl.cls_lock;
287                 /*
288                  * If osc_lock holds a reference on ldlm lock, return it even
289                  * when cl_lock is in CLS_FREEING state. This way
290                  *
291                  *         osc_ast_data_get(dlmlock) == NULL
292                  *
293                  * guarantees that all osc references on dlmlock were
294                  * released. osc_dlm_blocking_ast0() relies on that.
295                  */
296                 if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
297                         cl_lock_get_trust(lock);
298                         lu_ref_add_atomic(&lock->cll_reference,
299                                           "ast", cfs_current());
300                 } else
301                         olck = NULL;
302         }
303         spin_unlock(&osc_ast_guard);
304         unlock_res_and_lock(dlm_lock);
305         return olck;
306 }
307
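/* Release the cl_lock reference and the lu_ref taken by osc_ast_data_get(). */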
308 static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
309 {
310         struct cl_lock *lock;
311
312         lock = olck->ols_cl.cls_lock;
313         lu_ref_del(&lock->cll_reference, "ast", cfs_current());
314         cl_lock_put(env, lock);
315 }
316
317 /**
318  * Updates object attributes from a lock value block (lvb) received together
319  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
320  * logic.
321  *
322  * This can be optimized to not update attributes when lock is a result of a
323  * local match.
324  *
325  * Called under lock and resource spin-locks.
326  */
327 static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
328                                 int rc)
329 {
330         struct ost_lvb    *lvb;
331         struct cl_object  *obj;
332         struct lov_oinfo  *oinfo;
333         struct cl_attr    *attr;
334         unsigned           valid;
335
336         ENTRY;
337
338         if (!(olck->ols_flags & LDLM_FL_LVB_READY))
339                 RETURN_EXIT;
340
341         lvb   = &olck->ols_lvb;
342         obj   = olck->ols_cl.cls_obj;
343         oinfo = cl2osc(obj)->oo_oinfo;
344         attr  = &osc_env_info(env)->oti_attr;
345         valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
346         cl_lvb2attr(attr, lvb);
347
348         cl_object_attr_lock(obj);
349         if (rc == 0) {
350                 struct ldlm_lock  *dlmlock;
351                 __u64 size;
352
353                 dlmlock = olck->ols_lock;
354                 LASSERT(dlmlock != NULL);
355
356                 /* re-grab LVB from a dlm lock under DLM spin-locks. */
357                 *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
358                 size = lvb->lvb_size;
359                 /* Extend KMS up to the end of this lock and no further.
360                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
361                 if (size > dlmlock->l_policy_data.l_extent.end)
362                         size = dlmlock->l_policy_data.l_extent.end + 1;
363                 if (size >= oinfo->loi_kms) {
364                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64
365                                    ", kms="LPU64, lvb->lvb_size, size);
366                         valid |= CAT_KMS;
367                         attr->cat_kms = size;
368                 } else {
369                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
370                                    LPU64"; leaving kms="LPU64", end="LPU64,
371                                    lvb->lvb_size, oinfo->loi_kms,
372                                    dlmlock->l_policy_data.l_extent.end);
373                 }
374                 ldlm_lock_allow_match_locked(dlmlock);
375         } else if (rc == -ENAVAIL && olck->ols_glimpse) {
376                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
377                        " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
378         } else
379                 valid = 0;
380
381         if (valid != 0)
382                 cl_object_attr_set(env, obj, attr, valid);
383
384         cl_object_attr_unlock(obj);
385
386         EXIT;
387 }
388
389 /**
390  * Called when a lock is granted, from an upcall (when server returned a
391  * granted lock), or from completion AST, when server returned a blocked lock.
392  *
393  * Called under lock and resource spin-locks, that are released temporarily
394  * here.
395  */
396 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
397                              struct ldlm_lock *dlmlock, int rc)
398 {
399         struct ldlm_extent   *ext;
400         struct cl_lock       *lock;
401         struct cl_lock_descr *descr;
402
403         LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
404
405         ENTRY;
406         if (olck->ols_state < OLS_GRANTED) {
407                 lock  = olck->ols_cl.cls_lock;
408                 ext   = &dlmlock->l_policy_data.l_extent;
409                 descr = &osc_env_info(env)->oti_descr;
410                 descr->cld_obj = lock->cll_descr.cld_obj;
411
412                 /* XXX check that ->l_granted_mode is valid. */
413                 descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
414                 descr->cld_start = cl_index(descr->cld_obj, ext->start);
415                 descr->cld_end   = cl_index(descr->cld_obj, ext->end);
416                 descr->cld_gid   = ext->gid;
417                 /*
418                  * tell upper layers the extent of the lock that was actually
419                  * granted
420                  */
421                 olck->ols_state = OLS_GRANTED;
422                 osc_lock_lvb_update(env, olck, rc);
423
424                 /* release DLM spin-locks to allow cl_lock_{modify,signal}()
425                  * to take a semaphore on a parent lock. This is safe, because
426                  * spin-locks are needed to protect consistency of
427                  * dlmlock->l_*_mode and LVB, and we have finished processing
428                  * them. */
429                 unlock_res_and_lock(dlmlock);
430                 cl_lock_modify(env, lock, descr);
431                 cl_lock_signal(env, lock);
432                 LINVRNT(osc_lock_invariant(olck));
433                 lock_res_and_lock(dlmlock);
434         }
435         EXIT;
436 }
437
438 static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
439
440 {
441         struct ldlm_lock *dlmlock;
442
443         ENTRY;
444
445         dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
446         LASSERT(dlmlock != NULL);
447
448         lock_res_and_lock(dlmlock);
449         spin_lock(&osc_ast_guard);
450         LASSERT(dlmlock->l_ast_data == olck);
451         LASSERT(olck->ols_lock == NULL);
452         olck->ols_lock = dlmlock;
453         spin_unlock(&osc_ast_guard);
454
455         /*
456          * The lock might not be granted yet. In this case, the completion ast
457          * (osc_ldlm_completion_ast()) comes later and finishes lock
458          * granting.
459          */
460         if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
461                 osc_lock_granted(env, olck, dlmlock, 0);
462         unlock_res_and_lock(dlmlock);
463
464         /*
465          * osc_enqueue_interpret() decrefs asynchronous locks; counter
466          * that here.
467          */
468         ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
469         olck->ols_hold = 1;
470
471         /* lock reference taken by ldlm_handle2lock_long() is owned by
472          * osc_lock and released in osc_lock_detach() */
473         lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
474         olck->ols_has_ref = 1;
475 }
476
477 /**
478  * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
479  * received from a server, or after osc_enqueue_base() matched a local DLM
480  * lock.
481  */
482 static int osc_lock_upcall(void *cookie, int errcode)
483 {
484         struct osc_lock         *olck  = cookie;
485         struct cl_lock_slice    *slice = &olck->ols_cl;
486         struct cl_lock          *lock  = slice->cls_lock;
487         struct lu_env           *env;
488         struct cl_env_nest       nest;
489
490         ENTRY;
491         env = cl_env_nested_get(&nest);
492         if (!IS_ERR(env)) {
493                 int rc;
494
495                 cl_lock_mutex_get(env, lock);
496
497                 LASSERT(lock->cll_state >= CLS_QUEUING);
498                 if (olck->ols_state == OLS_ENQUEUED) {
499                         olck->ols_state = OLS_UPCALL_RECEIVED;
500                         rc = ldlm_error2errno(errcode);
501                 } else if (olck->ols_state == OLS_CANCELLED) {
502                         rc = -EIO;
503                 } else {
504                         CERROR("Impossible state: %d\n", olck->ols_state);
505                         LBUG();
506                 }
507                 if (rc) {
508                         struct ldlm_lock *dlmlock;
509
510                         dlmlock = ldlm_handle2lock(&olck->ols_handle);
511                         if (dlmlock != NULL) {
512                                 lock_res_and_lock(dlmlock);
513                                 spin_lock(&osc_ast_guard);
514                                 LASSERT(olck->ols_lock == NULL);
515                                 dlmlock->l_ast_data = NULL;
516                                 olck->ols_handle.cookie = 0ULL;
517                                 spin_unlock(&osc_ast_guard);
518                                 ldlm_lock_fail_match_locked(dlmlock);
519                                 unlock_res_and_lock(dlmlock);
520                                 LDLM_LOCK_PUT(dlmlock);
521                         }
522                 } else {
523                         if (olck->ols_glimpse)
524                                 olck->ols_glimpse = 0;
525                         osc_lock_upcall0(env, olck);
526                 }
527
528                 /* Error handling: some errors are tolerable. */
529                 if (olck->ols_locklessable && rc == -EUSERS) {
530                         /* This is a tolerable error; turn this lock into
531                          * a lockless lock.
532                          */
533                         osc_object_set_contended(cl2osc(slice->cls_obj));
534                         LASSERT(slice->cls_ops == &osc_lock_ops);
535
536                         /* Change this lock into a lockless (no ldlm) lock. */
537                         osc_lock_to_lockless(env, olck, 1);
538                         olck->ols_state = OLS_GRANTED;
539                         rc = 0;
540                 } else if (olck->ols_glimpse && rc == -ENAVAIL) {
541                         osc_lock_lvb_update(env, olck, rc);
542                         cl_lock_delete(env, lock);
543                         /* Hide the error. */
544                         rc = 0;
545                 }
546
547                 if (rc == 0) {
548                         /* In the AGL case, the RPC sponsor may exit cl_lock
549                          * processing without calling wait() before the related
550                          * OSC lock upcall(). So update the lock status according
551                          * to the enqueue result inside the AGL upcall(). */
552                         if (olck->ols_agl) {
553                                 lock->cll_flags |= CLF_FROM_UPCALL;
554                                 cl_wait_try(env, lock);
555                                 lock->cll_flags &= ~CLF_FROM_UPCALL;
556                                 if (!olck->ols_glimpse)
557                                         olck->ols_agl = 0;
558                         }
559                         cl_lock_signal(env, lock);
560                         /* del user for lock upcall cookie */
561                         cl_unuse_try(env, lock);
562                 } else {
563                         /* del user for lock upcall cookie */
564                         cl_lock_user_del(env, lock);
565                         cl_lock_error(env, lock, rc);
566                 }
567
568                 /* release cookie reference, acquired by osc_lock_enqueue() */
569                 cl_lock_hold_release(env, lock, "upcall", lock);
570                 cl_lock_mutex_put(env, lock);
571
572                 lu_ref_del(&lock->cll_reference, "upcall", lock);
573                 /* This may be the last reference, so it must be called after
574                  * cl_lock_mutex_put(). */
575                 cl_lock_put(env, lock);
576
577                 cl_env_nested_put(&nest, env);
578         } else {
579                 /* should never happen, similar to osc_ldlm_blocking_ast(). */
580                 LBUG();
581         }
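        /* Return the original enqueue errcode; rc above was only used to
         * update the cl_lock state. */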
582         RETURN(errcode);
583 }
584
585 /**
586  * Core of osc_dlm_blocking_ast() logic.
587  */
588 static void osc_lock_blocking(const struct lu_env *env,
589                               struct ldlm_lock *dlmlock,
590                               struct osc_lock *olck, int blocking)
591 {
592         struct cl_lock *lock = olck->ols_cl.cls_lock;
593
594         LASSERT(olck->ols_lock == dlmlock);
595         CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
596         LASSERT(!osc_lock_is_lockless(olck));
597
598         /*
599          * The lock might still be addref-ed here if, e.g., a blocking ast
600          * is sent for a failed lock.
601          */
602         osc_lock_unhold(olck);
603
604         if (blocking && olck->ols_state < OLS_BLOCKED)
605                 /*
606                  * Move osc_lock into OLS_BLOCKED before canceling the lock,
607                  * because it recursively re-enters osc_lock_blocking(), with
608                  * the state set to OLS_CANCELLED.
609                  */
610                 olck->ols_state = OLS_BLOCKED;
611         /*
612          * cancel and destroy lock at least once no matter how blocking ast is
613          * entered (see comment above osc_ldlm_blocking_ast() for use
614          * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
615          */
616         cl_lock_cancel(env, lock);
617         cl_lock_delete(env, lock);
618 }
619
620 /**
621  * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
622  * and ldlm_lock caches.
623  */
624 static int osc_dlm_blocking_ast0(const struct lu_env *env,
625                                  struct ldlm_lock *dlmlock,
626                                  void *data, int flag)
627 {
628         struct osc_lock *olck;
629         struct cl_lock  *lock;
630         int result;
631         int cancel;
632
633         LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);
634
635         cancel = 0;
636         olck = osc_ast_data_get(dlmlock);
637         if (olck != NULL) {
638                 lock = olck->ols_cl.cls_lock;
639                 cl_lock_mutex_get(env, lock);
640                 LINVRNT(osc_lock_invariant(olck));
641                 if (olck->ols_ast_wait) {
642                         /* wake up osc_lock_use() */
643                         cl_lock_signal(env, lock);
644                         olck->ols_ast_wait = 0;
645                 }
646                 /*
647                  * Lock might have been canceled while this thread was
648                  * sleeping for lock mutex, but olck is pinned in memory.
649                  */
650                 if (olck == dlmlock->l_ast_data) {
651                         /*
652                          * NOTE: DLM sends blocking AST's for failed locks
653                          *       (that are still in pre-OLS_GRANTED state)
654                          *       too, and they have to be canceled, otherwise
655                          *       the DLM lock is never destroyed and stays
656                          *       stuck in memory.
657                          *
658                          *       Alternatively, ldlm_cli_cancel() can be
659                          *       called here directly for osc_locks with
660                          *       ols_state < OLS_GRANTED to maintain an
661                          *       invariant that ->clo_cancel() is only called
662                          *       for locks that were granted.
663                          */
664                         LASSERT(data == olck);
665                         osc_lock_blocking(env, dlmlock,
666                                           olck, flag == LDLM_CB_BLOCKING);
667                 } else
668                         cancel = 1;
669                 cl_lock_mutex_put(env, lock);
670                 osc_ast_data_put(env, olck);
671         } else
672                 /*
673                  * DLM lock exists, but there is no cl_lock attached to it.
674                  * This is a `normal' race. cl_object and its cl_lock's can be
675                  * removed by memory pressure, together with all pages.
676                  */
677                 cancel = (flag == LDLM_CB_BLOCKING);
678
679         if (cancel) {
680                 struct lustre_handle *lockh;
681
682                 lockh = &osc_env_info(env)->oti_handle;
683                 ldlm_lock2handle(dlmlock, lockh);
684                 result = ldlm_cli_cancel(lockh, LCF_ASYNC);
685         } else
686                 result = 0;
687         return result;
688 }
689
690 /**
691  * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
692  * some other lock, or is canceled. This function is installed as a
693  * ldlm_lock::l_blocking_ast() for client extent locks.
694  *
695  * Control flow is tricky, because ldlm uses the same call-back
696  * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
697  *
698  * \param dlmlock lock for which ast occurred.
699  *
700  * \param new description of a conflicting lock in case of blocking ast.
701  *
702  * \param data value of dlmlock->l_ast_data
703  *
704  * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
705  *             cancellation and blocking ast's.
706  *
707  * Possible use cases:
708  *
709  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
710  *       lock due to lock lru pressure, or explicit user request to purge
711  *       locks.
712  *
713  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
714  *       us that dlmlock conflicts with another lock that some client is
715  *       enqueing. Lock is canceled.
716  *       enqueuing. Lock is canceled.
717  *           - cl_lock_cancel() is called. osc_lock_cancel() calls
718  *             ldlm_cli_cancel() that calls
719  *
720  *                  dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
721  *
722  *             recursively entering osc_ldlm_blocking_ast().
723  *
724  *     - client cancels lock voluntarily (e.g., as a part of early cancellation):
725  *
726  *           cl_lock_cancel()->
727  *             osc_lock_cancel()->
728  *               ldlm_cli_cancel()->
729  *                 dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
730  *
731  */
732 static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
733                                  struct ldlm_lock_desc *new, void *data,
734                                  int flag)
735 {
736         struct lu_env     *env;
737         struct cl_env_nest nest;
738         int                result;
739
740         /*
741          * This can be called in the context of outer IO, e.g.,
742          *
743          *     cl_enqueue()->...
744          *       ->osc_enqueue_base()->...
745          *         ->ldlm_prep_elc_req()->...
746          *           ->ldlm_cancel_callback()->...
747          *             ->osc_ldlm_blocking_ast()
748          *
749          * A new environment has to be created to avoid corrupting the outer context.
750          */
751         env = cl_env_nested_get(&nest);
752         if (!IS_ERR(env)) {
753                 result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
754                 cl_env_nested_put(&nest, env);
755         } else {
756                 result = PTR_ERR(env);
757                 /*
758                  * XXX This should never happen, since the cl_lock would get
759                  * stuck. A pre-allocated environment a la vvp_inode_fini_env
760                  * should be used.
761                  */
762                 LBUG();
763         }
764         if (result != 0) {
765                 if (result == -ENODATA)
766                         result = 0;
767                 else
768                         CERROR("BAST failed: %d\n", result);
769         }
770         return result;
771 }
772
773 static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
774                                    __u64 flags, void *data)
775 {
776         struct cl_env_nest nest;
777         struct lu_env     *env;
778         struct osc_lock   *olck;
779         struct cl_lock    *lock;
780         int result;
781         int dlmrc;
782
783         /* first, do dlm part of the work */
784         dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
785         /* then, notify cl_lock */
786         env = cl_env_nested_get(&nest);
787         if (!IS_ERR(env)) {
788                 olck = osc_ast_data_get(dlmlock);
789                 if (olck != NULL) {
790                         lock = olck->ols_cl.cls_lock;
791                         cl_lock_mutex_get(env, lock);
792                         /*
793                          * ldlm_handle_cp_callback() copied LVB from request
794                          * to lock->l_lvb_data, store it in osc_lock.
795                          */
796                         LASSERT(dlmlock->l_lvb_data != NULL);
797                         lock_res_and_lock(dlmlock);
798                         olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
799                         if (olck->ols_lock == NULL) {
800                                 /*
801                                  * upcall (osc_lock_upcall()) hasn't yet been
802                                  * called. Do nothing now, upcall will bind
803                                  * olck to dlmlock and signal the waiters.
804                                  *
805                                  * This maintains an invariant that osc_lock
806                                  * and ldlm_lock are always bound when
807                                  * osc_lock is in OLS_GRANTED state.
808                                  */
809                         } else if (dlmlock->l_granted_mode ==
810                                    dlmlock->l_req_mode) {
811                                 osc_lock_granted(env, olck, dlmlock, dlmrc);
812                         }
813                         unlock_res_and_lock(dlmlock);
814
815                         if (dlmrc != 0) {
816                                 CL_LOCK_DEBUG(D_ERROR, env, lock,
817                                               "dlmlock returned %d\n", dlmrc);
818                                 cl_lock_error(env, lock, dlmrc);
819                         }
820                         cl_lock_mutex_put(env, lock);
821                         osc_ast_data_put(env, olck);
822                         result = 0;
823                 } else
824                         result = -ELDLM_NO_LOCK_DATA;
825                 cl_env_nested_put(&nest, env);
826         } else
827                 result = PTR_ERR(env);
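        /* A DLM error takes precedence over the cl_lock notification result. */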
828         return dlmrc ?: result;
829 }
830
831 static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
832 {
833         struct ptlrpc_request  *req  = data;
834         struct osc_lock        *olck;
835         struct cl_lock         *lock;
836         struct cl_object       *obj;
837         struct cl_env_nest      nest;
838         struct lu_env          *env;
839         struct ost_lvb         *lvb;
840         struct req_capsule     *cap;
841         int                     result;
842
843         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
844
845         env = cl_env_nested_get(&nest);
846         if (!IS_ERR(env)) {
847                 /* osc_ast_data_get() has to go after the environment is
848                  * allocated, because osc_ast_data_get() acquires a
849                  * reference to a lock, and that reference can only be
850                  * released with an environment.
851                  */
852                 olck = osc_ast_data_get(dlmlock);
853                 if (olck != NULL) {
854                         lock = olck->ols_cl.cls_lock;
855                         /* Do not grab the mutex of cl_lock for glimpse.
856                          * See LU-1274 for details.
857                          * BTW, it's okay for cl_lock to be cancelled during
858                          * this period because server can handle this race.
859                          * See ldlm_server_glimpse_ast() for details.
860                          * cl_lock_mutex_get(env, lock); */
861                         cap = &req->rq_pill;
862                         req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
863                         req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
864                                              sizeof *lvb);
865                         result = req_capsule_server_pack(cap);
866                         if (result == 0) {
867                                 lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
868                                 obj = lock->cll_descr.cld_obj;
869                                 result = cl_object_glimpse(env, obj, lvb);
870                         }
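                        /* Peers without LVB-type support expect the shorter
                         * struct ost_lvb_v1 reply, so shrink the buffer. */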
871                         if (!exp_connect_lvb_type(req->rq_export))
872                                 req_capsule_shrink(&req->rq_pill,
873                                                    &RMF_DLM_LVB,
874                                                    sizeof(struct ost_lvb_v1),
875                                                    RCL_SERVER);
876                         osc_ast_data_put(env, olck);
877                 } else {
878                         /*
879                          * These errors are normal races, so we don't want to
880                          * fill the console with messages by calling
881                          * ptlrpc_error()
882                          */
883                         lustre_pack_reply(req, 1, NULL, NULL);
884                         result = -ELDLM_NO_LOCK_DATA;
885                 }
886                 cl_env_nested_put(&nest, env);
887         } else
888                 result = PTR_ERR(env);
889         req->rq_status = result;
890         return result;
891 }
892
893 static unsigned long osc_lock_weigh(const struct lu_env *env,
894                                     const struct cl_lock_slice *slice)
895 {
896         /*
897          * don't need to grab coh_page_guard since we don't care about the
898          * exact number of pages.
899          */
900         return cl_object_header(slice->cls_obj)->coh_pages;
901 }
902
903 /**
904  * Get the weight of dlm lock for early cancellation.
905  *
906  * XXX: it should return the pages covered by this \a dlmlock.
907  */
908 static unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
909 {
910         struct cl_env_nest       nest;
911         struct lu_env           *env;
912         struct osc_lock         *lock;
913         struct cl_lock          *cll;
914         unsigned long            weight;
915         ENTRY;
916
917         cfs_might_sleep();
918         /*
919          * osc_ldlm_weigh_ast has a complex context since it might be called
920          * because of lock cancellation or from user input. We have to make
921          * a new environment for it. It is probably safe to use the upper
922          * context because cl_lock_put() does not modify environment
923          * variables. But in case of ..
924          */
925         env = cl_env_nested_get(&nest);
926         if (IS_ERR(env))
927                 /* Mostly because of lack of memory; tend to eliminate this lock */
928                 RETURN(0);
929
930         LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
931         lock = osc_ast_data_get(dlmlock);
932         if (lock == NULL) {
933                 /* cl_lock was destroyed because of memory pressure.
934                  * It is quite reasonable to assign this type of lock
935                  * a lower cost.
936                  */
937                 GOTO(out, weight = 0);
938         }
939
940         cll = lock->ols_cl.cls_lock;
941         cl_lock_mutex_get(env, cll);
942         weight = cl_lock_weigh(env, cll);
943         cl_lock_mutex_put(env, cll);
944         osc_ast_data_put(env, lock);
945         EXIT;
946
947 out:
948         cl_env_nested_put(&nest, env);
949         return weight;
950 }
951
952 static void osc_lock_build_einfo(const struct lu_env *env,
953                                  const struct cl_lock *clock,
954                                  struct osc_lock *lock,
955                                  struct ldlm_enqueue_info *einfo)
956 {
957         enum cl_lock_mode mode;
958
959         mode = clock->cll_descr.cld_mode;
960         if (mode == CLM_PHANTOM)
961                 /*
962                  * For now, enqueue all glimpse locks in read mode. In the
963                  * future, client might choose to enqueue LCK_PW lock for
964                  * glimpse on a file opened for write.
965                  */
966                 mode = CLM_READ;
967
968         einfo->ei_type   = LDLM_EXTENT;
969         einfo->ei_mode   = osc_cl_lock2ldlm(mode);
970         einfo->ei_cb_bl  = osc_ldlm_blocking_ast;
971         einfo->ei_cb_cp  = osc_ldlm_completion_ast;
972         einfo->ei_cb_gl  = osc_ldlm_glimpse_ast;
973         einfo->ei_cb_wg  = osc_ldlm_weigh_ast;
974         einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
975 }
976
977 /**
978  * Determine if the lock should be converted into a lockless lock.
979  *
980  * Steps to check:
981  * - whether the lock has an explicit requirement for a non-lockless lock;
982  * - the io lock request type ci_lockreq;
983  * - send the enqueue rpc to the ost to make the further decision;
984  * - special treatment for the lockless truncate lock.
985  *
986  *  Additional policy can be implemented here, e.g., never do lockless-io
987  *  for large extents.
988  */
989 static void osc_lock_to_lockless(const struct lu_env *env,
990                                  struct osc_lock *ols, int force)
991 {
992         struct cl_lock_slice *slice = &ols->ols_cl;
993
994         LASSERT(ols->ols_state == OLS_NEW ||
995                 ols->ols_state == OLS_UPCALL_RECEIVED);
996
997         if (force) {
998                 ols->ols_locklessable = 1;
999                 slice->cls_ops = &osc_lock_lockless_ops;
1000         } else {
1001                 struct osc_io *oio     = osc_env_io(env);
1002                 struct cl_io  *io      = oio->oi_cl.cis_io;
1003                 struct cl_object *obj  = slice->cls_obj;
1004                 struct osc_object *oob = cl2osc(obj);
1005                 const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
1006                 struct obd_connect_data *ocd;
1007
1008                 LASSERT(io->ci_lockreq == CILR_MANDATORY ||
1009                         io->ci_lockreq == CILR_MAYBE ||
1010                         io->ci_lockreq == CILR_NEVER);
1011
1012                 ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
1013                 ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
1014                                 (io->ci_lockreq == CILR_MAYBE) &&
1015                                 (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
1016                 if (io->ci_lockreq == CILR_NEVER ||
1017                         /* lockless IO */
1018                     (ols->ols_locklessable && osc_object_is_contended(oob)) ||
1019                         /* lockless truncate */
1020                     (cl_io_is_trunc(io) &&
1021                      (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
1022                       osd->od_lockless_truncate)) {
1023                         ols->ols_locklessable = 1;
1024                         slice->cls_ops = &osc_lock_lockless_ops;
1025                 }
1026         }
1027         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1028 }
1029
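/* An already queued glimpse lock is compatible once it has passed its upcall,
 * or whenever the new lock is a read lock; apart from that, only two read
 * locks are compatible. */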
1030 static int osc_lock_compatible(const struct osc_lock *qing,
1031                                const struct osc_lock *qed)
1032 {
1033         enum cl_lock_mode qing_mode;
1034         enum cl_lock_mode qed_mode;
1035
1036         qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
1037         if (qed->ols_glimpse &&
1038             (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
1039                 return 1;
1040
1041         qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
1042         return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
1043 }
1044
1045 /**
1046  * Cancel all conflicting locks and wait for them to be destroyed.
1047  *
1048  * This function is used for two purposes:
1049  *
1050  *     - early cancel all conflicting locks before starting IO, and
1051  *
1052  *     - guarantee that pages added to the page cache by lockless IO are never
1053  *       covered by locks other than lockless IO lock, and, hence, are not
1054  *       visible to other threads.
1055  */
1056 static int osc_lock_enqueue_wait(const struct lu_env *env,
1057                                  const struct osc_lock *olck)
1058 {
1059         struct cl_lock          *lock    = olck->ols_cl.cls_lock;
1060         struct cl_lock_descr    *descr   = &lock->cll_descr;
1061         struct cl_object_header *hdr     = cl_object_header(descr->cld_obj);
1062         struct cl_lock          *scan;
1063         struct cl_lock          *conflict= NULL;
1064         int lockless                     = osc_lock_is_lockless(olck);
1065         int rc                           = 0;
1066         ENTRY;
1067
1068         LASSERT(cl_lock_is_mutexed(lock));
1069
1070         /* make it enqueue anyway for glimpse lock, because we actually
1071          * don't need to cancel any conflicting locks. */
1072         if (olck->ols_glimpse)
1073                 return 0;
1074
1075         spin_lock(&hdr->coh_lock_guard);
1076         cfs_list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
1077                 struct cl_lock_descr *cld = &scan->cll_descr;
1078                 const struct osc_lock *scan_ols;
1079
1080                 if (scan == lock)
1081                         break;
1082
1083                 if (scan->cll_state < CLS_QUEUING ||
1084                     scan->cll_state == CLS_FREEING ||
1085                     cld->cld_start > descr->cld_end ||
1086                     cld->cld_end < descr->cld_start)
1087                         continue;
1088
1089                 /* overlapped and living locks. */
1090
1091                 /* We're not supposed to give up group lock. */
1092                 if (scan->cll_descr.cld_mode == CLM_GROUP) {
1093                         LASSERT(descr->cld_mode != CLM_GROUP ||
1094                                 descr->cld_gid != scan->cll_descr.cld_gid);
1095                         continue;
1096                 }
1097
1098                 scan_ols = osc_lock_at(scan);
1099
1100                 /* We need to cancel the compatible locks if we're enqueuing
1101                  * a lockless lock, for example:
1102                  * imagine that client has PR lock on [0, 1000], and thread T0
1103                  * is doing lockless IO in [500, 1500] region. Concurrent
1104                  * thread T1 can see lockless data in [500, 1000], which is
1105                  * wrong, because these data are possibly stale. */
1106                 if (!lockless && osc_lock_compatible(olck, scan_ols))
1107                         continue;
1108
1109                 cl_lock_get_trust(scan);
1110                 conflict = scan;
1111                 break;
1112         }
1113         spin_unlock(&hdr->coh_lock_guard);
1114
1115         if (conflict) {
1116                 if (lock->cll_descr.cld_mode == CLM_GROUP) {
1117                         /* we want a group lock but a previous lock request
1118                          * conflicts; we do not wait but return 0 so the
1119                          * request is sent to the server
1120                          */
1121                         CDEBUG(D_DLMTRACE, "group lock %p is conflicted "
1122                                            "with %p, no wait, send to server\n",
1123                                lock, conflict);
1124                         cl_lock_put(env, conflict);
1125                         rc = 0;
1126                 } else {
1127                         CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, "
1128                                            "will wait\n",
1129                                lock, conflict);
1130                         LASSERT(lock->cll_conflict == NULL);
1131                         lu_ref_add(&conflict->cll_reference, "cancel-wait",
1132                                    lock);
1133                         lock->cll_conflict = conflict;
1134                         rc = CLO_WAIT;
1135                 }
1136         }
1137         RETURN(rc);
1138 }
1139
1140 /**
1141  * Implementation of cl_lock_operations::clo_enqueue() method for osc
1142  * layer. This initiates ldlm enqueue:
1143  *
1144  *     - cancels conflicting locks early (osc_lock_enqueue_wait());
1145  *
1146  *     - calls osc_enqueue_base() to do actual enqueue.
1147  *
1148  * osc_enqueue_base() is supplied with an upcall function that is executed
1149  * when lock is received either after a local cached ldlm lock is matched, or
1150  * when a reply from the server is received.
1151  *
1152  * This function does not wait for the network communication to complete.
1153  */
1154 static int osc_lock_enqueue(const struct lu_env *env,
1155                             const struct cl_lock_slice *slice,
1156                             struct cl_io *unused, __u32 enqflags)
1157 {
1158         struct osc_lock          *ols     = cl2osc_lock(slice);
1159         struct cl_lock           *lock    = ols->ols_cl.cls_lock;
1160         int result;
1161         ENTRY;
1162
1163         LASSERT(cl_lock_is_mutexed(lock));
1164         LASSERTF(ols->ols_state == OLS_NEW,
1165                  "Impossible state: %d\n", ols->ols_state);
1166
1167         LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
1168                 "lock = %p, ols = %p\n", lock, ols);
1169
1170         result = osc_lock_enqueue_wait(env, ols);
1171         if (result == 0) {
1172                 if (!osc_lock_is_lockless(ols)) {
1173                         struct osc_object        *obj = cl2osc(slice->cls_obj);
1174                         struct osc_thread_info   *info = osc_env_info(env);
1175                         struct ldlm_res_id       *resname = &info->oti_resname;
1176                         ldlm_policy_data_t       *policy = &info->oti_policy;
1177                         struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
1178
1179                         /* lock will be passed as upcall cookie,
1180                          * hold a ref to prevent it from being released. */
1181                         cl_lock_hold_add(env, lock, "upcall", lock);
1182                         /* also add a user for the lock */
1183                         cl_lock_user_add(env, lock);
1184                         ols->ols_state = OLS_ENQUEUED;
1185
1186                         /*
1187                          * XXX: this is possible blocking point as
1188                          * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
1189                          * LDLM_CP_CALLBACK.
1190                          */
1191                         ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
1192                         osc_lock_build_policy(env, lock, policy);
1193                         result = osc_enqueue_base(osc_export(obj), resname,
1194                                           &ols->ols_flags, policy,
1195                                           &ols->ols_lvb,
1196                                           obj->oo_oinfo->loi_kms_valid,
1197                                           osc_lock_upcall,
1198                                           ols, einfo, &ols->ols_handle,
1199                                           PTLRPCD_SET, 1, ols->ols_agl);
1200                         if (result != 0) {
1201                                 cl_lock_user_del(env, lock);
1202                                 cl_lock_unhold(env, lock, "upcall", lock);
1203                                 if (unlikely(result == -ECANCELED)) {
1204                                         ols->ols_state = OLS_NEW;
1205                                         result = 0;
1206                                 }
1207                         }
1208                 } else {
1209                         ols->ols_state = OLS_GRANTED;
1210                         ols->ols_owner = osc_env_io(env);
1211                 }
1212         }
1213         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1214         RETURN(result);
1215 }
1216
1217 static int osc_lock_wait(const struct lu_env *env,
1218                          const struct cl_lock_slice *slice)
1219 {
1220         struct osc_lock *olck = cl2osc_lock(slice);
1221         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1222
1223         LINVRNT(osc_lock_invariant(olck));
1224
1225         if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
1226                 if (olck->ols_flags & LDLM_FL_LVB_READY) {
1227                         return 0;
1228                 } else if (olck->ols_agl) {
1229                         if (lock->cll_flags & CLF_FROM_UPCALL)
1230                                 /* It is from enqueue RPC reply upcall for
1231                                  * updating state. Do not re-enqueue. */
1232                                 return -ENAVAIL;
1233                         else
1234                                 olck->ols_state = OLS_NEW;
1235                 } else {
1236                         LASSERT(lock->cll_error);
1237                         return lock->cll_error;
1238                 }
1239         }
1240
1241         if (olck->ols_state == OLS_NEW) {
1242                 int rc;
1243
1244                 LASSERT(olck->ols_agl);
1245                 olck->ols_agl = 0;
1246                 rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
1247                 if (rc != 0)
1248                         return rc;
1249                 else
1250                         return CLO_REENQUEUED;
1251         }
1252
1253         LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
1254                      lock->cll_error == 0, olck->ols_lock != NULL));
1255
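        /* Return any cl_lock error; otherwise 0 once granted, or CLO_WAIT to
         * keep waiting. */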
1256         return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
1257 }
1258
1259 /**
1260  * An implementation of cl_lock_operations::clo_use() method that pins cached
1261  * lock.
1262  */
1263 static int osc_lock_use(const struct lu_env *env,
1264                         const struct cl_lock_slice *slice)
1265 {
1266         struct osc_lock *olck = cl2osc_lock(slice);
1267         int rc;
1268
1269         LASSERT(!olck->ols_hold);
1270
1271         /*
1272          * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
1273          * flag is not set. This protects us from a concurrent blocking ast.
1274          */
1275         rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
1276         if (rc == 0) {
1277                 olck->ols_hold = 1;
1278                 olck->ols_state = OLS_GRANTED;
1279         } else {
1280                 struct cl_lock *lock;
1281
1282                 /*
1283                  * Lock is being cancelled somewhere within
1284                  * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
1285                  * set, but osc_ldlm_blocking_ast() hasn't yet acquired
1286                  * cl_lock mutex.
1287                  */
1288                 lock = slice->cls_lock;
1289                 LASSERT(lock->cll_state == CLS_INTRANSIT);
1290                 LASSERT(lock->cll_users > 0);
1291                 /* set a flag for osc_dlm_blocking_ast0() to signal the
1292                  * lock.*/
1293                 olck->ols_ast_wait = 1;
1294                 rc = CLO_WAIT;
1295         }
1296         return rc;
1297 }
1298
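/* Write out dirty pages covered by the lock (or drop them when \a discard is
 * set) and then discard the pages from the page cache. */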
1299 static int osc_lock_flush(struct osc_lock *ols, int discard)
1300 {
1301         struct cl_lock       *lock  = ols->ols_cl.cls_lock;
1302         struct cl_env_nest    nest;
1303         struct lu_env        *env;
1304         int result = 0;
1305         ENTRY;
1306
1307         env = cl_env_nested_get(&nest);
1308         if (!IS_ERR(env)) {
1309                 struct osc_object    *obj   = cl2osc(ols->ols_cl.cls_obj);
1310                 struct cl_lock_descr *descr = &lock->cll_descr;
1311                 int rc = 0;
1312
1313                 if (descr->cld_mode >= CLM_WRITE) {
1314                         result = osc_cache_writeback_range(env, obj,
1315                                         descr->cld_start, descr->cld_end,
1316                                         1, discard);
1317                         LDLM_DEBUG(ols->ols_lock,
1318                                 "lock %p: %d pages were %s.\n", lock, result,
1319                                 discard ? "discarded" : "written");
1320                         if (result > 0)
1321                                 result = 0;
1322                 }
1323
1324                 rc = cl_lock_discard_pages(env, lock);
1325                 if (result == 0 && rc < 0)
1326                         result = rc;
1327
1328                 cl_env_nested_put(&nest, env);
1329         } else
1330                 result = PTR_ERR(env);
1331         if (result == 0) {
1332                 ols->ols_flush = 1;
1333                 LINVRNT(!osc_lock_has_pages(ols));
1334         }
1335         RETURN(result);
1336 }
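/*
 * For reference, the two callers below show the intended use of the
 * \a discard argument of osc_lock_flush():
 *
 *      osc_lock_cancel():
 *              osc_lock_flush(olck, !!(l_flags & LDLM_FL_DISCARD_DATA));
 *              (discard the pages only when the server requested it)
 *
 *      osc_lock_lockless_cancel():
 *              osc_lock_flush(ols, 0);
 *              (always write dirty pages back)
 */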
1337
1338 /**
1339  * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
1340  * called (as part of cl_lock_cancel()) when a lock is cancelled either
1341  * voluntarily (LRU pressure, early cancellation, umount, etc.) or due to a
1342  * conflict with some other lock somewhere in the cluster. This function does
1343  * the following:
1344  *
1345  *     - invalidates all pages protected by this lock (after sending dirty
1346  *       ones to the server, as necessary);
1347  *
1348  *     - decrefs the underlying ldlm lock;
1349  *
1350  *     - cancels the ldlm lock (ldlm_cli_cancel()).
1351  */
1352 static void osc_lock_cancel(const struct lu_env *env,
1353                             const struct cl_lock_slice *slice)
1354 {
1355         struct cl_lock   *lock    = slice->cls_lock;
1356         struct osc_lock  *olck    = cl2osc_lock(slice);
1357         struct ldlm_lock *dlmlock = olck->ols_lock;
1358         int               result  = 0;
1359         int               discard;
1360
1361         LASSERT(cl_lock_is_mutexed(lock));
1362         LINVRNT(osc_lock_invariant(olck));
1363
1364         if (dlmlock != NULL) {
1365                 int do_cancel;
1366
1367                 discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
1368                 if (olck->ols_state >= OLS_GRANTED)
1369                         result = osc_lock_flush(olck, discard);
1370                 osc_lock_unhold(olck);
1371
1372                 lock_res_and_lock(dlmlock);
1373                 /* Now that we are the only user of the dlm read/write
1374                  * reference, ->l_readers + ->l_writers should normally be
1375                  * zero. However, there is a corner case.
1376                  * See bug 18829 for details. */
1377                 do_cancel = (dlmlock->l_readers == 0 &&
1378                              dlmlock->l_writers == 0);
1379                 dlmlock->l_flags |= LDLM_FL_CBPENDING;
1380                 unlock_res_and_lock(dlmlock);
1381                 if (do_cancel)
1382                         result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
1383                 if (result < 0)
1384                         CL_LOCK_DEBUG(D_ERROR, env, lock,
1385                                       "lock %p cancel failure with error (%d)\n",
1386                                       lock, result);
1387         }
1388         olck->ols_state = OLS_CANCELLED;
1389         olck->ols_flags &= ~LDLM_FL_LVB_READY;
1390         osc_lock_detach(env, olck);
1391 }
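/*
 * Note on the do_cancel logic above: LDLM_FL_CBPENDING is set in both
 * cases, but ldlm_cli_cancel() is only issued when no dlm reader/writer
 * references remain.  If another user still holds such a reference (the
 * bug 18829 corner case), the flag is expected to make the cancellation
 * happen later, once the last reference is dropped.
 */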
1392
1393 #ifdef INVARIANT_CHECK
1394 static int check_cb(const struct lu_env *env, struct cl_io *io,
1395                     struct cl_page *page, void *cbdata)
1396 {
1397         struct cl_lock *lock = cbdata;
1398
1399         if (lock->cll_descr.cld_mode == CLM_READ) {
1400                 struct cl_lock *tmp;
1401                 tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj,
1402                                      page, lock, 1, 0);
1403                 if (tmp != NULL) {
1404                         cl_lock_put(env, tmp);
1405                         return CLP_GANG_OKAY;
1406                 }
1407         }
1408
1409         CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
1410         CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
1411         return CLP_GANG_ABORT;
1412 }
1413
1414 /**
1415  * Returns true iff there are pages under \a olck not protected by other
1416  * locks.
1417  */
1418 static int osc_lock_has_pages(struct osc_lock *olck)
1419 {
1420         struct cl_lock       *lock;
1421         struct cl_lock_descr *descr;
1422         struct cl_object     *obj;
1423         struct osc_object    *oob;
1424         struct cl_env_nest    nest;
1425         struct cl_io         *io;
1426         struct lu_env        *env;
1427         int                   result;
1428
1429         env = cl_env_nested_get(&nest);
1430         if (IS_ERR(env))
1431                 return 0;
1432
1433         obj   = olck->ols_cl.cls_obj;
1434         oob   = cl2osc(obj);
1435         io    = &oob->oo_debug_io;
1436         lock  = olck->ols_cl.cls_lock;
1437         descr = &lock->cll_descr;
1438
1439         mutex_lock(&oob->oo_debug_mutex);
1440
1441         io->ci_obj = cl_object_top(obj);
1442         io->ci_ignore_layout = 1;
1443         cl_io_init(env, io, CIT_MISC, io->ci_obj);
1444         do {
1445                 result = cl_page_gang_lookup(env, obj, io,
1446                                              descr->cld_start, descr->cld_end,
1447                                              check_cb, (void *)lock);
1448                 if (result == CLP_GANG_ABORT)
1449                         break;
1450                 if (result == CLP_GANG_RESCHED)
1451                         cfs_cond_resched();
1452         } while (result != CLP_GANG_OKAY);
1453         cl_io_fini(env, io);
1454         mutex_unlock(&oob->oo_debug_mutex);
1455         cl_env_nested_put(&nest, env);
1456
1457         return (result == CLP_GANG_ABORT);
1458 }
1459 #else
1460 static int osc_lock_has_pages(struct osc_lock *olck)
1461 {
1462         return 0;
1463 }
1464 #endif /* INVARIANT_CHECK */
1465
1466 static void osc_lock_delete(const struct lu_env *env,
1467                             const struct cl_lock_slice *slice)
1468 {
1469         struct osc_lock *olck;
1470
1471         olck = cl2osc_lock(slice);
1472         if (olck->ols_glimpse) {
1473                 LASSERT(!olck->ols_hold);
1474                 LASSERT(!olck->ols_lock);
1475                 return;
1476         }
1477
1478         LINVRNT(osc_lock_invariant(olck));
1479         LINVRNT(!osc_lock_has_pages(olck));
1480
1481         osc_lock_unhold(olck);
1482         osc_lock_detach(env, olck);
1483 }
1484
1485 /**
1486  * Implements cl_lock_operations::clo_state() method for osc layer.
1487  *
1488  * Maintains osc_lock::ols_owner field.
1489  *
1490  * This assumes that a lock always enters CLS_HELD (from some other state) in
1491  * the same IO context as the one that requested the lock. This should not be
1492  * a problem, because the context is by definition shared by all activity
1493  * pertaining to the same high-level IO.
1494  */
1495 static void osc_lock_state(const struct lu_env *env,
1496                            const struct cl_lock_slice *slice,
1497                            enum cl_lock_state state)
1498 {
1499         struct osc_lock *lock = cl2osc_lock(slice);
1500
1501         /*
1502          * XXX multiple io contexts can use the lock at the same time.
1503          */
1504         LINVRNT(osc_lock_invariant(lock));
1505         if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
1506                 struct osc_io *oio = osc_env_io(env);
1507
1508                 LASSERT(lock->ols_owner == NULL);
1509                 lock->ols_owner = oio;
1510         } else if (state != CLS_HELD)
1511                 lock->ols_owner = NULL;
1512 }
1513
1514 static int osc_lock_print(const struct lu_env *env, void *cookie,
1515                           lu_printer_t p, const struct cl_lock_slice *slice)
1516 {
1517         struct osc_lock *lock = cl2osc_lock(slice);
1518
1519         /*
1520          * XXX print ldlm lock and einfo properly.
1521          */
1522         (*p)(env, cookie, "%p %#16llx "LPX64" %d %p ",
1523              lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
1524              lock->ols_state, lock->ols_owner);
1525         osc_lvb_print(env, cookie, p, &lock->ols_lvb);
1526         return 0;
1527 }
1528
1529 static int osc_lock_fits_into(const struct lu_env *env,
1530                               const struct cl_lock_slice *slice,
1531                               const struct cl_lock_descr *need,
1532                               const struct cl_io *io)
1533 {
1534         struct osc_lock *ols = cl2osc_lock(slice);
1535
1536         if (need->cld_enq_flags & CEF_NEVER)
1537                 return 0;
1538
1539         if (ols->ols_state >= OLS_CANCELLED)
1540                 return 0;
1541
1542         if (need->cld_mode == CLM_PHANTOM) {
1543                 if (ols->ols_agl)
1544                         return !(ols->ols_state > OLS_RELEASED);
1545
1546                 /*
1547                  * Note: a lock in the QUEUED state can't be matched here,
1548                  * otherwise it might cause deadlocks. Consider the
1549                  * following scenario in the read path:
1550                  * P1: enqueues a read lock, creating sublock1.
1551                  * P2: enqueues a write lock, creating sublock2 (which
1552                  *     conflicts with sublock1).
1553                  * P1: the read lock is granted.
1554                  * P1: enqueues a glimpse lock (while holding sublock1 for
1555                  *     read), which matches sublock2 and waits for sublock2
1556                  *     to be granted. But sublock2 can never be granted,
1557                  *     because P1 will not release sublock1. Bang!
1558                  */
1559                 if (ols->ols_state < OLS_GRANTED ||
1560                     ols->ols_state > OLS_RELEASED)
1561                         return 0;
1562         } else if (need->cld_enq_flags & CEF_MUST) {
1563                 /*
1564                  * If the lock has never been enqueued, it can't be matched,
1565                  * because the enqueue process gathers information that is
1566                  * needed to decide things such as whether the lock is
1567                  * lockless, whether CEF_MUST applies, etc.
1568                  */
1569                 if (ols->ols_state < OLS_UPCALL_RECEIVED &&
1570                     ols->ols_locklessable)
1571                         return 0;
1572         }
1573         return 1;
1574 }
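/*
 * Illustrative summary of the decision made by osc_lock_fits_into() above
 * (derived from the code, not exhaustive):
 *
 *      request (need)                  current ols state            fits?
 *      ------------------------------  ---------------------------  -----
 *      CEF_NEVER                       any                          no
 *      any                             >= OLS_CANCELLED             no
 *      CLM_PHANTOM, AGL lock           <= OLS_RELEASED              yes
 *      CLM_PHANTOM, regular glimpse    OLS_GRANTED..OLS_RELEASED    yes
 *      CEF_MUST                        < OLS_UPCALL_RECEIVED and
 *                                      locklessable                 no
 *      anything else                   any                          yes
 */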
1575
1576 static const struct cl_lock_operations osc_lock_ops = {
1577         .clo_fini    = osc_lock_fini,
1578         .clo_enqueue = osc_lock_enqueue,
1579         .clo_wait    = osc_lock_wait,
1580         .clo_unuse   = osc_lock_unuse,
1581         .clo_use     = osc_lock_use,
1582         .clo_delete  = osc_lock_delete,
1583         .clo_state   = osc_lock_state,
1584         .clo_cancel  = osc_lock_cancel,
1585         .clo_weigh   = osc_lock_weigh,
1586         .clo_print   = osc_lock_print,
1587         .clo_fits_into = osc_lock_fits_into,
1588 };
1589
1590 static int osc_lock_lockless_unuse(const struct lu_env *env,
1591                                    const struct cl_lock_slice *slice)
1592 {
1593         struct osc_lock *ols = cl2osc_lock(slice);
1594         struct cl_lock *lock = slice->cls_lock;
1595
1596         LASSERT(ols->ols_state == OLS_GRANTED);
1597         LINVRNT(osc_lock_invariant(ols));
1598
1599         cl_lock_cancel(env, lock);
1600         cl_lock_delete(env, lock);
1601         return 0;
1602 }
1603
1604 static void osc_lock_lockless_cancel(const struct lu_env *env,
1605                                      const struct cl_lock_slice *slice)
1606 {
1607         struct osc_lock   *ols  = cl2osc_lock(slice);
1608         int result;
1609
1610         result = osc_lock_flush(ols, 0);
1611         if (result)
1612                 CERROR("Pages for lockless lock %p were not purged (%d)\n",
1613                        ols, result);
1614         ols->ols_state = OLS_CANCELLED;
1615 }
1616
1617 static int osc_lock_lockless_wait(const struct lu_env *env,
1618                                   const struct cl_lock_slice *slice)
1619 {
1620         struct osc_lock *olck = cl2osc_lock(slice);
1621         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1622
1623         LINVRNT(osc_lock_invariant(olck));
1624         LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);
1625
1626         return lock->cll_error;
1627 }
1628
1629 static void osc_lock_lockless_state(const struct lu_env *env,
1630                                     const struct cl_lock_slice *slice,
1631                                     enum cl_lock_state state)
1632 {
1633         struct osc_lock *lock = cl2osc_lock(slice);
1634
1635         LINVRNT(osc_lock_invariant(lock));
1636         if (state == CLS_HELD) {
1637                 struct osc_io *oio  = osc_env_io(env);
1638
1639                 LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
1640                 lock->ols_owner = oio;
1641
1642                 /* Mark the io as lockless if this lock is for the io's
1643                  * host object. */
1644                 if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
1645                         oio->oi_lockless = 1;
1646         }
1647 }
1648
1649 static int osc_lock_lockless_fits_into(const struct lu_env *env,
1650                                        const struct cl_lock_slice *slice,
1651                                        const struct cl_lock_descr *need,
1652                                        const struct cl_io *io)
1653 {
1654         struct osc_lock *lock = cl2osc_lock(slice);
1655
1656         if (!(need->cld_enq_flags & CEF_NEVER))
1657                 return 0;
1658
1659         /* A lockless lock should only be used by its owning io (see bug 22147). */
1660         return (lock->ols_owner == osc_env_io(env));
1661 }
1662
1663 static const struct cl_lock_operations osc_lock_lockless_ops = {
1664         .clo_fini      = osc_lock_fini,
1665         .clo_enqueue   = osc_lock_enqueue,
1666         .clo_wait      = osc_lock_lockless_wait,
1667         .clo_unuse     = osc_lock_lockless_unuse,
1668         .clo_state     = osc_lock_lockless_state,
1669         .clo_fits_into = osc_lock_lockless_fits_into,
1670         .clo_cancel    = osc_lock_lockless_cancel,
1671         .clo_print     = osc_lock_print
1672 };
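/*
 * Note that, compared with osc_lock_ops, the lockless variant provides no
 * clo_use, clo_delete or clo_weigh methods: a lockless lock does not pin a
 * cached ldlm lock, and its clo_unuse (osc_lock_lockless_unuse() above)
 * simply cancels and deletes the cl_lock.
 */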
1673
1674 int osc_lock_init(const struct lu_env *env,
1675                   struct cl_object *obj, struct cl_lock *lock,
1676                   const struct cl_io *unused)
1677 {
1678         struct osc_lock *clk;
1679         int result;
1680
1681         OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
1682         if (clk != NULL) {
1683                 __u32 enqflags = lock->cll_descr.cld_enq_flags;
1684
1685                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
1686                 cfs_atomic_set(&clk->ols_pageref, 0);
1687                 clk->ols_state = OLS_NEW;
1688
1689                 clk->ols_flags = osc_enq2ldlm_flags(enqflags);
1690                 clk->ols_agl = !!(enqflags & CEF_AGL);
1691                 if (clk->ols_agl)
1692                         clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
1693                 if (clk->ols_flags & LDLM_FL_HAS_INTENT)
1694                         clk->ols_glimpse = 1;
1695
1696                 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
1697
1698                 if (!(enqflags & CEF_MUST))
1699                         /* try to convert this lock to a lockless lock */
1700                         osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
1701                 if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
1702                         clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
1703
1704                 LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
1705                                 lock, clk, clk->ols_flags);
1706
1707                 result = 0;
1708         } else
1709                 result = -ENOMEM;
1710         return result;
1711 }
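/*
 * Example of the flag translation performed by osc_lock_init() above
 * (derived directly from the code; illustration only):
 *
 *      enqflags has CEF_AGL              ->  ols_agl = 1 and ols_flags gains
 *                                            LDLM_FL_BLOCK_NOWAIT;
 *      ols_flags has LDLM_FL_HAS_INTENT  ->  ols_glimpse = 1;
 *      enqflags lacks CEF_MUST           ->  the lock may be converted to the
 *                                            lockless variant;
 *      locklessable, no CEF_DISCARD_DATA ->  ols_flags gains
 *                                            LDLM_FL_DENY_ON_CONTENTION.
 */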
1712
1713 int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
1714 {
1715         struct osc_lock *olock;
1716         int              rc = 0;
1717
1718         spin_lock(&osc_ast_guard);
1719         olock = dlm->l_ast_data;
1720         /*
1721          * There is a very rare race with osc_page_addref_lock(), but it
1722          * does not matter: in the worst case we fail to cancel a lock that
1723          * we actually could have cancelled, which is harmless.
1724          */
1725         if (olock != NULL &&
1726             cfs_atomic_add_return(_PAGEREF_MAGIC,
1727                                   &olock->ols_pageref) != _PAGEREF_MAGIC) {
1728                 cfs_atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
1729                 rc = 1;
1730         }
1731         spin_unlock(&osc_ast_guard);
1732         return rc;
1733 }
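/*
 * Worked example of the _PAGEREF_MAGIC test above (illustration only):
 *
 *      ols_pageref == 0 (no page currently references the lock):
 *              cfs_atomic_add_return() yields exactly _PAGEREF_MAGIC, so rc
 *              stays 0 and the caller may cancel the lock; note that the
 *              counter is deliberately left at _PAGEREF_MAGIC in this branch.
 *
 *      ols_pageref == N > 0 (pages still reference the lock):
 *              cfs_atomic_add_return() yields _PAGEREF_MAGIC + N, which is
 *              not equal to _PAGEREF_MAGIC, so the magic is subtracted back
 *              out and rc = 1, telling the caller not to cancel this lock.
 */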
1734
1735 /** @} osc */