LU-1070 agl: update lock state when AGL upcall
[fs/lustre-release.git] / lustre / osc / osc_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * Implementation of cl_lock for OSC layer.
39  *
40  *   Author: Nikita Danilov <nikita.danilov@sun.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_OSC
44
45 #ifdef __KERNEL__
46 # include <libcfs/libcfs.h>
47 #else
48 # include <liblustre.h>
49 #endif
50 /* fid_build_reg_res_name() */
51 #include <lustre_fid.h>
52
53 #include "osc_cl_internal.h"
54
55 /** \addtogroup osc 
56  *  @{ 
57  */
58
59 #define _PAGEREF_MAGIC  (-10000000)
60
61 /*****************************************************************************
62  *
63  * Type conversions.
64  *
65  */
66
67 static const struct cl_lock_operations osc_lock_ops;
68 static const struct cl_lock_operations osc_lock_lockless_ops;
69 static void osc_lock_to_lockless(const struct lu_env *env,
70                                  struct osc_lock *ols, int force);
71 static int osc_lock_has_pages(struct osc_lock *olck);
72
73 int osc_lock_is_lockless(const struct osc_lock *olck)
74 {
75         return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
76 }
77
78 /**
79  * Returns a weak pointer to the ldlm lock identified by a handle. Returned
80  * pointer cannot be dereferenced, as lock is not protected from concurrent
81  * reclaim. This function is a helper for osc_lock_invariant().
82  */
83 static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
84 {
85         struct ldlm_lock *lock;
86
87         lock = ldlm_handle2lock(handle);
88         if (lock != NULL)
89                 LDLM_LOCK_PUT(lock);
90         return lock;
91 }
92
93 /**
94  * Invariant that has to be true all of the time.
95  */
96 static int osc_lock_invariant(struct osc_lock *ols)
97 {
98         struct ldlm_lock *lock        = osc_handle_ptr(&ols->ols_handle);
99         struct ldlm_lock *olock       = ols->ols_lock;
100         int               handle_used = lustre_handle_is_used(&ols->ols_handle);
101
102         return
103                 ergo(osc_lock_is_lockless(ols),
104                      ols->ols_locklessable && ols->ols_lock == NULL)  ||
105                 (ergo(olock != NULL, handle_used) &&
106                  ergo(olock != NULL,
107                       olock->l_handle.h_cookie == ols->ols_handle.cookie) &&
108                  /*
109                   * Check that ->ols_handle and ->ols_lock are consistent, but
110                   * take into account that they are set at different times.
111                   */
112                  ergo(handle_used,
113                       ergo(lock != NULL && olock != NULL, lock == olock) &&
114                       ergo(lock == NULL, olock == NULL)) &&
115                  ergo(ols->ols_state == OLS_CANCELLED,
116                       olock == NULL && !handle_used) &&
117                  /*
118                   * DLM lock is destroyed only after we have seen cancellation
119                   * ast.
120                   */
121                  ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
122                       !olock->l_destroyed) &&
123                  ergo(ols->ols_state == OLS_GRANTED,
124                       olock != NULL &&
125                       olock->l_req_mode == olock->l_granted_mode &&
126                       ols->ols_hold));
127 }
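/*
 * In plain terms, the invariant above says: a lockless osc_lock is
 * locklessable and never has a dlm lock attached; otherwise, whenever
 * ->ols_lock is set, ->ols_handle must refer to the same ldlm lock (modulo
 * the window between the two being set), a cancelled osc_lock has neither a
 * dlm lock nor a used handle, and a granted osc_lock holds a granted,
 * non-destroyed dlm lock.
 */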
128
129 /*****************************************************************************
130  *
131  * Lock operations.
132  *
133  */
134
135 /**
136  * Breaks a link between osc_lock and dlm_lock.
137  */
138 static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
139 {
140         struct ldlm_lock *dlmlock;
141
142         cfs_spin_lock(&osc_ast_guard);
143         dlmlock = olck->ols_lock;
144         if (dlmlock == NULL) {
145                 cfs_spin_unlock(&osc_ast_guard);
146                 return;
147         }
148
149         olck->ols_lock = NULL;
150         /* wb(); --- for all callers that check (ols->ols_lock != NULL)
151          * before calling osc_lock_detach() */
152         dlmlock->l_ast_data = NULL;
153         olck->ols_handle.cookie = 0ULL;
154         cfs_spin_unlock(&osc_ast_guard);
155
156         lock_res_and_lock(dlmlock);
157         if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
158                 struct cl_object *obj = olck->ols_cl.cls_obj;
159                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
160                 __u64 old_kms;
161
162                 cl_object_attr_lock(obj);
163                 /* Must get the value under the lock to avoid possible races. */
164                 old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
165                 /* Update the kms. Need to loop over all granted locks.
166                  * Not a problem for the client. */
167                 attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
168
169                 cl_object_attr_set(env, obj, attr, CAT_KMS);
170                 cl_object_attr_unlock(obj);
171         }
172         unlock_res_and_lock(dlmlock);
173
174         /* release a reference taken in osc_lock_upcall0(). */
175         LASSERT(olck->ols_has_ref);
176         lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
177         LDLM_LOCK_RELEASE(dlmlock);
178         olck->ols_has_ref = 0;
179 }
180
181 static int osc_lock_unhold(struct osc_lock *ols)
182 {
183         int result = 0;
184
185         if (ols->ols_hold) {
186                 ols->ols_hold = 0;
187                 result = osc_cancel_base(&ols->ols_handle,
188                                          ols->ols_einfo.ei_mode);
189         }
190         return result;
191 }
192
193 static int osc_lock_unuse(const struct lu_env *env,
194                           const struct cl_lock_slice *slice)
195 {
196         struct osc_lock *ols = cl2osc_lock(slice);
197
198         LINVRNT(osc_lock_invariant(ols));
199
200         switch (ols->ols_state) {
201         case OLS_NEW:
202                 LASSERT(!ols->ols_hold);
203                 LASSERT(ols->ols_agl);
204                 return 0;
205         case OLS_UPCALL_RECEIVED:
206                 LASSERT(!ols->ols_hold);
207                 ols->ols_state = OLS_NEW;
208                 return 0;
209         case OLS_GRANTED:
210                 LASSERT(!ols->ols_glimpse);
211                 LASSERT(ols->ols_hold);
212                 /*
213                  * Move lock into OLS_RELEASED state before calling
214                  * osc_cancel_base() so that possible synchronous cancellation
215                  * (that always happens e.g., for liblustre) sees that lock is
216                  * released.
217                  */
218                 ols->ols_state = OLS_RELEASED;
219                 return osc_lock_unhold(ols);
220         default:
221                 CERROR("Impossible state: %d\n", ols->ols_state);
222                 LBUG();
223         }
224 }
225
226 static void osc_lock_fini(const struct lu_env *env,
227                           struct cl_lock_slice *slice)
228 {
229         struct osc_lock  *ols = cl2osc_lock(slice);
230
231         LINVRNT(osc_lock_invariant(ols));
232         /*
233          * ->ols_hold can still be true at this point if, for example, a
234          * thread that requested a lock was killed (and released a reference
235          * to the lock), before reply from a server was received. In this case
236          * lock is destroyed immediately after upcall.
237          */
238         osc_lock_unhold(ols);
239         LASSERT(ols->ols_lock == NULL);
240         LASSERT(cfs_atomic_read(&ols->ols_pageref) == 0 ||
241                 cfs_atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
242
243         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
244 }
245
246 void osc_lock_build_res(const struct lu_env *env, const struct osc_object *obj,
247                         struct ldlm_res_id *resname)
248 {
249         const struct lu_fid *fid = lu_object_fid(&obj->oo_cl.co_lu);
250         if (0) {
251                 /*
252                  * In the perfect world of the future, where ost servers talk
253                  * idif-fids...
254                  */
255                 fid_build_reg_res_name(fid, resname);
256         } else {
257                 /*
258                  * In reality, the ost server expects ->lsm_object_id and
259                  * ->lsm_object_seq in the resource name.
260                  */
261                 osc_build_res_name(obj->oo_oinfo->loi_id, obj->oo_oinfo->loi_seq,
262                                    resname);
263         }
264 }
265
266 static void osc_lock_build_policy(const struct lu_env *env,
267                                   const struct cl_lock *lock,
268                                   ldlm_policy_data_t *policy)
269 {
270         const struct cl_lock_descr *d = &lock->cll_descr;
271
272         osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
273         policy->l_extent.gid = d->cld_gid;
274 }
275
276 static int osc_enq2ldlm_flags(__u32 enqflags)
277 {
278         int result = 0;
279
280         LASSERT((enqflags & ~CEF_MASK) == 0);
281
282         if (enqflags & CEF_NONBLOCK)
283                 result |= LDLM_FL_BLOCK_NOWAIT;
284         if (enqflags & CEF_ASYNC)
285                 result |= LDLM_FL_HAS_INTENT;
286         if (enqflags & CEF_DISCARD_DATA)
287                 result |= LDLM_AST_DISCARD_DATA;
288         return result;
289 }
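/*
 * For example, an enqueue issued with (CEF_ASYNC | CEF_NONBLOCK) maps to
 * (LDLM_FL_HAS_INTENT | LDLM_FL_BLOCK_NOWAIT) on the ldlm side, while
 * CEF_DISCARD_DATA adds LDLM_AST_DISCARD_DATA so that pages under the lock
 * can be dropped rather than written back when it is cancelled (see
 * osc_lock_cancel() below).
 */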
290
291 /**
292  * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
293  * pointers. Initialized in osc_init().
294  */
295 cfs_spinlock_t osc_ast_guard;
296
297 static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
298 {
299         struct osc_lock *olck;
300
301         lock_res_and_lock(dlm_lock);
302         cfs_spin_lock(&osc_ast_guard);
303         olck = dlm_lock->l_ast_data;
304         if (olck != NULL) {
305                 struct cl_lock *lock = olck->ols_cl.cls_lock;
306                 /*
307                  * If osc_lock holds a reference on ldlm lock, return it even
308                  * when cl_lock is in CLS_FREEING state. This way
309                  *
310                  *         osc_ast_data_get(dlmlock) == NULL
311                  *
312                  * guarantees that all osc references on dlmlock were
313                  * released. osc_dlm_blocking_ast0() relies on that.
314                  */
315                 if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
316                         cl_lock_get_trust(lock);
317                         lu_ref_add_atomic(&lock->cll_reference,
318                                           "ast", cfs_current());
319                 } else
320                         olck = NULL;
321         }
322         cfs_spin_unlock(&osc_ast_guard);
323         unlock_res_and_lock(dlm_lock);
324         return olck;
325 }
326
327 static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
328 {
329         struct cl_lock *lock;
330
331         lock = olck->ols_cl.cls_lock;
332         lu_ref_del(&lock->cll_reference, "ast", cfs_current());
333         cl_lock_put(env, lock);
334 }
335
336 /**
337  * Updates object attributes from a lock value block (lvb) received together
338  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
339  * logic.
340  *
341  * This can be optimized to not update attributes when lock is a result of a
342  * local match.
343  *
344  * Called under lock and resource spin-locks.
345  */
346 static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
347                                 int rc)
348 {
349         struct ost_lvb    *lvb;
350         struct cl_object  *obj;
351         struct lov_oinfo  *oinfo;
352         struct cl_attr    *attr;
353         unsigned           valid;
354
355         ENTRY;
356
357         if (!(olck->ols_flags & LDLM_FL_LVB_READY))
358                 RETURN_EXIT;
359
360         lvb   = &olck->ols_lvb;
361         obj   = olck->ols_cl.cls_obj;
362         oinfo = cl2osc(obj)->oo_oinfo;
363         attr  = &osc_env_info(env)->oti_attr;
364         valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
365         cl_lvb2attr(attr, lvb);
366
367         cl_object_attr_lock(obj);
368         if (rc == 0) {
369                 struct ldlm_lock  *dlmlock;
370                 __u64 size;
371
372                 dlmlock = olck->ols_lock;
373                 LASSERT(dlmlock != NULL);
374
375                 /* re-grab LVB from a dlm lock under DLM spin-locks. */
376                 *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
377                 size = lvb->lvb_size;
378                 /* Extend KMS up to the end of this lock and no further.
379                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
380                 if (size > dlmlock->l_policy_data.l_extent.end)
381                         size = dlmlock->l_policy_data.l_extent.end + 1;
382                 if (size >= oinfo->loi_kms) {
383                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64
384                                    ", kms="LPU64, lvb->lvb_size, size);
385                         valid |= CAT_KMS;
386                         attr->cat_kms = size;
387                 } else {
388                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
389                                    LPU64"; leaving kms="LPU64", end="LPU64,
390                                    lvb->lvb_size, oinfo->loi_kms,
391                                    dlmlock->l_policy_data.l_extent.end);
392                 }
393                 ldlm_lock_allow_match_locked(dlmlock);
394         } else if (rc == -ENAVAIL && olck->ols_glimpse) {
395                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
396                        " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
397         } else
398                 valid = 0;
399
400         if (valid != 0)
401                 cl_object_attr_set(env, obj, attr, valid);
402
403         cl_object_attr_unlock(obj);
404
405         EXIT;
406 }
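/*
 * A worked example of the kms logic above: for a granted lock on the extent
 * [0, 1048575] with lvb_size = 4MB, size is clamped to l_extent.end + 1 =
 * 1MB, and, since 1MB >= loi_kms, the kms is raised to 1MB. If instead
 * lvb_size were 512KB, the kms would be set to 512KB (still covered by the
 * lock), provided it is not below the current loi_kms.
 */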
407
408 /**
409  * Called when a lock is granted, from an upcall (when server returned a
410  * granted lock), or from completion AST, when server returned a blocked lock.
411  *
412  * Called under lock and resource spin-locks, that are released temporarily
413  * here.
414  */
415 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
416                              struct ldlm_lock *dlmlock, int rc)
417 {
418         struct ldlm_extent   *ext;
419         struct cl_lock       *lock;
420         struct cl_lock_descr *descr;
421
422         LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
423
424         ENTRY;
425         if (olck->ols_state < OLS_GRANTED) {
426                 lock  = olck->ols_cl.cls_lock;
427                 ext   = &dlmlock->l_policy_data.l_extent;
428                 descr = &osc_env_info(env)->oti_descr;
429                 descr->cld_obj = lock->cll_descr.cld_obj;
430
431                 /* XXX check that ->l_granted_mode is valid. */
432                 descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
433                 descr->cld_start = cl_index(descr->cld_obj, ext->start);
434                 descr->cld_end   = cl_index(descr->cld_obj, ext->end);
435                 descr->cld_gid   = ext->gid;
436                 /*
437                  * tell upper layers the extent of the lock that was actually
438                  * granted
439                  */
440                 olck->ols_state = OLS_GRANTED;
441                 osc_lock_lvb_update(env, olck, rc);
442
443                 /* release DLM spin-locks to allow cl_lock_{modify,signal}()
444                  * to take a semaphore on a parent lock. This is safe, because
445                  * spin-locks are needed to protect consistency of
446                  * dlmlock->l_*_mode and LVB, and we have finished processing
447                  * them. */
448                 unlock_res_and_lock(dlmlock);
449                 cl_lock_modify(env, lock, descr);
450                 cl_lock_signal(env, lock);
451                 LINVRNT(osc_lock_invariant(olck));
452                 lock_res_and_lock(dlmlock);
453         }
454         EXIT;
455 }
456
457 static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
458
459 {
460         struct ldlm_lock *dlmlock;
461
462         ENTRY;
463
464         dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
465         LASSERT(dlmlock != NULL);
466
467         lock_res_and_lock(dlmlock);
468         cfs_spin_lock(&osc_ast_guard);
469         LASSERT(dlmlock->l_ast_data == olck);
470         LASSERT(olck->ols_lock == NULL);
471         olck->ols_lock = dlmlock;
472         cfs_spin_unlock(&osc_ast_guard);
473
474         /*
475          * The lock might not be granted yet. In this case, the completion ast
476          * (osc_ldlm_completion_ast()) comes later and finishes lock
477          * granting.
478          */
479         if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
480                 osc_lock_granted(env, olck, dlmlock, 0);
481         unlock_res_and_lock(dlmlock);
482
483         /*
484          * osc_enqueue_interpret() decrefs asynchronous locks; addref here
485          * to counter that.
486          */
487         ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
488         olck->ols_hold = 1;
489
490         /* lock reference taken by ldlm_handle2lock_long() is owned by
491          * osc_lock and released in osc_lock_detach() */
492         lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
493         olck->ols_has_ref = 1;
494 }
495
496 /**
497  * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
498  * received from a server, or after osc_enqueue_base() matched a local DLM
499  * lock.
500  */
501 static int osc_lock_upcall(void *cookie, int errcode)
502 {
503         struct osc_lock         *olck  = cookie;
504         struct cl_lock_slice    *slice = &olck->ols_cl;
505         struct cl_lock          *lock  = slice->cls_lock;
506         struct lu_env           *env;
507         struct cl_env_nest       nest;
508
509         ENTRY;
510         env = cl_env_nested_get(&nest);
511         if (!IS_ERR(env)) {
512                 int rc;
513
514                 cl_lock_mutex_get(env, lock);
515
516                 LASSERT(lock->cll_state >= CLS_QUEUING);
517                 if (olck->ols_state == OLS_ENQUEUED) {
518                         olck->ols_state = OLS_UPCALL_RECEIVED;
519                         rc = ldlm_error2errno(errcode);
520                 } else if (olck->ols_state == OLS_CANCELLED) {
521                         rc = -EIO;
522                 } else {
523                         CERROR("Impossible state: %d\n", olck->ols_state);
524                         LBUG();
525                 }
526                 if (rc) {
527                         struct ldlm_lock *dlmlock;
528
529                         dlmlock = ldlm_handle2lock(&olck->ols_handle);
530                         if (dlmlock != NULL) {
531                                 lock_res_and_lock(dlmlock);
532                                 cfs_spin_lock(&osc_ast_guard);
533                                 LASSERT(olck->ols_lock == NULL);
534                                 dlmlock->l_ast_data = NULL;
535                                 olck->ols_handle.cookie = 0ULL;
536                                 cfs_spin_unlock(&osc_ast_guard);
537                                 ldlm_lock_fail_match_locked(dlmlock);
538                                 unlock_res_and_lock(dlmlock);
539                                 LDLM_LOCK_PUT(dlmlock);
540                         }
541                 } else {
542                         if (olck->ols_glimpse)
543                                 olck->ols_glimpse = 0;
544                         osc_lock_upcall0(env, olck);
545                 }
546
547                 /* Error handling, some errors are tolerable. */
548                 if (olck->ols_locklessable && rc == -EUSERS) {
549                         /* This is a tolerable error, turn this lock into
550                          * lockless lock.
551                          */
552                         osc_object_set_contended(cl2osc(slice->cls_obj));
553                         LASSERT(slice->cls_ops == &osc_lock_ops);
554
555                         /* Change this lock to ldlmlock-less lock. */
556                         osc_lock_to_lockless(env, olck, 1);
557                         olck->ols_state = OLS_GRANTED;
558                         rc = 0;
559                 } else if (olck->ols_glimpse && rc == -ENAVAIL) {
560                         osc_lock_lvb_update(env, olck, rc);
561                         cl_lock_delete(env, lock);
562                         /* Hide the error. */
563                         rc = 0;
564                 }
565
566                 if (rc == 0) {
567                         /* For the AGL case, the RPC sponsor may exit the cl_lock
568                          * processing without wait() having been called before the
569                          * related OSC lock upcall(). So update the lock status
570                          * according to the enqueue result inside the AGL upcall(). */
571                         if (olck->ols_agl) {
572                                 lock->cll_flags |= CLF_FROM_UPCALL;
573                                 cl_wait_try(env, lock);
574                                 lock->cll_flags &= ~CLF_FROM_UPCALL;
575                                 if (!olck->ols_glimpse)
576                                         olck->ols_agl = 0;
577                         }
578                         cl_lock_signal(env, lock);
579                         /* del user for lock upcall cookie */
580                         cl_unuse_try(env, lock);
581                 } else {
582                         /* del user for lock upcall cookie */
583                         cl_lock_user_del(env, lock);
584                         cl_lock_error(env, lock, rc);
585                 }
586
587                 cl_lock_mutex_put(env, lock);
588
589                 /* release cookie reference, acquired by osc_lock_enqueue() */
590                 lu_ref_del(&lock->cll_reference, "upcall", lock);
591                 cl_lock_put(env, lock);
592
593                 cl_env_nested_put(&nest, env);
594         } else
595                 /* should never happen, similar to osc_ldlm_blocking_ast(). */
596                 LBUG();
597         RETURN(errcode);
598 }
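/*
 * To summarize the upcall above, the usual state transition is roughly
 *
 *     OLS_ENQUEUED -> OLS_UPCALL_RECEIVED -> OLS_GRANTED (osc_lock_upcall0())
 *
 * with two tolerated error paths: -EUSERS on a locklessable lock converts it
 * into a lockless lock, and -ENAVAIL on a glimpse lock updates the
 * attributes from the lvb and deletes the cl_lock; both are reported as
 * success to the upper layers.
 */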
599
600 /**
601  * Core of osc_dlm_blocking_ast() logic.
602  */
603 static void osc_lock_blocking(const struct lu_env *env,
604                               struct ldlm_lock *dlmlock,
605                               struct osc_lock *olck, int blocking)
606 {
607         struct cl_lock *lock = olck->ols_cl.cls_lock;
608
609         LASSERT(olck->ols_lock == dlmlock);
610         CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
611         LASSERT(!osc_lock_is_lockless(olck));
612
613         /*
614          * The lock might still be addref-ed here if, e.g., a blocking ast
615          * is sent for a failed lock.
616          */
617         osc_lock_unhold(olck);
618
619         if (blocking && olck->ols_state < OLS_BLOCKED)
620                 /*
621                  * Move osc_lock into OLS_BLOCKED before canceling the lock,
622                  * because it recursively re-enters osc_lock_blocking(), with
623                  * the state set to OLS_CANCELLED.
624                  */
625                 olck->ols_state = OLS_BLOCKED;
626         /*
627          * cancel and destroy lock at least once no matter how blocking ast is
628          * entered (see comment above osc_ldlm_blocking_ast() for use
629          * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
630          */
631         cl_lock_cancel(env, lock);
632         cl_lock_delete(env, lock);
633 }
634
635 /**
636  * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
637  * and ldlm_lock caches.
638  */
639 static int osc_dlm_blocking_ast0(const struct lu_env *env,
640                                  struct ldlm_lock *dlmlock,
641                                  void *data, int flag)
642 {
643         struct osc_lock *olck;
644         struct cl_lock  *lock;
645         int result;
646         int cancel;
647
648         LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);
649
650         cancel = 0;
651         olck = osc_ast_data_get(dlmlock);
652         if (olck != NULL) {
653                 lock = olck->ols_cl.cls_lock;
654                 cl_lock_mutex_get(env, lock);
655                 LINVRNT(osc_lock_invariant(olck));
656                 if (olck->ols_ast_wait) {
657                         /* wake up osc_lock_use() */
658                         cl_lock_signal(env, lock);
659                         olck->ols_ast_wait = 0;
660                 }
661                 /*
662                  * Lock might have been canceled while this thread was
663                  * sleeping on the lock mutex, but olck is pinned in memory.
664                  */
665                 if (olck == dlmlock->l_ast_data) {
666                         /*
667                          * NOTE: DLM sends blocking AST's for failed locks
668                          *       (that are still in pre-OLS_GRANTED state)
669                          *       too, and they have to be canceled otherwise
670                          *       the DLM lock is never destroyed and gets
671                          *       stuck in memory.
672                          *
673                          *       Alternatively, ldlm_cli_cancel() can be
674                          *       called here directly for osc_locks with
675                          *       ols_state < OLS_GRANTED to maintain an
676                          *       invariant that ->clo_cancel() is only called
677                          *       for locks that were granted.
678                          */
679                         LASSERT(data == olck);
680                         osc_lock_blocking(env, dlmlock,
681                                           olck, flag == LDLM_CB_BLOCKING);
682                 } else
683                         cancel = 1;
684                 cl_lock_mutex_put(env, lock);
685                 osc_ast_data_put(env, olck);
686         } else
687                 /*
688                  * DLM lock exists, but there is no cl_lock attached to it.
689                  * This is a `normal' race. cl_object and its cl_lock's can be
690                  * removed by memory pressure, together with all pages.
691                  */
692                 cancel = (flag == LDLM_CB_BLOCKING);
693
694         if (cancel) {
695                 struct lustre_handle *lockh;
696
697                 lockh = &osc_env_info(env)->oti_handle;
698                 ldlm_lock2handle(dlmlock, lockh);
699                 result = ldlm_cli_cancel(lockh);
700         } else
701                 result = 0;
702         return result;
703 }
704
705 /**
706  * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
707  * some other lock, or is canceled. This function is installed as a
708  * ldlm_lock::l_blocking_ast() for client extent locks.
709  *
710  * Control flow is tricky, because ldlm uses the same call-back
711  * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
712  *
713  * \param dlmlock lock for which ast occurred.
714  *
715  * \param new description of a conflicting lock in case of blocking ast.
716  *
717  * \param data value of dlmlock->l_ast_data
718  *
719  * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
720  *             cancellation and blocking ast's.
721  *
722  * Possible use cases:
723  *
724  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
725  *       lock due to lock lru pressure, or explicit user request to purge
726  *       locks.
727  *
728  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
729  *       us that dlmlock conflicts with another lock that some client is
730  *       enqueuing. The lock is then canceled.
731  *
732  *           - cl_lock_cancel() is called. osc_lock_cancel() calls
733  *             ldlm_cli_cancel() that calls
734  *
735  *                  dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
736  *
737  *             recursively entering osc_ldlm_blocking_ast().
738  *
739  *     - client cancels the lock voluntarily (e.g., as a part of early cancellation):
740  *
741  *           cl_lock_cancel()->
742  *             osc_lock_cancel()->
743  *               ldlm_cli_cancel()->
744  *                 dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
745  *
746  */
747 static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
748                                  struct ldlm_lock_desc *new, void *data,
749                                  int flag)
750 {
751         struct lu_env     *env;
752         struct cl_env_nest nest;
753         int                result;
754
755         /*
756          * This can be called in the context of outer IO, e.g.,
757          *
758          *     cl_enqueue()->...
759          *       ->osc_enqueue_base()->...
760          *         ->ldlm_prep_elc_req()->...
761          *           ->ldlm_cancel_callback()->...
762          *             ->osc_ldlm_blocking_ast()
763          *
764          * A new environment has to be created so as not to corrupt the outer context.
765          */
766         env = cl_env_nested_get(&nest);
767         if (!IS_ERR(env)) {
768                 result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
769                 cl_env_nested_put(&nest, env);
770         } else {
771                 result = PTR_ERR(env);
772                 /*
773                  * XXX This should never happen, as cl_lock is
774                  * stuck. Pre-allocated environment a la vvp_inode_fini_env
775                  * should be used.
776                  */
777                 LBUG();
778         }
779         if (result != 0) {
780                 if (result == -ENODATA)
781                         result = 0;
782                 else
783                         CERROR("BAST failed: %d\n", result);
784         }
785         return result;
786 }
787
788 static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
789                                    int flags, void *data)
790 {
791         struct cl_env_nest nest;
792         struct lu_env     *env;
793         struct osc_lock   *olck;
794         struct cl_lock    *lock;
795         int result;
796         int dlmrc;
797
798         /* first, do dlm part of the work */
799         dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
800         /* then, notify cl_lock */
801         env = cl_env_nested_get(&nest);
802         if (!IS_ERR(env)) {
803                 olck = osc_ast_data_get(dlmlock);
804                 if (olck != NULL) {
805                         lock = olck->ols_cl.cls_lock;
806                         cl_lock_mutex_get(env, lock);
807                         /*
808                          * ldlm_handle_cp_callback() copied LVB from request
809                          * to lock->l_lvb_data, store it in osc_lock.
810                          */
811                         LASSERT(dlmlock->l_lvb_data != NULL);
812                         lock_res_and_lock(dlmlock);
813                         olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
814                         if (olck->ols_lock == NULL) {
815                                 /*
816                                  * upcall (osc_lock_upcall()) hasn't yet been
817                                  * called. Do nothing now, upcall will bind
818                                  * olck to dlmlock and signal the waiters.
819                                  *
820                                  * This maintains an invariant that osc_lock
821                                  * and ldlm_lock are always bound when
822                                  * osc_lock is in OLS_GRANTED state.
823                                  */
824                         } else if (dlmlock->l_granted_mode ==
825                                    dlmlock->l_req_mode) {
826                                 osc_lock_granted(env, olck, dlmlock, dlmrc);
827                         }
828                         unlock_res_and_lock(dlmlock);
829
830                         if (dlmrc != 0) {
831                                 CL_LOCK_DEBUG(D_ERROR, env, lock,
832                                               "dlmlock returned %d\n", dlmrc);
833                                 cl_lock_error(env, lock, dlmrc);
834                         }
835                         cl_lock_mutex_put(env, lock);
836                         osc_ast_data_put(env, olck);
837                         result = 0;
838                 } else
839                         result = -ELDLM_NO_LOCK_DATA;
840                 cl_env_nested_put(&nest, env);
841         } else
842                 result = PTR_ERR(env);
843         return dlmrc ?: result;
844 }
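/*
 * Note that the completion ast above returns the dlm-side error (dlmrc) when
 * there is one, and only falls back to the cl_lock notification result
 * otherwise, so a failure in ldlm_completion_ast_async() is never masked.
 */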
845
846 static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
847 {
848         struct ptlrpc_request  *req  = data;
849         struct osc_lock        *olck;
850         struct cl_lock         *lock;
851         struct cl_object       *obj;
852         struct cl_env_nest      nest;
853         struct lu_env          *env;
854         struct ost_lvb         *lvb;
855         struct req_capsule     *cap;
856         int                     result;
857
858         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
859
860         env = cl_env_nested_get(&nest);
861         if (!IS_ERR(env)) {
862                 /*
863                  * osc_ast_data_get() has to go after the environment is
864                  * allocated, because osc_ast_data_get() acquires a
865                  * reference to a lock, and that reference can only be
866                  * released with an environment.
867                  */
868                 olck = osc_ast_data_get(dlmlock);
869                 if (olck != NULL) {
870                         lock = olck->ols_cl.cls_lock;
871                         cl_lock_mutex_get(env, lock);
872                         cap = &req->rq_pill;
873                         req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
874                         req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
875                                              sizeof *lvb);
876                         result = req_capsule_server_pack(cap);
877                         if (result == 0) {
878                                 lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
879                                 obj = lock->cll_descr.cld_obj;
880                                 result = cl_object_glimpse(env, obj, lvb);
881                         }
882                         cl_lock_mutex_put(env, lock);
883                         osc_ast_data_put(env, olck);
884                 } else {
885                         /*
886                          * These errors are normal races, so we don't want to
887                          * fill the console with messages by calling
888                          * ptlrpc_error()
889                          */
890                         lustre_pack_reply(req, 1, NULL, NULL);
891                         result = -ELDLM_NO_LOCK_DATA;
892                 }
893                 cl_env_nested_put(&nest, env);
894         } else
895                 result = PTR_ERR(env);
896         req->rq_status = result;
897         return result;
898 }
899
900 static unsigned long osc_lock_weigh(const struct lu_env *env,
901                                     const struct cl_lock_slice *slice)
902 {
903         /*
904          * don't need to grab coh_page_guard since we don't care about the
905          * exact number of pages.
906          */
907         return cl_object_header(slice->cls_obj)->coh_pages;
908 }
909
910 /**
911  * Get the weight of dlm lock for early cancellation.
912  *
913  * XXX: it should return the number of pages covered by this \a dlmlock.
914  */
915 static unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
916 {
917         struct cl_env_nest       nest;
918         struct lu_env           *env;
919         struct osc_lock         *lock;
920         struct cl_lock          *cll;
921         unsigned long            weight;
922         ENTRY;
923
924         cfs_might_sleep();
925         /*
926          * osc_ldlm_weigh_ast has a complex context since it might be called
927          * because of lock canceling, or from user input. We have to make
928          * a new environment for it. It is probably safe to use the upper
929          * context because cl_lock_put doesn't modify environment
930          * variables. But just in case ..
931          */
932         env = cl_env_nested_get(&nest);
933         if (IS_ERR(env))
934                 /* Most likely due to lack of memory; tend to eliminate this lock */
935                 RETURN(0);
936
937         LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
938         lock = osc_ast_data_get(dlmlock);
939         if (lock == NULL) {
940                 /* cl_lock was destroyed because of memory pressure.
941                  * It is more reasonable to assign this type of lock
942                  * a lower cost.
943                  */
944                 GOTO(out, weight = 0);
945         }
946
947         cll = lock->ols_cl.cls_lock;
948         cl_lock_mutex_get(env, cll);
949         weight = cl_lock_weigh(env, cll);
950         cl_lock_mutex_put(env, cll);
951         osc_ast_data_put(env, lock);
952         EXIT;
953
954 out:
955         cl_env_nested_put(&nest, env);
956         return weight;
957 }
958
959 static void osc_lock_build_einfo(const struct lu_env *env,
960                                  const struct cl_lock *clock,
961                                  struct osc_lock *lock,
962                                  struct ldlm_enqueue_info *einfo)
963 {
964         enum cl_lock_mode mode;
965
966         mode = clock->cll_descr.cld_mode;
967         if (mode == CLM_PHANTOM)
968                 /*
969                  * For now, enqueue all glimpse locks in read mode. In the
970                  * future, client might choose to enqueue LCK_PW lock for
971                  * glimpse on a file opened for write.
972                  */
973                 mode = CLM_READ;
974
975         einfo->ei_type   = LDLM_EXTENT;
976         einfo->ei_mode   = osc_cl_lock2ldlm(mode);
977         einfo->ei_cb_bl  = osc_ldlm_blocking_ast;
978         einfo->ei_cb_cp  = osc_ldlm_completion_ast;
979         einfo->ei_cb_gl  = osc_ldlm_glimpse_ast;
980         einfo->ei_cb_wg  = osc_ldlm_weigh_ast;
981         einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
982 }
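/*
 * As a concrete example of the above: a glimpse (CLM_PHANTOM) cl_lock is
 * enqueued as a read-mode LDLM_EXTENT lock with the four osc_ldlm_*_ast()
 * callbacks installed, and the osc_lock itself is stashed in ->l_ast_data
 * via ei_cbdata.
 */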
983
984 /**
985  * Determine if the lock should be converted into a lockless lock.
986  *
987  * Steps to check:
988  * - whether the lock has an explicit requirement for a non-lockless lock;
989  * - the io lock request type ci_lockreq;
990  * - send the enqueue rpc to the ost to make the further decision;
991  * - special treatment for lockless truncate.
992  *
993  *  Additional policy can be implemented here, e.g., never do lockless-io
994  *  for large extents.
995  */
996 static void osc_lock_to_lockless(const struct lu_env *env,
997                                  struct osc_lock *ols, int force)
998 {
999         struct cl_lock_slice *slice = &ols->ols_cl;
1000         struct cl_lock *lock        = slice->cls_lock;
1001
1002         LASSERT(ols->ols_state == OLS_NEW ||
1003                 ols->ols_state == OLS_UPCALL_RECEIVED);
1004
1005         if (force) {
1006                 ols->ols_locklessable = 1;
1007                 LASSERT(cl_lock_is_mutexed(lock));
1008                 slice->cls_ops = &osc_lock_lockless_ops;
1009         } else {
1010                 struct osc_io *oio     = osc_env_io(env);
1011                 struct cl_io  *io      = oio->oi_cl.cis_io;
1012                 struct cl_object *obj  = slice->cls_obj;
1013                 struct osc_object *oob = cl2osc(obj);
1014                 const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
1015                 struct obd_connect_data *ocd;
1016
1017                 LASSERT(io->ci_lockreq == CILR_MANDATORY ||
1018                         io->ci_lockreq == CILR_MAYBE ||
1019                         io->ci_lockreq == CILR_NEVER);
1020
1021                 ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
1022                 ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
1023                                 (io->ci_lockreq == CILR_MAYBE) &&
1024                                 (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
1025                 if (io->ci_lockreq == CILR_NEVER ||
1026                         /* lockless IO */
1027                     (ols->ols_locklessable && osc_object_is_contended(oob)) ||
1028                         /* lockless truncate */
1029                     (cl_io_is_trunc(io) &&
1030                      (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
1031                       osd->od_lockless_truncate)) {
1032                         ols->ols_locklessable = 1;
1033                         slice->cls_ops = &osc_lock_lockless_ops;
1034                 }
1035         }
1036         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1037 }
1038
1039 static int osc_lock_compatible(const struct osc_lock *qing,
1040                                const struct osc_lock *qed)
1041 {
1042         enum cl_lock_mode qing_mode;
1043         enum cl_lock_mode qed_mode;
1044
1045         qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
1046         if (qed->ols_glimpse &&
1047             (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
1048                 return 1;
1049
1050         qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
1051         return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
1052 }
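/*
 * In other words: two locks are compatible when both are read locks, or when
 * the already-enqueued lock is a glimpse lock that has either received its
 * upcall or is being compared against a read enqueue; any other combination
 * is treated as a conflict.
 */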
1053
1054 /**
1055  * Cancel all conflicting locks and wait for them to be destroyed.
1056  *
1057  * This function is used for two purposes:
1058  *
1059  *     - early cancel all conflicting locks before starting IO, and
1060  *
1061  *     - guarantee that pages added to the page cache by lockless IO are never
1062  *       covered by locks other than lockless IO lock, and, hence, are not
1063  *       visible to other threads.
1064  */
1065 static int osc_lock_enqueue_wait(const struct lu_env *env,
1066                                  const struct osc_lock *olck)
1067 {
1068         struct cl_lock          *lock    = olck->ols_cl.cls_lock;
1069         struct cl_lock_descr    *descr   = &lock->cll_descr;
1070         struct cl_object_header *hdr     = cl_object_header(descr->cld_obj);
1071         struct cl_lock          *scan;
1072         struct cl_lock          *conflict= NULL;
1073         int lockless                     = osc_lock_is_lockless(olck);
1074         int rc                           = 0;
1075         ENTRY;
1076
1077         LASSERT(cl_lock_is_mutexed(lock));
1078
1079         /* let a glimpse lock enqueue anyway, because we actually
1080          * don't need to cancel any conflicting locks. */
1081         if (olck->ols_glimpse)
1082                 return 0;
1083
1084         cfs_spin_lock(&hdr->coh_lock_guard);
1085         cfs_list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
1086                 struct cl_lock_descr *cld = &scan->cll_descr;
1087                 const struct osc_lock *scan_ols;
1088
1089                 if (scan == lock)
1090                         break;
1091
1092                 if (scan->cll_state < CLS_QUEUING ||
1093                     scan->cll_state == CLS_FREEING ||
1094                     cld->cld_start > descr->cld_end ||
1095                     cld->cld_end < descr->cld_start)
1096                         continue;
1097
1098                 /* overlapped and living locks. */
1099
1100                 /* We're not supposed to give up group lock. */
1101                 if (scan->cll_descr.cld_mode == CLM_GROUP) {
1102                         LASSERT(descr->cld_mode != CLM_GROUP ||
1103                                 descr->cld_gid != scan->cll_descr.cld_gid);
1104                         continue;
1105                 }
1106
1107                 scan_ols = osc_lock_at(scan);
1108
1109                 /* We need to cancel the compatible locks if we're enqueuing
1110                  * a lockless lock, for example:
1111                  * imagine that client has PR lock on [0, 1000], and thread T0
1112                  * is doing lockless IO in [500, 1500] region. Concurrent
1113                  * thread T1 can see lockless data in [500, 1000], which is
1114                  * wrong, because these data are possibly stale. */
1115                 if (!lockless && osc_lock_compatible(olck, scan_ols))
1116                         continue;
1117
1118                 /* Now @scan is conflicting with @lock, this means the current
1119                  * thread has to sleep until @scan is destroyed. */
1120                 if (scan_ols->ols_owner == osc_env_io(env)) {
1121                         CERROR("DEADLOCK POSSIBLE!\n");
1122                         CL_LOCK_DEBUG(D_ERROR, env, scan, "queued.\n");
1123                         CL_LOCK_DEBUG(D_ERROR, env, lock, "queuing.\n");
1124                         libcfs_debug_dumpstack(NULL);
1125                 }
1126                 cl_lock_get_trust(scan);
1127                 conflict = scan;
1128                 break;
1129         }
1130         cfs_spin_unlock(&hdr->coh_lock_guard);
1131
1132         if (conflict) {
1133                 if (lock->cll_descr.cld_mode == CLM_GROUP) {
1134                         /* we want a group lock but a previous lock request
1135                          * conflicts; we do not wait but return 0 so the
1136                          * request is sent to the server
1137                          */
1138                         CDEBUG(D_DLMTRACE, "group lock %p is conflicted "
1139                                            "with %p, no wait, send to server\n",
1140                                lock, conflict);
1141                         cl_lock_put(env, conflict);
1142                         rc = 0;
1143                 } else {
1144                         CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, "
1145                                            "will wait\n",
1146                                lock, conflict);
1147                         LASSERT(lock->cll_conflict == NULL);
1148                         lu_ref_add(&conflict->cll_reference, "cancel-wait",
1149                                    lock);
1150                         lock->cll_conflict = conflict;
1151                         rc = CLO_WAIT;
1152                 }
1153         }
1154         RETURN(rc);
1155 }
1156
1157 /**
1158  * Implementation of cl_lock_operations::clo_enqueue() method for osc
1159  * layer. This initiates ldlm enqueue:
1160  *
1161  *     - cancels conflicting locks early (osc_lock_enqueue_wait());
1162  *
1163  *     - calls osc_enqueue_base() to do actual enqueue.
1164  *
1165  * osc_enqueue_base() is supplied with an upcall function that is executed
1166  * when lock is received either after a local cached ldlm lock is matched, or
1167  * when a reply from the server is received.
1168  *
1169  * This function does not wait for the network communication to complete.
1170  */
1171 static int osc_lock_enqueue(const struct lu_env *env,
1172                             const struct cl_lock_slice *slice,
1173                             struct cl_io *unused, __u32 enqflags)
1174 {
1175         struct osc_lock          *ols     = cl2osc_lock(slice);
1176         struct cl_lock           *lock    = ols->ols_cl.cls_lock;
1177         int result;
1178         ENTRY;
1179
1180         LASSERT(cl_lock_is_mutexed(lock));
1181         LASSERTF(ols->ols_state == OLS_NEW,
1182                  "Impossible state: %d\n", ols->ols_state);
1183
1184         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
1185         if (enqflags & CEF_AGL) {
1186                 ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
1187                 ols->ols_agl = 1;
1188         }
1189         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
1190                 ols->ols_glimpse = 1;
1191         if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
1192                 /* try to convert this lock to a lockless lock */
1193                 osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
1194
1195         result = osc_lock_enqueue_wait(env, ols);
1196         if (result == 0) {
1197                 if (!osc_lock_is_lockless(ols)) {
1198                         struct osc_object        *obj = cl2osc(slice->cls_obj);
1199                         struct osc_thread_info   *info = osc_env_info(env);
1200                         struct ldlm_res_id       *resname = &info->oti_resname;
1201                         ldlm_policy_data_t       *policy = &info->oti_policy;
1202                         struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
1203
1204                         if (ols->ols_locklessable)
1205                                 ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
1206
1207                         /* a reference for lock, passed as an upcall cookie */
1208                         cl_lock_get(lock);
1209                         lu_ref_add(&lock->cll_reference, "upcall", lock);
1210                         /* a user for lock also */
1211                         cl_lock_user_add(env, lock);
1212                         ols->ols_state = OLS_ENQUEUED;
1213
1214                         /*
1215                          * XXX: this is a possible blocking point as
1216                          * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
1217                          * LDLM_CP_CALLBACK.
1218                          */
1219                         osc_lock_build_res(env, obj, resname);
1220                         osc_lock_build_policy(env, lock, policy);
1221                         result = osc_enqueue_base(osc_export(obj), resname,
1222                                           &ols->ols_flags, policy,
1223                                           &ols->ols_lvb,
1224                                           obj->oo_oinfo->loi_kms_valid,
1225                                           osc_lock_upcall,
1226                                           ols, einfo, &ols->ols_handle,
1227                                           PTLRPCD_SET, 1, ols->ols_agl);
1228                         if (result != 0) {
1229                                 cl_lock_user_del(env, lock);
1230                                 lu_ref_del(&lock->cll_reference,
1231                                            "upcall", lock);
1232                                 cl_lock_put(env, lock);
1233                                 if (unlikely(result == -ECANCELED)) {
1234                                         ols->ols_state = OLS_NEW;
1235                                         result = 0;
1236                                 }
1237                         }
1238                 } else {
1239                         ols->ols_state = OLS_GRANTED;
1240                         ols->ols_owner = osc_env_io(env);
1241                 }
1242         }
1243         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1244         RETURN(result);
1245 }
1246
1247 static int osc_lock_wait(const struct lu_env *env,
1248                          const struct cl_lock_slice *slice)
1249 {
1250         struct osc_lock *olck = cl2osc_lock(slice);
1251         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1252
1253         LINVRNT(osc_lock_invariant(olck));
1254
1255         if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
1256                 if (olck->ols_flags & LDLM_FL_LVB_READY) {
1257                         return 0;
1258                 } else if (olck->ols_agl) {
1259                         if (lock->cll_flags & CLF_FROM_UPCALL)
1260                                 /* It is from enqueue RPC reply upcall for
1261                                  * updating state. Do not re-enqueue. */
1262                                 return -ENAVAIL;
1263                         else
1264                                 olck->ols_state = OLS_NEW;
1265                 } else {
1266                         LASSERT(lock->cll_error);
1267                         return lock->cll_error;
1268                 }
1269         }
1270
1271         if (olck->ols_state == OLS_NEW) {
1272                 int rc;
1273
1274                 LASSERT(olck->ols_agl);
1275
1276                 rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
1277                 if (rc != 0)
1278                         return rc;
1279                 else
1280                         return CLO_REENQUEUED;
1281         }
1282
1283         LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
1284                      lock->cll_error == 0, olck->ols_lock != NULL));
1285
1286         return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
1287 }
1288
1289 /**
1290  * An implementation of cl_lock_operations::clo_use() method that pins cached
1291  * lock.
1292  */
1293 static int osc_lock_use(const struct lu_env *env,
1294                         const struct cl_lock_slice *slice)
1295 {
1296         struct osc_lock *olck = cl2osc_lock(slice);
1297         int rc;
1298
1299         LASSERT(!olck->ols_hold);
1300
1301         /*
1302          * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
1303          * flag is not set. This protects us from a concurrent blocking ast.
1304          */
1305         rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
1306         if (rc == 0) {
1307                 olck->ols_hold = 1;
1308                 olck->ols_state = OLS_GRANTED;
1309         } else {
1310                 struct cl_lock *lock;
1311
1312                 /*
1313                  * Lock is being cancelled somewhere within
1314                  * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
1315                  * set, but osc_ldlm_blocking_ast() hasn't yet acquired
1316                  * cl_lock mutex.
1317                  */
1318                 lock = slice->cls_lock;
1319                 LASSERT(lock->cll_state == CLS_INTRANSIT);
1320                 LASSERT(lock->cll_users > 0);
1321                 /* set a flag for osc_dlm_blocking_ast0() to signal the
1322                  * lock.*/
1323                 olck->ols_ast_wait = 1;
1324                 rc = CLO_WAIT;
1325         }
1326         return rc;
1327 }
1328
1329 static int osc_lock_flush(struct osc_lock *ols, int discard)
1330 {
1331         struct cl_lock       *lock  = ols->ols_cl.cls_lock;
1332         struct cl_env_nest    nest;
1333         struct lu_env        *env;
1334         int result = 0;
1335
1336         env = cl_env_nested_get(&nest);
1337         if (!IS_ERR(env)) {
1338                 result = cl_lock_page_out(env, lock, discard);
1339                 cl_env_nested_put(&nest, env);
1340         } else
1341                 result = PTR_ERR(env);
1342         if (result == 0) {
1343                 ols->ols_flush = 1;
1344                 LINVRNT(!osc_lock_has_pages(ols));
1345         }
1346         return result;
1347 }
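
/*
 * The cl_env_nested_get()/cl_env_nested_put() pair above is the usual way to
 * borrow a scratch lu_env from a context that may already own one; the same
 * pattern appears again in osc_lock_has_pages() below.  The general shape,
 * sketched:
 *
 * \code
 * struct cl_env_nest nest;
 * struct lu_env *env = cl_env_nested_get(&nest);
 *
 * if (IS_ERR(env))
 *         return PTR_ERR(env);
 * // ... use env ...
 * cl_env_nested_put(&nest, env);
 * \endcode
 */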
1348
1349 /**
1350  * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
1351  * called (as part of cl_lock_cancel()) when a lock is canceled either
1352  * voluntarily (LRU pressure, early cancellation, umount, etc.) or due to a
1353  * conflict with some other lock somewhere in the cluster. This function does
1354  * the following:
1355  *
1356  *     - invalidates all pages protected by this lock (after sending dirty
1357  *       ones to the server, as necessary);
1358  *
1359  *     - decref's underlying ldlm lock;
1360  *
1361  *     - cancels ldlm lock (ldlm_cli_cancel()).
1362  */
1363 static void osc_lock_cancel(const struct lu_env *env,
1364                             const struct cl_lock_slice *slice)
1365 {
1366         struct cl_lock   *lock    = slice->cls_lock;
1367         struct osc_lock  *olck    = cl2osc_lock(slice);
1368         struct ldlm_lock *dlmlock = olck->ols_lock;
1369         int               result  = 0;
1370         int               discard;
1371
1372         LASSERT(cl_lock_is_mutexed(lock));
1373         LINVRNT(osc_lock_invariant(olck));
1374
1375         if (dlmlock != NULL) {
1376                 int do_cancel;
1377
1378                 discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
1379                 result = osc_lock_flush(olck, discard);
1380                 osc_lock_unhold(olck);
1381
1382                 lock_res_and_lock(dlmlock);
1383                 /* Now that we are the only user of the dlm read/write
1384                  * reference, ->l_readers + ->l_writers should normally
1385                  * be zero. However, there is a corner case.
1386                  * See bug 18829 for details. */
1387                 do_cancel = (dlmlock->l_readers == 0 &&
1388                              dlmlock->l_writers == 0);
1389                 dlmlock->l_flags |= LDLM_FL_CBPENDING;
1390                 unlock_res_and_lock(dlmlock);
1391                 if (do_cancel)
1392                         result = ldlm_cli_cancel(&olck->ols_handle);
1393                 if (result < 0)
1394                         CL_LOCK_DEBUG(D_ERROR, env, lock,
1395                                       "lock %p cancel failure with error(%d)\n",
1396                                       lock, result);
1397         }
1398         olck->ols_state = OLS_CANCELLED;
1399         olck->ols_flags &= ~LDLM_FL_LVB_READY;
1400         osc_lock_detach(env, olck);
1401 }
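
/*
 * The ordering above matters: pages covered by the lock are written out (or
 * simply dropped when the dlm lock carries LDLM_FL_DISCARD_DATA) before the
 * ldlm lock itself is cancelled, because once the lock is gone the client no
 * longer has the right to cache that data.  A condensed sketch of the path,
 * with the error handling and the bug 18829 corner case omitted:
 *
 * \code
 * discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
 * osc_lock_flush(olck, discard);      // flush or discard covered pages
 * osc_lock_unhold(olck);              // drop our dlm read/write reference
 * ldlm_cli_cancel(&olck->ols_handle); // finally cancel the dlm lock
 * \endcode
 */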
1402
1403 #ifdef INVARIANT_CHECK
1404 static int check_cb(const struct lu_env *env, struct cl_io *io,
1405                     struct cl_page *page, void *cbdata)
1406 {
1407         struct cl_lock *lock = cbdata;
1408
1409         if (lock->cll_descr.cld_mode == CLM_READ) {
1410                 struct cl_lock *tmp;
1411                 tmp = cl_lock_at_page(env, lock->cll_descr.cld_obj,
1412                                      page, lock, 1, 0);
1413                 if (tmp != NULL) {
1414                         cl_lock_put(env, tmp);
1415                         return CLP_GANG_OKAY;
1416                 }
1417         }
1418
1419         CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
1420         CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
1421         return CLP_GANG_ABORT;
1422 }
1423
1424 /**
1425  * Returns true iff there are pages under \a olck not protected by other
1426  * locks.
1427  */
1428 static int osc_lock_has_pages(struct osc_lock *olck)
1429 {
1430         struct cl_lock       *lock;
1431         struct cl_lock_descr *descr;
1432         struct cl_object     *obj;
1433         struct osc_object    *oob;
1434         struct cl_env_nest    nest;
1435         struct cl_io         *io;
1436         struct lu_env        *env;
1437         int                   result;
1438
1439         env = cl_env_nested_get(&nest);
1440         if (IS_ERR(env))
1441                 return 0;
1442
1443         obj   = olck->ols_cl.cls_obj;
1444         oob   = cl2osc(obj);
1445         io    = &oob->oo_debug_io;
1446         lock  = olck->ols_cl.cls_lock;
1447         descr = &lock->cll_descr;
1448
1449         cfs_mutex_lock(&oob->oo_debug_mutex);
1450
1451         io->ci_obj = cl_object_top(obj);
1452         cl_io_init(env, io, CIT_MISC, io->ci_obj);
1453         do {
1454                 result = cl_page_gang_lookup(env, obj, io,
1455                                              descr->cld_start, descr->cld_end,
1456                                              check_cb, (void *)lock);
1457                 if (result == CLP_GANG_ABORT)
1458                         break;
1459                 if (result == CLP_GANG_RESCHED)
1460                         cfs_cond_resched();
1461         } while (result != CLP_GANG_OKAY);
1462         cl_io_fini(env, io);
1463         cfs_mutex_unlock(&oob->oo_debug_mutex);
1464         cl_env_nested_put(&nest, env);
1465
1466         return (result == CLP_GANG_ABORT);
1467 }
1468 #else
1469 static int osc_lock_has_pages(struct osc_lock *olck)
1470 {
1471         return 0;
1472 }
1473 #endif /* INVARIANT_CHECK */
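
/*
 * The debug-only scan above follows the cl_page_gang_lookup() callback
 * contract as used here: the callback returns CLP_GANG_OKAY to keep scanning
 * and CLP_GANG_ABORT to stop, while the lookup itself may return
 * CLP_GANG_RESCHED when the caller should yield the CPU and retry.  A
 * hypothetical callback skeleton ("my_cb" and "page_is_covered" are
 * illustrative names only):
 *
 * \code
 * static int my_cb(const struct lu_env *env, struct cl_io *io,
 *                  struct cl_page *page, void *cbdata)
 * {
 *         return page_is_covered(page, cbdata) ?
 *                CLP_GANG_OKAY : CLP_GANG_ABORT;
 * }
 * \endcode
 */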
1474
1475 static void osc_lock_delete(const struct lu_env *env,
1476                             const struct cl_lock_slice *slice)
1477 {
1478         struct osc_lock *olck;
1479
1480         olck = cl2osc_lock(slice);
1481         if (olck->ols_glimpse) {
1482                 LASSERT(!olck->ols_hold);
1483                 LASSERT(!olck->ols_lock);
1484                 return;
1485         }
1486
1487         LINVRNT(osc_lock_invariant(olck));
1488         LINVRNT(!osc_lock_has_pages(olck));
1489
1490         osc_lock_unhold(olck);
1491         osc_lock_detach(env, olck);
1492 }
1493
1494 /**
1495  * Implements cl_lock_operations::clo_state() method for osc layer.
1496  *
1497  * Maintains osc_lock::ols_owner field.
1498  *
1499  * This assumes that a lock always enters CLS_HELD (from some other state) in
1500  * the same IO context as the one that requested the lock. This should not be
1501  * a problem, because the context is by definition shared by all activity
1502  * pertaining to the same high-level IO.
1503  */
1504 static void osc_lock_state(const struct lu_env *env,
1505                            const struct cl_lock_slice *slice,
1506                            enum cl_lock_state state)
1507 {
1508         struct osc_lock *lock = cl2osc_lock(slice);
1509
1510         /*
1511          * XXX multiple io contexts can use the lock at the same time.
1512          */
1513         LINVRNT(osc_lock_invariant(lock));
1514         if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
1515                 struct osc_io *oio = osc_env_io(env);
1516
1517                 LASSERT(lock->ols_owner == NULL);
1518                 lock->ols_owner = oio;
1519         } else if (state != CLS_HELD)
1520                 lock->ols_owner = NULL;
1521 }
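
/*
 * Stated compactly, the ownership rule enforced above (a summary of the
 * code, not additional behaviour):
 *
 * \code
 * any state -> CLS_HELD : ols_owner = osc_env_io(env)
 * CLS_HELD  -> any state: ols_owner = NULL
 * \endcode
 */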
1522
1523 static int osc_lock_print(const struct lu_env *env, void *cookie,
1524                           lu_printer_t p, const struct cl_lock_slice *slice)
1525 {
1526         struct osc_lock *lock = cl2osc_lock(slice);
1527
1528         /*
1529          * XXX print ldlm lock and einfo properly.
1530          */
1531         (*p)(env, cookie, "%p %08x "LPX64" %d %p ",
1532              lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
1533              lock->ols_state, lock->ols_owner);
1534         osc_lvb_print(env, cookie, p, &lock->ols_lvb);
1535         return 0;
1536 }
1537
1538 static int osc_lock_fits_into(const struct lu_env *env,
1539                               const struct cl_lock_slice *slice,
1540                               const struct cl_lock_descr *need,
1541                               const struct cl_io *io)
1542 {
1543         struct osc_lock *ols = cl2osc_lock(slice);
1544
1545         if (need->cld_enq_flags & CEF_NEVER)
1546                 return 0;
1547
1548         if (need->cld_mode == CLM_PHANTOM) {
1549                 if (ols->ols_agl)
1550                         return !(ols->ols_state > OLS_RELEASED);
1551
1552                 /*
1553                  * Note: a QUEUED lock cannot be matched here, otherwise
1554                  * it might cause deadlocks. For example, in the read
1555                  * process:
1556                  * P1: enqueues a read lock, creating sublock1.
1557                  * P2: enqueues a write lock, creating sublock2 (which
1558                  *     conflicts with sublock1).
1559                  * P1: the read lock is granted.
1560                  * P1: enqueues a glimpse lock (while holding sublock1 for
1561                  *     read), which matches sublock2 and waits for it to be
1562                  *     granted. But sublock2 can never be granted, because
1563                  *     P1 will not release sublock1. Bang!
1564                  */
1565                 if (ols->ols_state < OLS_GRANTED ||
1566                     ols->ols_state > OLS_RELEASED)
1567                         return 0;
1568         } else if (need->cld_enq_flags & CEF_MUST) {
1569                 /*
1570                  * If the lock has never been enqueued, it cannot be
1571                  * matched, because the enqueue process brings in the
1572                  * information used to determine things such as lockless,
1573                  * CEF_MUST, etc.
1574                  */
1575                 if (ols->ols_state < OLS_UPCALL_RECEIVED &&
1576                     ols->ols_locklessable)
1577                         return 0;
1578         }
1579         return 1;
1580 }
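
/*
 * Worked examples of the matching policy implemented above (request
 * attributes on the left, "does the existing osc lock fit?" on the right):
 *
 * \code
 * CEF_NEVER request                                       -> no (lockless only)
 * CLM_PHANTOM request, AGL lock, state <= OLS_RELEASED    -> yes
 * CLM_PHANTOM request, AGL lock, state >  OLS_RELEASED    -> no
 * CLM_PHANTOM request, state < OLS_GRANTED (still queued;
 *         see the deadlock note above)                    -> no
 * CLM_PHANTOM request, OLS_GRANTED..OLS_RELEASED          -> yes
 * CLM_PHANTOM request, state > OLS_RELEASED               -> no
 * CEF_MUST request, locklessable lock not yet at
 *         OLS_UPCALL_RECEIVED                             -> no
 * anything else                                           -> yes
 * \endcode
 */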
1581
1582 static const struct cl_lock_operations osc_lock_ops = {
1583         .clo_fini    = osc_lock_fini,
1584         .clo_enqueue = osc_lock_enqueue,
1585         .clo_wait    = osc_lock_wait,
1586         .clo_unuse   = osc_lock_unuse,
1587         .clo_use     = osc_lock_use,
1588         .clo_delete  = osc_lock_delete,
1589         .clo_state   = osc_lock_state,
1590         .clo_cancel  = osc_lock_cancel,
1591         .clo_weigh   = osc_lock_weigh,
1592         .clo_print   = osc_lock_print,
1593         .clo_fits_into = osc_lock_fits_into,
1594 };
1595
1596 static int osc_lock_lockless_unuse(const struct lu_env *env,
1597                                    const struct cl_lock_slice *slice)
1598 {
1599         struct osc_lock *ols = cl2osc_lock(slice);
1600         struct cl_lock *lock = slice->cls_lock;
1601
1602         LASSERT(ols->ols_state == OLS_GRANTED);
1603         LINVRNT(osc_lock_invariant(ols));
1604
1605         cl_lock_cancel(env, lock);
1606         cl_lock_delete(env, lock);
1607         return 0;
1608 }
1609
1610 static void osc_lock_lockless_cancel(const struct lu_env *env,
1611                                      const struct cl_lock_slice *slice)
1612 {
1613         struct osc_lock   *ols  = cl2osc_lock(slice);
1614         int result;
1615
1616         result = osc_lock_flush(ols, 0);
1617         if (result)
1618                 CERROR("Pages for lockless lock %p were not purged(%d)\n",
1619                        ols, result);
1620         ols->ols_state = OLS_CANCELLED;
1621 }
1622
1623 static int osc_lock_lockless_wait(const struct lu_env *env,
1624                                   const struct cl_lock_slice *slice)
1625 {
1626         struct osc_lock *olck = cl2osc_lock(slice);
1627         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1628
1629         LINVRNT(osc_lock_invariant(olck));
1630         LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);
1631
1632         return lock->cll_error;
1633 }
1634
1635 static void osc_lock_lockless_state(const struct lu_env *env,
1636                                     const struct cl_lock_slice *slice,
1637                                     enum cl_lock_state state)
1638 {
1639         struct osc_lock *lock = cl2osc_lock(slice);
1640
1641         LINVRNT(osc_lock_invariant(lock));
1642         if (state == CLS_HELD) {
1643                 struct osc_io *oio  = osc_env_io(env);
1644
1645                 LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
1646                 lock->ols_owner = oio;
1647
1648                 /* make the io lockless if this lock is for the io's
1649                  * host object */
1650                 if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
1651                         oio->oi_lockless = 1;
1652         }
1653 }
1654
1655 static int osc_lock_lockless_fits_into(const struct lu_env *env,
1656                                        const struct cl_lock_slice *slice,
1657                                        const struct cl_lock_descr *need,
1658                                        const struct cl_io *io)
1659 {
1660         struct osc_lock *lock = cl2osc_lock(slice);
1661
1662         if (!(need->cld_enq_flags & CEF_NEVER))
1663                 return 0;
1664
1665         /* lockless lock should only be used by its owning io. b22147 */
1666         return (lock->ols_owner == osc_env_io(env));
1667 }
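
/*
 * Taken together with osc_lock_lockless_state() above, this makes a lockless
 * lock strictly private to the IO that created it: only a CEF_NEVER request
 * issued from the same osc_io (the recorded ols_owner) will ever match it.
 * The check, condensed:
 *
 * \code
 * fits = (need->cld_enq_flags & CEF_NEVER) &&
 *        cl2osc_lock(slice)->ols_owner == osc_env_io(env);
 * \endcode
 */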
1668
1669 static const struct cl_lock_operations osc_lock_lockless_ops = {
1670         .clo_fini      = osc_lock_fini,
1671         .clo_enqueue   = osc_lock_enqueue,
1672         .clo_wait      = osc_lock_lockless_wait,
1673         .clo_unuse     = osc_lock_lockless_unuse,
1674         .clo_state     = osc_lock_lockless_state,
1675         .clo_fits_into = osc_lock_lockless_fits_into,
1676         .clo_cancel    = osc_lock_lockless_cancel,
1677         .clo_print     = osc_lock_print
1678 };
1679
1680 int osc_lock_init(const struct lu_env *env,
1681                   struct cl_object *obj, struct cl_lock *lock,
1682                   const struct cl_io *unused)
1683 {
1684         struct osc_lock *clk;
1685         int result;
1686
1687         OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
1688         if (clk != NULL) {
1689                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
1690                 cfs_atomic_set(&clk->ols_pageref, 0);
1691                 clk->ols_state = OLS_NEW;
1692                 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
1693                 result = 0;
1694         } else
1695                 result = -ENOMEM;
1696         return result;
1697 }
1698
1699 int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
1700 {
1701         struct osc_lock *olock;
1702         int              rc = 0;
1703
1704         cfs_spin_lock(&osc_ast_guard);
1705         olock = dlm->l_ast_data;
1706         /*
1707          * There is a very rare race with osc_page_addref_lock(), but it
1708          * does not matter: in the worst case we fail to cancel a lock that
1709          * we actually could cancel, which is harmless.
1710          */
1711         if (olock != NULL &&
1712             cfs_atomic_add_return(_PAGEREF_MAGIC,
1713                                   &olock->ols_pageref) != _PAGEREF_MAGIC) {
1714                 cfs_atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
1715                 rc = 1;
1716         }
1717         cfs_spin_unlock(&osc_ast_guard);
1718         return rc;
1719 }
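
/*
 * A worked example of the _PAGEREF_MAGIC (-10000000) trick above, assuming
 * ols_pageref counts the pages that currently pin the lock via
 * osc_page_addref_lock():
 *
 * \code
 * ols_pageref == 0:  add_return() yields -10000000 == _PAGEREF_MAGIC,
 *                    so rc = 0 (safe to cancel); the counter is left at
 *                    the magic value, presumably so that later page
 *                    addref attempts can notice the dying lock.
 * ols_pageref == 3:  add_return() yields -9999997 != _PAGEREF_MAGIC,
 *                    so the magic is subtracted back and rc = 1
 *                    (pages still reference the lock; do not cancel).
 * \endcode
 */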
1720
1721 /** @} osc */