lustre/osc/osc_lock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_lock for OSC layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OSC
43
44 #include <libcfs/libcfs.h>
45 /* fid_build_reg_res_name() */
46 #include <lustre_fid.h>
47
48 #include "osc_cl_internal.h"
49
50 /** \addtogroup osc
51  *  @{
52  */
53
54 /*****************************************************************************
55  *
56  * Type conversions.
57  *
58  */
59
60 static const struct cl_lock_operations osc_lock_ops;
61 static const struct cl_lock_operations osc_lock_lockless_ops;
62 static void osc_lock_to_lockless(const struct lu_env *env,
63                                  struct osc_lock *ols, int force);
64 static bool osc_lock_has_pages(struct osc_lock *olck);
65
66 int osc_lock_is_lockless(const struct osc_lock *olck)
67 {
68         return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
69 }
70
71 /**
72  * Returns a weak pointer to the ldlm lock identified by a handle. Returned
73  * pointer cannot be dereferenced, as lock is not protected from concurrent
74  * reclaim. This function is a helper for osc_lock_invariant().
75  */
76 static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
77 {
78         struct ldlm_lock *lock;
79
80         lock = ldlm_handle2lock(handle);
81         if (lock != NULL)
82                 LDLM_LOCK_PUT(lock);
83         return lock;
84 }
85
86 /**
87  * Invariant that has to be true all of the time.
88  */
89 static int osc_lock_invariant(struct osc_lock *ols)
90 {
91         struct ldlm_lock *lock        = osc_handle_ptr(&ols->ols_handle);
92         struct ldlm_lock *olock       = ols->ols_lock;
93         int               handle_used = lustre_handle_is_used(&ols->ols_handle);
94
95         if (ergo(osc_lock_is_lockless(ols),
96                  ols->ols_locklessable && ols->ols_lock == NULL))
97                 return 1;
98
99         /*
100          * If all the following "ergo"s are true, return 1, otherwise 0
101          */
102         if (! ergo(olock != NULL, handle_used))
103                 return 0;
104
105         if (! ergo(olock != NULL,
106                    olock->l_handle.h_cookie == ols->ols_handle.cookie))
107                 return 0;
108
109         if (! ergo(handle_used,
110                    ergo(lock != NULL && olock != NULL, lock == olock) &&
111                    ergo(lock == NULL, olock == NULL)))
112                 return 0;
113         /*
114          * Check that ->ols_handle and ->ols_lock are consistent, but
115          * take into account that they are set at different times.
116          */
117         if (! ergo(ols->ols_state == OLS_CANCELLED,
118                    olock == NULL && !handle_used))
119                 return 0;
120         /*
121          * DLM lock is destroyed only after we have seen cancellation
122          * ast.
123          */
124         if (! ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
125                    !ldlm_is_destroyed(olock)))
126                 return 0;
127
128         if (! ergo(ols->ols_state == OLS_GRANTED,
129                    olock != NULL &&
130                    olock->l_req_mode == olock->l_granted_mode &&
131                    ols->ols_hold))
132                 return 0;
133         return 1;
134 }
135
136 /*****************************************************************************
137  *
138  * Lock operations.
139  *
140  */
141
142 /**
143  * Breaks a link between osc_lock and dlm_lock.
144  */
145 static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
146 {
147         struct ldlm_lock *dlmlock;
148
149         spin_lock(&osc_ast_guard);
150         dlmlock = olck->ols_lock;
151         if (dlmlock == NULL) {
152                 spin_unlock(&osc_ast_guard);
153                 return;
154         }
155
156         olck->ols_lock = NULL;
157         /* wb(); --- for all who check (ols->ols_lock != NULL) before
158          * the call to osc_lock_detach() */
159         dlmlock->l_ast_data = NULL;
160         olck->ols_handle.cookie = 0ULL;
161         spin_unlock(&osc_ast_guard);
162
163         lock_res_and_lock(dlmlock);
164         if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
165                 struct cl_object *obj = olck->ols_cl.cls_obj;
166                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
167                 __u64 old_kms;
168
169                 cl_object_attr_lock(obj);
170                 /* Must get the value under the lock to avoid possible races. */
171                 old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
172                 /* Update the kms. Need to loop over all granted locks.
173                  * Not a problem for the client. */
174                 attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
175
176                 cl_object_attr_set(env, obj, attr, CAT_KMS);
177                 cl_object_attr_unlock(obj);
178         }
179         unlock_res_and_lock(dlmlock);
180
181         /* release a reference taken in osc_lock_upcall0(). */
182         LASSERT(olck->ols_has_ref);
183         lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
184         LDLM_LOCK_RELEASE(dlmlock);
185         olck->ols_has_ref = 0;
186 }
187
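/**
 * Drop the hold (DLM lock reference) that this osc_lock may have on its
 * ldlm lock, via osc_cancel_base().
 */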
188 static int osc_lock_unhold(struct osc_lock *ols)
189 {
190         int result = 0;
191
192         if (ols->ols_hold) {
193                 ols->ols_hold = 0;
194                 result = osc_cancel_base(&ols->ols_handle,
195                                          ols->ols_einfo.ei_mode);
196         }
197         return result;
198 }
199
200 static int osc_lock_unuse(const struct lu_env *env,
201                           const struct cl_lock_slice *slice)
202 {
203         struct osc_lock *ols = cl2osc_lock(slice);
204
205         LINVRNT(osc_lock_invariant(ols));
206
207         switch (ols->ols_state) {
208         case OLS_NEW:
209                 LASSERT(!ols->ols_hold);
210                 LASSERT(ols->ols_agl);
211                 return 0;
212         case OLS_UPCALL_RECEIVED:
213                 osc_lock_unhold(ols);
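                /* fall through: detach and reset the lock as for OLS_ENQUEUED */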
214         case OLS_ENQUEUED:
215                 LASSERT(!ols->ols_hold);
216                 osc_lock_detach(env, ols);
217                 ols->ols_state = OLS_NEW;
218                 return 0;
219         case OLS_GRANTED:
220                 LASSERT(!ols->ols_glimpse);
221                 LASSERT(ols->ols_hold);
222                 /*
223                  * Move lock into OLS_RELEASED state before calling
224                  * osc_cancel_base() so that possible synchronous cancellation
225                  * (that always happens e.g., for liblustre) sees that lock is
226                  * released.
227                  */
228                 ols->ols_state = OLS_RELEASED;
229                 return osc_lock_unhold(ols);
230         default:
231                 CERROR("Impossible state: %d\n", ols->ols_state);
232                 LBUG();
233         }
234 }
235
236 static void osc_lock_fini(const struct lu_env *env,
237                           struct cl_lock_slice *slice)
238 {
239         struct osc_lock  *ols = cl2osc_lock(slice);
240
241         LINVRNT(osc_lock_invariant(ols));
242         /*
243          * ->ols_hold can still be true at this point if, for example, a
244          * thread that requested a lock was killed (and released a reference
245          * to the lock), before reply from a server was received. In this case
246          * lock is destroyed immediately after upcall.
247          */
248         osc_lock_unhold(ols);
249         LASSERT(ols->ols_lock == NULL);
250         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
251 }
252
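/**
 * Translate a cl_lock descriptor (page-indexed extent and group id) into an
 * ldlm extent policy.
 */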
253 static void osc_lock_build_policy(const struct lu_env *env,
254                                   const struct cl_lock *lock,
255                                   ldlm_policy_data_t *policy)
256 {
257         const struct cl_lock_descr *d = &lock->cll_descr;
258
259         osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
260         policy->l_extent.gid = d->cld_gid;
261 }
262
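/**
 * Map cl_lock enqueue flags (CEF_*) onto the corresponding LDLM_FL_* flags
 * passed to the DLM enqueue.
 */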
263 static __u64 osc_enq2ldlm_flags(__u32 enqflags)
264 {
265         __u64 result = 0;
266
267         LASSERT((enqflags & ~CEF_MASK) == 0);
268
269         if (enqflags & CEF_NONBLOCK)
270                 result |= LDLM_FL_BLOCK_NOWAIT;
271         if (enqflags & CEF_ASYNC)
272                 result |= LDLM_FL_HAS_INTENT;
273         if (enqflags & CEF_DISCARD_DATA)
274                 result |= LDLM_FL_AST_DISCARD_DATA;
275         return result;
276 }
277
278 /**
279  * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
280  * pointers. Initialized in osc_init().
281  */
282 spinlock_t osc_ast_guard;
283
284 static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
285 {
286         struct osc_lock *olck;
287
288         lock_res_and_lock(dlm_lock);
289         spin_lock(&osc_ast_guard);
290         olck = dlm_lock->l_ast_data;
291         if (olck != NULL) {
292                 struct cl_lock *lock = olck->ols_cl.cls_lock;
293                 /*
294                  * If osc_lock holds a reference on ldlm lock, return it even
295                  * when cl_lock is in CLS_FREEING state. This way
296                  *
297                  *         osc_ast_data_get(dlmlock) == NULL
298                  *
299                  * guarantees that all osc references on dlmlock were
300                  * released. osc_dlm_blocking_ast0() relies on that.
301                  */
302                 if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
303                         cl_lock_get_trust(lock);
304                         lu_ref_add_atomic(&lock->cll_reference,
305                                           "ast", current);
306                 } else
307                         olck = NULL;
308         }
309         spin_unlock(&osc_ast_guard);
310         unlock_res_and_lock(dlm_lock);
311         return olck;
312 }
313
314 static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
315 {
316         struct cl_lock *lock;
317
318         lock = olck->ols_cl.cls_lock;
319         lu_ref_del(&lock->cll_reference, "ast", current);
320         cl_lock_put(env, lock);
321 }
322
323 /**
324  * Updates object attributes from a lock value block (lvb) received together
325  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
326  * logic.
327  *
328  * This can be optimized to not update attributes when lock is a result of a
329  * local match.
330  *
331  * Called under lock and resource spin-locks.
332  */
333 static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
334                                 int rc)
335 {
336         struct ost_lvb    *lvb;
337         struct cl_object  *obj;
338         struct lov_oinfo  *oinfo;
339         struct cl_attr    *attr;
340         unsigned           valid;
341
342         ENTRY;
343
344         if (!(olck->ols_flags & LDLM_FL_LVB_READY))
345                 RETURN_EXIT;
346
347         lvb   = &olck->ols_lvb;
348         obj   = olck->ols_cl.cls_obj;
349         oinfo = cl2osc(obj)->oo_oinfo;
350         attr  = &osc_env_info(env)->oti_attr;
351         valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
352         cl_lvb2attr(attr, lvb);
353
354         cl_object_attr_lock(obj);
355         if (rc == 0) {
356                 struct ldlm_lock  *dlmlock;
357                 __u64 size;
358
359                 dlmlock = olck->ols_lock;
360                 LASSERT(dlmlock != NULL);
361
362                 /* re-grab LVB from a dlm lock under DLM spin-locks. */
363                 *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
364                 size = lvb->lvb_size;
365                 /* Extend KMS up to the end of this lock and no further.
366                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
367                 if (size > dlmlock->l_policy_data.l_extent.end)
368                         size = dlmlock->l_policy_data.l_extent.end + 1;
369                 if (size >= oinfo->loi_kms) {
370                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64
371                                    ", kms="LPU64, lvb->lvb_size, size);
372                         valid |= CAT_KMS;
373                         attr->cat_kms = size;
374                 } else {
375                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
376                                    LPU64"; leaving kms="LPU64", end="LPU64,
377                                    lvb->lvb_size, oinfo->loi_kms,
378                                    dlmlock->l_policy_data.l_extent.end);
379                 }
380                 ldlm_lock_allow_match_locked(dlmlock);
381         } else if (rc == -ENAVAIL && olck->ols_glimpse) {
382                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
383                        " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
384         } else
385                 valid = 0;
386
387         if (valid != 0)
388                 cl_object_attr_set(env, obj, attr, valid);
389
390         cl_object_attr_unlock(obj);
391
392         EXIT;
393 }
394
395 /**
396  * Called when a lock is granted, from an upcall (when server returned a
397  * granted lock), or from completion AST, when server returned a blocked lock.
398  *
399  * Called under lock and resource spin-locks, that are released temporarily
400  * here.
401  */
402 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
403                              struct ldlm_lock *dlmlock, int rc)
404 {
405         struct ldlm_extent   *ext;
406         struct cl_lock       *lock;
407         struct cl_lock_descr *descr;
408
409         LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
410
411         ENTRY;
412         if (olck->ols_state < OLS_GRANTED) {
413                 lock  = olck->ols_cl.cls_lock;
414                 ext   = &dlmlock->l_policy_data.l_extent;
415                 descr = &osc_env_info(env)->oti_descr;
416                 descr->cld_obj = lock->cll_descr.cld_obj;
417
418                 /* XXX check that ->l_granted_mode is valid. */
419                 descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
420                 descr->cld_start = cl_index(descr->cld_obj, ext->start);
421                 descr->cld_end   = cl_index(descr->cld_obj, ext->end);
422                 descr->cld_gid   = ext->gid;
423                 /*
424                  * tell upper layers the extent of the lock that was actually
425                  * granted
426                  */
427                 olck->ols_state = OLS_GRANTED;
428                 osc_lock_lvb_update(env, olck, rc);
429
430                 /* release DLM spin-locks to allow cl_lock_{modify,signal}()
431                  * to take a semaphore on a parent lock. This is safe, because
432                  * spin-locks are needed to protect consistency of
433                  * dlmlock->l_*_mode and LVB, and we have finished processing
434                  * them. */
435                 unlock_res_and_lock(dlmlock);
436                 cl_lock_modify(env, lock, descr);
437                 cl_lock_signal(env, lock);
438                 LINVRNT(osc_lock_invariant(olck));
439                 lock_res_and_lock(dlmlock);
440         }
441         EXIT;
442 }
443
444 static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
445
446 {
447         struct ldlm_lock *dlmlock;
448
449         ENTRY;
450
451         dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
452         LASSERT(dlmlock != NULL);
453
454         lock_res_and_lock(dlmlock);
455         spin_lock(&osc_ast_guard);
456         LASSERT(dlmlock->l_ast_data == olck);
457         LASSERT(olck->ols_lock == NULL);
458         olck->ols_lock = dlmlock;
459         spin_unlock(&osc_ast_guard);
460
461         /*
462          * Lock might not be granted yet. In this case, the completion ast
463          * (osc_ldlm_completion_ast()) comes later and finishes lock
464          * granting.
465          */
466         if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
467                 osc_lock_granted(env, olck, dlmlock, 0);
468         unlock_res_and_lock(dlmlock);
469
470         /*
471          * osc_enqueue_interpret() decrefs asynchronous locks, counter
472          * this.
473          */
474         ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
475         olck->ols_hold = 1;
476
477         /* lock reference taken by ldlm_handle2lock_long() is owned by
478          * osc_lock and released in osc_lock_detach() */
479         lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
480         olck->ols_has_ref = 1;
481 }
482
483 /**
484  * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
485  * received from a server, or after osc_enqueue_base() matched a local DLM
486  * lock.
487  */
488 static int osc_lock_upcall(void *cookie, int errcode)
489 {
490         struct osc_lock         *olck  = cookie;
491         struct cl_lock_slice    *slice = &olck->ols_cl;
492         struct cl_lock          *lock  = slice->cls_lock;
493         struct lu_env           *env;
494         struct cl_env_nest       nest;
495
496         ENTRY;
497         env = cl_env_nested_get(&nest);
498         if (!IS_ERR(env)) {
499                 int rc;
500
501                 cl_lock_mutex_get(env, lock);
502
503                 LASSERT(lock->cll_state >= CLS_QUEUING);
504                 if (olck->ols_state == OLS_ENQUEUED) {
505                         olck->ols_state = OLS_UPCALL_RECEIVED;
506                         rc = ldlm_error2errno(errcode);
507                 } else if (olck->ols_state == OLS_CANCELLED) {
508                         rc = -EIO;
509                 } else {
510                         CERROR("Impossible state: %d\n", olck->ols_state);
511                         LBUG();
512                 }
513                 if (rc) {
514                         struct ldlm_lock *dlmlock;
515
516                         dlmlock = ldlm_handle2lock(&olck->ols_handle);
517                         if (dlmlock != NULL) {
518                                 lock_res_and_lock(dlmlock);
519                                 spin_lock(&osc_ast_guard);
520                                 LASSERT(olck->ols_lock == NULL);
521                                 dlmlock->l_ast_data = NULL;
522                                 olck->ols_handle.cookie = 0ULL;
523                                 spin_unlock(&osc_ast_guard);
524                                 ldlm_lock_fail_match_locked(dlmlock);
525                                 unlock_res_and_lock(dlmlock);
526                                 LDLM_LOCK_PUT(dlmlock);
527                         }
528                 } else {
529                         if (olck->ols_glimpse)
530                                 olck->ols_glimpse = 0;
531                         osc_lock_upcall0(env, olck);
532                 }
533
534                 /* Error handling, some errors are tolerable. */
535                 if (olck->ols_locklessable && rc == -EUSERS) {
536                         /* This is a tolerable error, turn this lock into
537                          * lockless lock.
538                          */
539                         osc_object_set_contended(cl2osc(slice->cls_obj));
540                         LASSERT(slice->cls_ops == &osc_lock_ops);
541
542                         /* Change this lock to ldlmlock-less lock. */
543                         osc_lock_to_lockless(env, olck, 1);
544                         olck->ols_state = OLS_GRANTED;
545                         rc = 0;
546                 } else if (olck->ols_glimpse && rc == -ENAVAIL) {
547                         osc_lock_lvb_update(env, olck, rc);
548                         cl_lock_delete(env, lock);
549                         /* Hide the error. */
550                         rc = 0;
551                 }
552
553                 if (rc == 0) {
554                         /* For the AGL case, the RPC sponsor may exit the cl_lock
555                          * processing without calling wait() before the related OSC
556                          * lock upcall(). So update the lock status according
557                          * to the enqueue result inside the AGL upcall(). */
558                         if (olck->ols_agl) {
559                                 lock->cll_flags |= CLF_FROM_UPCALL;
560                                 cl_wait_try(env, lock);
561                                 lock->cll_flags &= ~CLF_FROM_UPCALL;
562                         }
563                         cl_lock_signal(env, lock);
564                         /* del user for lock upcall cookie */
565                         if (olck->ols_agl) {
566                                 if (!olck->ols_glimpse)
567                                         olck->ols_agl = 0;
568                                 cl_unuse_try(env, lock);
569                         }
570                 } else {
571                         /* del user for lock upcall cookie */
572                         if (olck->ols_agl)
573                                 cl_lock_user_del(env, lock);
574                         cl_lock_error(env, lock, rc);
575                 }
576
577                 /* release cookie reference, acquired by osc_lock_enqueue() */
578                 cl_lock_hold_release(env, lock, "upcall", lock);
579                 cl_lock_mutex_put(env, lock);
580
581                 lu_ref_del(&lock->cll_reference, "upcall", lock);
582                 /* This may be the last reference, so it must be called after
583                  * cl_lock_mutex_put(). */
584                 cl_lock_put(env, lock);
585
586                 cl_env_nested_put(&nest, env);
587         } else {
588                 /* should never happen, similar to osc_ldlm_blocking_ast(). */
589                 LBUG();
590         }
591         RETURN(errcode);
592 }
593
594 /**
595  * Core of osc_dlm_blocking_ast() logic.
596  */
597 static void osc_lock_blocking(const struct lu_env *env,
598                               struct ldlm_lock *dlmlock,
599                               struct osc_lock *olck, int blocking)
600 {
601         struct cl_lock *lock = olck->ols_cl.cls_lock;
602
603         LASSERT(olck->ols_lock == dlmlock);
604         CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
605         LASSERT(!osc_lock_is_lockless(olck));
606
607         /*
608          * Lock might still be addref-ed here if, e.g., a blocking ast
609          * is sent for a failed lock.
610          */
611         osc_lock_unhold(olck);
612
613         if (blocking && olck->ols_state < OLS_BLOCKED)
614                 /*
615                  * Move osc_lock into OLS_BLOCKED before canceling the lock,
616                  * because it recursively re-enters osc_lock_blocking(), with
617                  * the state set to OLS_CANCELLED.
618                  */
619                 olck->ols_state = OLS_BLOCKED;
620         /*
621          * cancel and destroy lock at least once no matter how blocking ast is
622          * entered (see comment above osc_ldlm_blocking_ast() for use
623          * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
624          */
625         cl_lock_cancel(env, lock);
626         cl_lock_delete(env, lock);
627 }
628
629 /**
630  * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
631  * and ldlm_lock caches.
632  */
633 static int osc_dlm_blocking_ast0(const struct lu_env *env,
634                                  struct ldlm_lock *dlmlock,
635                                  void *data, int flag)
636 {
637         struct osc_lock *olck;
638         struct cl_lock  *lock;
639         int result;
640         int cancel;
641
642         LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);
643
644         cancel = 0;
645         olck = osc_ast_data_get(dlmlock);
646         if (olck != NULL) {
647                 lock = olck->ols_cl.cls_lock;
648                 cl_lock_mutex_get(env, lock);
649                 LINVRNT(osc_lock_invariant(olck));
650                 if (olck->ols_ast_wait) {
651                         /* wake up osc_lock_use() */
652                         cl_lock_signal(env, lock);
653                         olck->ols_ast_wait = 0;
654                 }
655                 /*
656                  * Lock might have been canceled while this thread was
657                  * sleeping for lock mutex, but olck is pinned in memory.
658                  */
659                 if (olck == dlmlock->l_ast_data) {
660                         /*
661                          * NOTE: DLM sends blocking AST's for failed locks
662                          *       (that are still in pre-OLS_GRANTED state)
663                          *       too, and they have to be canceled otherwise
664          *       the DLM lock is never destroyed and stays stuck
665          *       in memory.
666                          *
667                          *       Alternatively, ldlm_cli_cancel() can be
668                          *       called here directly for osc_locks with
669                          *       ols_state < OLS_GRANTED to maintain an
670                          *       invariant that ->clo_cancel() is only called
671                          *       for locks that were granted.
672                          */
673                         LASSERT(data == olck);
674                         osc_lock_blocking(env, dlmlock,
675                                           olck, flag == LDLM_CB_BLOCKING);
676                 } else
677                         cancel = 1;
678                 cl_lock_mutex_put(env, lock);
679                 osc_ast_data_put(env, olck);
680         } else
681                 /*
682                  * DLM lock exists, but there is no cl_lock attached to it.
683                  * This is a `normal' race. cl_object and its cl_lock's can be
684                  * removed by memory pressure, together with all pages.
685                  */
686                 cancel = (flag == LDLM_CB_BLOCKING);
687
688         if (cancel) {
689                 struct lustre_handle *lockh;
690
691                 lockh = &osc_env_info(env)->oti_handle;
692                 ldlm_lock2handle(dlmlock, lockh);
693                 result = ldlm_cli_cancel(lockh, LCF_ASYNC);
694         } else
695                 result = 0;
696         return result;
697 }
698
699 /**
700  * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
701  * some other lock, or is canceled. This function is installed as a
702  * ldlm_lock::l_blocking_ast() for client extent locks.
703  *
704  * Control flow is tricky, because ldlm uses the same call-back
705  * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
706  *
707  * \param dlmlock lock for which ast occurred.
708  *
709  * \param new description of a conflicting lock in case of blocking ast.
710  *
711  * \param data value of dlmlock->l_ast_data
712  *
713  * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
714  *             cancellation and blocking ast's.
715  *
716  * Possible use cases:
717  *
718  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
719  *       lock due to lock lru pressure, or explicit user request to purge
720  *       locks.
721  *
722  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
723  *       us that dlmlock conflicts with another lock that some client is
724  *       enqueuing. The lock is canceled.
725  *
726  *           - cl_lock_cancel() is called. osc_lock_cancel() calls
727  *             ldlm_cli_cancel() that calls
728  *
729  *                  dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
730  *
731  *             recursively entering osc_ldlm_blocking_ast().
732  *
733  *     - client cancels the lock voluntarily (e.g., as part of early cancellation):
734  *
735  *           cl_lock_cancel()->
736  *             osc_lock_cancel()->
737  *               ldlm_cli_cancel()->
738  *                 dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
739  *
740  */
741 static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
742                                  struct ldlm_lock_desc *new, void *data,
743                                  int flag)
744 {
745         struct lu_env     *env;
746         struct cl_env_nest nest;
747         int                result;
748
749         /*
750          * This can be called in the context of outer IO, e.g.,
751          *
752          *     cl_enqueue()->...
753          *       ->osc_enqueue_base()->...
754          *         ->ldlm_prep_elc_req()->...
755          *           ->ldlm_cancel_callback()->...
756          *             ->osc_ldlm_blocking_ast()
757          *
758          * new environment has to be created to not corrupt outer context.
759          */
760         env = cl_env_nested_get(&nest);
761         if (!IS_ERR(env)) {
762                 result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
763                 cl_env_nested_put(&nest, env);
764         } else {
765                 result = PTR_ERR(env);
766                 /*
767                  * XXX This should never happen, as cl_lock is
768                  * stuck. Pre-allocated environment a la vvp_inode_fini_env
769                  * should be used.
770                  */
771                 LBUG();
772         }
773         if (result != 0) {
774                 if (result == -ENODATA)
775                         result = 0;
776                 else
777                         CERROR("BAST failed: %d\n", result);
778         }
779         return result;
780 }
781
782 static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
783                                    __u64 flags, void *data)
784 {
785         struct cl_env_nest nest;
786         struct lu_env     *env;
787         struct osc_lock   *olck;
788         struct cl_lock    *lock;
789         int result;
790         int dlmrc;
791
792         /* first, do dlm part of the work */
793         dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
794         if (flags == LDLM_FL_WAIT_NOREPROC)
795                 return dlmrc;
796
797         /* then, notify cl_lock */
798         env = cl_env_nested_get(&nest);
799         if (!IS_ERR(env)) {
800                 olck = osc_ast_data_get(dlmlock);
801                 if (olck != NULL) {
802                         lock = olck->ols_cl.cls_lock;
803                         cl_lock_mutex_get(env, lock);
804                         /*
805                          * ldlm_handle_cp_callback() copied LVB from request
806                          * to lock->l_lvb_data, store it in osc_lock.
807                          */
808                         LASSERT(dlmlock->l_lvb_data != NULL);
809                         lock_res_and_lock(dlmlock);
810                         olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
811                         if (olck->ols_lock == NULL) {
812                                 /*
813                                  * upcall (osc_lock_upcall()) hasn't yet been
814                                  * called. Do nothing now, upcall will bind
815                                  * olck to dlmlock and signal the waiters.
816                                  *
817                                  * This maintains an invariant that osc_lock
818                                  * and ldlm_lock are always bound when
819                                  * osc_lock is in OLS_GRANTED state.
820                                  */
821                         } else if (dlmlock->l_granted_mode ==
822                                    dlmlock->l_req_mode) {
823                                 osc_lock_granted(env, olck, dlmlock, dlmrc);
824                         }
825                         unlock_res_and_lock(dlmlock);
826
827                         if (dlmrc != 0) {
828                                 CL_LOCK_DEBUG(D_ERROR, env, lock,
829                                               "dlmlock returned %d\n", dlmrc);
830                                 cl_lock_error(env, lock, dlmrc);
831                         }
832                         cl_lock_mutex_put(env, lock);
833                         osc_ast_data_put(env, olck);
834                         result = 0;
835                 } else
836                         result = -ELDLM_NO_LOCK_DATA;
837                 cl_env_nested_put(&nest, env);
838         } else
839                 result = PTR_ERR(env);
840         return dlmrc ?: result;
841 }
842
843 static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
844 {
845         struct ptlrpc_request  *req  = data;
846         struct osc_lock        *olck;
847         struct cl_lock         *lock;
848         struct cl_object       *obj;
849         struct cl_env_nest      nest;
850         struct lu_env          *env;
851         struct ost_lvb         *lvb;
852         struct req_capsule     *cap;
853         int                     result;
854
855         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
856
857         env = cl_env_nested_get(&nest);
858         if (!IS_ERR(env)) {
859                 /* osc_ast_data_get() has to go after environment is
860                  * allocated, because osc_ast_data_get() acquires a
861                  * reference to a lock, and it can only be released with
862                  * an environment.
863                  */
864                 olck = osc_ast_data_get(dlmlock);
865                 if (olck != NULL) {
866                         lock = olck->ols_cl.cls_lock;
867                         /* Do not grab the mutex of cl_lock for glimpse.
868                          * See LU-1274 for details.
869                          * BTW, it's okay for cl_lock to be cancelled during
870                          * this period because server can handle this race.
871                          * See ldlm_server_glimpse_ast() for details.
872                          * cl_lock_mutex_get(env, lock); */
873                         cap = &req->rq_pill;
874                         req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
875                         req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
876                                              sizeof *lvb);
877                         result = req_capsule_server_pack(cap);
878                         if (result == 0) {
879                                 lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
880                                 obj = lock->cll_descr.cld_obj;
881                                 result = cl_object_glimpse(env, obj, lvb);
882                         }
883                         if (!exp_connect_lvb_type(req->rq_export))
884                                 req_capsule_shrink(&req->rq_pill,
885                                                    &RMF_DLM_LVB,
886                                                    sizeof(struct ost_lvb_v1),
887                                                    RCL_SERVER);
888                         osc_ast_data_put(env, olck);
889                 } else {
890                         /*
891                          * These errors are normal races, so we don't want to
892                          * fill the console with messages by calling
893                          * ptlrpc_error()
894                          */
895                         lustre_pack_reply(req, 1, NULL, NULL);
896                         result = -ELDLM_NO_LOCK_DATA;
897                 }
898                 cl_env_nested_put(&nest, env);
899         } else
900                 result = PTR_ERR(env);
901         req->rq_status = result;
902         return result;
903 }
904
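/**
 * Page-gang callback used by osc_lock_weight(): a vm-locked, dirty or
 * writeback page counts as "in use", and finding one is enough to abort the
 * scan, so the resulting weight only distinguishes locks with no busy pages
 * (weight 0) from locks covering at least one.
 */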
905 static int weigh_cb(const struct lu_env *env, struct cl_io *io,
906                     struct osc_page *ops, void *cbdata)
907 {
908         struct cl_page *page = ops->ops_cl.cpl_page;
909
910         if (cl_page_is_vmlocked(env, page)
911             || PageDirty(page->cp_vmpage) || PageWriteback(page->cp_vmpage)
912            ) {
913                 (*(unsigned long *)cbdata)++;
914                 return CLP_GANG_ABORT;
915         }
916
917         return CLP_GANG_OKAY;
918 }
919
920 static unsigned long osc_lock_weight(const struct lu_env *env,
921                                      const struct osc_lock *ols)
922 {
923         struct cl_io *io = &osc_env_info(env)->oti_io;
924         struct cl_lock_descr *descr = &ols->ols_cl.cls_lock->cll_descr;
925         struct cl_object *obj = ols->ols_cl.cls_obj;
926         unsigned long npages = 0;
927         int result;
928         ENTRY;
929
930         io->ci_obj = cl_object_top(obj);
931         io->ci_ignore_layout = 1;
932         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
933         if (result != 0)
934                 RETURN(result);
935
936         do {
937                 result = osc_page_gang_lookup(env, io, cl2osc(obj),
938                                               descr->cld_start, descr->cld_end,
939                                               weigh_cb, (void *)&npages);
940                 if (result == CLP_GANG_ABORT)
941                         break;
942                 if (result == CLP_GANG_RESCHED)
943                         cond_resched();
944         } while (result != CLP_GANG_OKAY);
945         cl_io_fini(env, io);
946
947         return npages;
948 }
949
950 /**
951  * Get the weight of dlm lock for early cancellation.
952  */
953 unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
954 {
955         struct cl_env_nest       nest;
956         struct lu_env           *env;
957         struct osc_lock         *lock;
958         unsigned long            weight;
959         ENTRY;
960
961         might_sleep();
962         /*
963          * osc_ldlm_weigh_ast has a complex context since it might be called
964          * because of lock cancellation, or from user input. We have to make
965          * a new environment for it. It is probably safe in the current
966          * implementation to use the upper context because cl_lock_put() doesn't
967          * modify environment variables. But just in case ...
968          */
969         env = cl_env_nested_get(&nest);
970         if (IS_ERR(env))
971                 /* Mostly because of lack of memory; do not eliminate this lock. */
972                 RETURN(1);
973
974         LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
975         lock = osc_ast_data_get(dlmlock);
976         if (lock == NULL) {
977                 /* cl_lock was destroyed because of memory pressure.
978                  * It is more reasonable to assign this type of lock
979                  * a lower cost.
980                  */
981                 GOTO(out, weight = 0);
982         }
983
984         weight = osc_lock_weight(env, lock);
985         osc_ast_data_put(env, lock);
986         EXIT;
987
988 out:
989         cl_env_nested_put(&nest, env);
990         return weight;
991 }
992
993 static void osc_lock_build_einfo(const struct lu_env *env,
994                                  const struct cl_lock *clock,
995                                  struct osc_lock *lock,
996                                  struct ldlm_enqueue_info *einfo)
997 {
998         enum cl_lock_mode mode;
999
1000         mode = clock->cll_descr.cld_mode;
1001         if (mode == CLM_PHANTOM)
1002                 /*
1003                  * For now, enqueue all glimpse locks in read mode. In the
1004                  * future, client might choose to enqueue LCK_PW lock for
1005                  * glimpse on a file opened for write.
1006                  */
1007                 mode = CLM_READ;
1008
1009         einfo->ei_type   = LDLM_EXTENT;
1010         einfo->ei_mode   = osc_cl_lock2ldlm(mode);
1011         einfo->ei_cb_bl  = osc_ldlm_blocking_ast;
1012         einfo->ei_cb_cp  = osc_ldlm_completion_ast;
1013         einfo->ei_cb_gl  = osc_ldlm_glimpse_ast;
1014         einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
1015 }
1016
1017 /**
1018  * Determine if the lock should be converted into a lockless lock.
1019  *
1020  * Steps to check:
1021  * - whether the lock has an explicit requirement for a non-lockless lock;
1022  * - the io lock request type (ci_lockreq);
1023  * - send the enqueue rpc to the OST to make the final decision;
1024  * - special treatment for truncate lockless locks.
1025  *
1026  *  Additional policy can be implemented here, e.g., never do lockless-io
1027  *  for large extents.
1028  */
1029 static void osc_lock_to_lockless(const struct lu_env *env,
1030                                  struct osc_lock *ols, int force)
1031 {
1032         struct cl_lock_slice *slice = &ols->ols_cl;
1033
1034         LASSERT(ols->ols_state == OLS_NEW ||
1035                 ols->ols_state == OLS_UPCALL_RECEIVED);
1036
1037         if (force) {
1038                 ols->ols_locklessable = 1;
1039                 slice->cls_ops = &osc_lock_lockless_ops;
1040         } else {
1041                 struct osc_io *oio     = osc_env_io(env);
1042                 struct cl_io  *io      = oio->oi_cl.cis_io;
1043                 struct cl_object *obj  = slice->cls_obj;
1044                 struct osc_object *oob = cl2osc(obj);
1045                 const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
1046                 struct obd_connect_data *ocd;
1047
1048                 LASSERT(io->ci_lockreq == CILR_MANDATORY ||
1049                         io->ci_lockreq == CILR_MAYBE ||
1050                         io->ci_lockreq == CILR_NEVER);
1051
1052                 ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
1053                 ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
1054                                 (io->ci_lockreq == CILR_MAYBE) &&
1055                                 (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
1056                 if (io->ci_lockreq == CILR_NEVER ||
1057                         /* lockless IO */
1058                     (ols->ols_locklessable && osc_object_is_contended(oob)) ||
1059                         /* lockless truncate */
1060                     (cl_io_is_trunc(io) &&
1061                      (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
1062                       osd->od_lockless_truncate)) {
1063                         ols->ols_locklessable = 1;
1064                         slice->cls_ops = &osc_lock_lockless_ops;
1065                 }
1066         }
1067         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1068 }
1069
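/**
 * Check whether an enqueuing lock (\a qing) is compatible with an already
 * queued one (\a qed): a queued glimpse lock is compatible once its upcall
 * has been received or when the enqueuer is a reader; otherwise only two
 * read locks are compatible.
 */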
1070 static int osc_lock_compatible(const struct osc_lock *qing,
1071                                const struct osc_lock *qed)
1072 {
1073         enum cl_lock_mode qing_mode;
1074         enum cl_lock_mode qed_mode;
1075
1076         qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
1077         if (qed->ols_glimpse &&
1078             (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
1079                 return 1;
1080
1081         qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
1082         return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
1083 }
1084
1085 /**
1086  * Cancel all conflicting locks and wait for them to be destroyed.
1087  *
1088  * This function is used for two purposes:
1089  *
1090  *     - early cancel all conflicting locks before starting IO, and
1091  *
1092  *     - guarantee that pages added to the page cache by lockless IO are never
1093  *       covered by locks other than lockless IO lock, and, hence, are not
1094  *       visible to other threads.
1095  */
1096 static int osc_lock_enqueue_wait(const struct lu_env *env,
1097                                  const struct osc_lock *olck)
1098 {
1099         struct cl_lock          *lock    = olck->ols_cl.cls_lock;
1100         struct cl_lock_descr    *descr   = &lock->cll_descr;
1101         struct cl_object_header *hdr     = cl_object_header(descr->cld_obj);
1102         struct cl_lock          *scan;
1103         struct cl_lock          *conflict= NULL;
1104         int lockless                     = osc_lock_is_lockless(olck);
1105         int rc                           = 0;
1106         ENTRY;
1107
1108         LASSERT(cl_lock_is_mutexed(lock));
1109
1110         /* Enqueue anyway for a glimpse lock, because we actually
1111          * don't need to cancel any conflicting locks. */
1112         if (olck->ols_glimpse)
1113                 return 0;
1114
1115         spin_lock(&hdr->coh_lock_guard);
1116         list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
1117                 struct cl_lock_descr *cld = &scan->cll_descr;
1118                 const struct osc_lock *scan_ols;
1119
1120                 if (scan == lock)
1121                         break;
1122
1123                 if (scan->cll_state < CLS_QUEUING ||
1124                     scan->cll_state == CLS_FREEING ||
1125                     cld->cld_start > descr->cld_end ||
1126                     cld->cld_end < descr->cld_start)
1127                         continue;
1128
1129                 /* overlapped and living locks. */
1130
1131                 /* We're not supposed to give up group lock. */
1132                 if (scan->cll_descr.cld_mode == CLM_GROUP) {
1133                         LASSERT(descr->cld_mode != CLM_GROUP ||
1134                                 descr->cld_gid != scan->cll_descr.cld_gid);
1135                         continue;
1136                 }
1137
1138                 scan_ols = osc_lock_at(scan);
1139
1140                 /* We need to cancel the compatible locks if we're enqueuing
1141                  * a lockless lock, for example:
1142                  * imagine that client has PR lock on [0, 1000], and thread T0
1143                  * is doing lockless IO in [500, 1500] region. Concurrent
1144                  * thread T1 can see lockless data in [500, 1000], which is
1145                  * wrong, because these data are possibly stale. */
1146                 if (!lockless && osc_lock_compatible(olck, scan_ols))
1147                         continue;
1148
1149                 cl_lock_get_trust(scan);
1150                 conflict = scan;
1151                 break;
1152         }
1153         spin_unlock(&hdr->coh_lock_guard);
1154
1155         if (conflict) {
1156                 if (lock->cll_descr.cld_mode == CLM_GROUP) {
1157                         /* we want a group lock but a previous lock request
1158                          * conflicts, we do not wait but return 0 so the
1159                          * request is sent to the server
1160                          */
1161                         CDEBUG(D_DLMTRACE, "group lock %p is conflicted "
1162                                            "with %p, no wait, send to server\n",
1163                                lock, conflict);
1164                         cl_lock_put(env, conflict);
1165                         rc = 0;
1166                 } else {
1167                         CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, "
1168                                            "will wait\n",
1169                                lock, conflict);
1170                         LASSERT(lock->cll_conflict == NULL);
1171                         lu_ref_add(&conflict->cll_reference, "cancel-wait",
1172                                    lock);
1173                         lock->cll_conflict = conflict;
1174                         rc = CLO_WAIT;
1175                 }
1176         }
1177         RETURN(rc);
1178 }
1179
1180 /**
1181  * Implementation of cl_lock_operations::clo_enqueue() method for osc
1182  * layer. This initiates ldlm enqueue:
1183  *
1184  *     - cancels conflicting locks early (osc_lock_enqueue_wait());
1185  *
1186  *     - calls osc_enqueue_base() to do actual enqueue.
1187  *
1188  * osc_enqueue_base() is supplied with an upcall function that is executed
1189  * when lock is received either after a local cached ldlm lock is matched, or
1190  * when a reply from the server is received.
1191  *
1192  * This function does not wait for the network communication to complete.
1193  */
1194 static int osc_lock_enqueue(const struct lu_env *env,
1195                             const struct cl_lock_slice *slice,
1196                             struct cl_io *unused, __u32 enqflags)
1197 {
1198         struct osc_lock          *ols     = cl2osc_lock(slice);
1199         struct cl_lock           *lock    = ols->ols_cl.cls_lock;
1200         int result;
1201         ENTRY;
1202
1203         LASSERT(cl_lock_is_mutexed(lock));
1204         LASSERTF(ols->ols_state == OLS_NEW,
1205                  "Impossible state: %d\n", ols->ols_state);
1206
1207         LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
1208                 "lock = %p, ols = %p\n", lock, ols);
1209
1210         result = osc_lock_enqueue_wait(env, ols);
1211         if (result == 0) {
1212                 if (!osc_lock_is_lockless(ols)) {
1213                         struct osc_object        *obj = cl2osc(slice->cls_obj);
1214                         struct osc_thread_info   *info = osc_env_info(env);
1215                         struct ldlm_res_id       *resname = &info->oti_resname;
1216                         ldlm_policy_data_t       *policy = &info->oti_policy;
1217                         struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
1218
1219                         /* lock will be passed as the upcall cookie,
1220                          * hold a ref to prevent it from being released. */
1221                         cl_lock_hold_add(env, lock, "upcall", lock);
1222                         /* a user for agl lock also */
1223                         if (ols->ols_agl)
1224                                 cl_lock_user_add(env, lock);
1225                         ols->ols_state = OLS_ENQUEUED;
1226
1227                         /*
1228                          * XXX: this is a possible blocking point, as
1229                          * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
1230                          * LDLM_CP_CALLBACK.
1231                          */
1232                         ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
1233                         osc_lock_build_policy(env, lock, policy);
1234                         result = osc_enqueue_base(osc_export(obj), resname,
1235                                           &ols->ols_flags, policy,
1236                                           &ols->ols_lvb,
1237                                           obj->oo_oinfo->loi_kms_valid,
1238                                           osc_lock_upcall,
1239                                           ols, einfo, &ols->ols_handle,
1240                                           PTLRPCD_SET, 1, ols->ols_agl);
1241                         if (result != 0) {
1242                                 if (ols->ols_agl)
1243                                         cl_lock_user_del(env, lock);
1244                                 cl_lock_unhold(env, lock, "upcall", lock);
1245                                 if (unlikely(result == -ECANCELED)) {
1246                                         ols->ols_state = OLS_NEW;
1247                                         result = 0;
1248                                 }
1249                         }
1250                 } else {
1251                         ols->ols_state = OLS_GRANTED;
1252                         ols->ols_owner = osc_env_io(env);
1253                 }
1254         }
1255         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1256         RETURN(result);
1257 }
1258
1259 static int osc_lock_wait(const struct lu_env *env,
1260                          const struct cl_lock_slice *slice)
1261 {
1262         struct osc_lock *olck = cl2osc_lock(slice);
1263         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1264
1265         LINVRNT(osc_lock_invariant(olck));
1266
1267         if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
1268                 if (olck->ols_flags & LDLM_FL_LVB_READY) {
1269                         return 0;
1270                 } else if (olck->ols_agl) {
1271                         if (lock->cll_flags & CLF_FROM_UPCALL)
1272                                 /* It is from enqueue RPC reply upcall for
1273                                  * updating state. Do not re-enqueue. */
1274                                 return -ENAVAIL;
1275                         else
1276                                 olck->ols_state = OLS_NEW;
1277                 } else {
1278                         LASSERT(lock->cll_error);
1279                         return lock->cll_error;
1280                 }
1281         }
1282
1283         if (olck->ols_state == OLS_NEW) {
1284                 int rc;
1285
1286                 LASSERT(olck->ols_agl);
1287                 olck->ols_agl = 0;
1288                 olck->ols_flags &= ~LDLM_FL_BLOCK_NOWAIT;
1289                 rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
1290                 if (rc != 0)
1291                         return rc;
1292                 else
1293                         return CLO_REENQUEUED;
1294         }
1295
1296         LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
1297                      lock->cll_error == 0, olck->ols_lock != NULL));
1298
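        /* Return any recorded error; otherwise 0 once the lock is granted,
         * or CLO_WAIT to keep waiting. */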
1299         return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
1300 }
1301
1302 /**
1303  * An implementation of cl_lock_operations::clo_use() method that pins cached
1304  * lock.
1305  */
1306 static int osc_lock_use(const struct lu_env *env,
1307                         const struct cl_lock_slice *slice)
1308 {
1309         struct osc_lock *olck = cl2osc_lock(slice);
1310         int rc;
1311
1312         LASSERT(!olck->ols_hold);
1313
1314         /*
1315          * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
1316          * flag is not set. This protects us from a concurrent blocking ast.
1317          */
1318         rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
1319         if (rc == 0) {
1320                 olck->ols_hold = 1;
1321                 olck->ols_state = OLS_GRANTED;
1322         } else {
1323                 struct cl_lock *lock;
1324
1325                 /*
1326                  * Lock is being cancelled somewhere within
1327                  * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
1328                  * set, but osc_ldlm_blocking_ast() hasn't yet acquired
1329                  * cl_lock mutex.
1330                  */
1331                 lock = slice->cls_lock;
1332                 LASSERT(lock->cll_state == CLS_INTRANSIT);
1333                 LASSERT(lock->cll_users > 0);
1334                 /* set a flag for osc_dlm_blocking_ast0() to signal the
1335                  * lock.*/
1336                 olck->ols_ast_wait = 1;
1337                 rc = CLO_WAIT;
1338         }
1339         return rc;
1340 }
1341
1342 static int osc_lock_flush(struct osc_lock *ols, int discard)
1343 {
1344         struct cl_lock       *lock  = ols->ols_cl.cls_lock;
1345         struct cl_env_nest    nest;
1346         struct lu_env        *env;
1347         int result = 0;
1348         ENTRY;
1349
1350         env = cl_env_nested_get(&nest);
1351         if (!IS_ERR(env)) {
1352                 struct osc_object    *obj   = cl2osc(ols->ols_cl.cls_obj);
1353                 struct cl_lock_descr *descr = &lock->cll_descr;
1354                 int rc = 0;
1355
1356                 if (descr->cld_mode >= CLM_WRITE) {
1357                         result = osc_cache_writeback_range(env, obj,
1358                                         descr->cld_start, descr->cld_end,
1359                                         1, discard);
1360                         LDLM_DEBUG(ols->ols_lock,
1361                                 "lock %p: %d pages were %s.\n", lock, result,
1362                                 discard ? "discarded" : "written");
1363                         if (result > 0)
1364                                 result = 0;
1365                 }
1366
1367                 rc = osc_lock_discard_pages(env, ols);
1368                 if (result == 0 && rc < 0)
1369                         result = rc;
1370
1371                 cl_env_nested_put(&nest, env);
1372         } else
1373                 result = PTR_ERR(env);
1374         if (result == 0) {
1375                 ols->ols_flush = 1;
1376                 LINVRNT(!osc_lock_has_pages(ols));
1377         }
1378         RETURN(result);
1379 }
1380
1381 /**
1382  * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
1383  * called (as part of cl_lock_cancel()) when a lock is cancelled either
1384  * voluntarily (LRU pressure, early cancellation, umount, etc.) or due to a
1385  * conflict with some other lock somewhere in the cluster. This function does
1386  * the following:
1387  *
1388  *     - invalidates all pages protected by this lock (after sending dirty
1389  *       ones to the server, as necessary);
1390  *
1391  *     - decref's underlying ldlm lock;
1392  *
1393  *     - cancels ldlm lock (ldlm_cli_cancel()).
1394  */
1395 static void osc_lock_cancel(const struct lu_env *env,
1396                             const struct cl_lock_slice *slice)
1397 {
1398         struct cl_lock   *lock    = slice->cls_lock;
1399         struct osc_lock  *olck    = cl2osc_lock(slice);
1400         struct ldlm_lock *dlmlock = olck->ols_lock;
1401
1402         LASSERT(cl_lock_is_mutexed(lock));
1403         LINVRNT(osc_lock_invariant(olck));
1404
1405         if (dlmlock != NULL) {
1406                 bool do_cancel;
1407                 int  result = 0;
1408
1409                 if (olck->ols_state >= OLS_GRANTED)
1410                         result = osc_lock_flush(olck,
1411                                 ldlm_is_discard_data(dlmlock));
1412                 osc_lock_unhold(olck);
1413
1414                 lock_res_and_lock(dlmlock);
1415                 /* Now that we are the only user of the dlm read/write
1416                  * reference, ->l_readers and ->l_writers should normally
1417                  * both be zero. However, there is a corner case; see
1418                  * b=18829 for details. */
1419                 do_cancel = (dlmlock->l_readers == 0 &&
1420                              dlmlock->l_writers == 0);
1421                 ldlm_set_cbpending(dlmlock);
1422                 unlock_res_and_lock(dlmlock);
1423                 if (do_cancel)
1424                         result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
1425                 if (result < 0)
1426                         CL_LOCK_DEBUG(D_ERROR, env, lock,
1427                                       "lock %p cancel failure with error(%d)\n",
1428                                       lock, result);
1429         }
1430         olck->ols_state = OLS_CANCELLED;
1431         olck->ols_flags &= ~LDLM_FL_LVB_READY;
1432         osc_lock_detach(env, olck);
1433 }
1434
1435 #ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
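/**
 * Page gang-lookup callback used by osc_lock_has_pages() below. Under a read
 * lock a page is tolerated if another lock still covers it; any other page
 * is dumped and the lookup is aborted with CLP_GANG_ABORT.
 */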
1436 static int check_cb(const struct lu_env *env, struct cl_io *io,
1437                     struct osc_page *ops, void *cbdata)
1438 {
1439         struct cl_lock *lock = cbdata;
1440
1441         if (lock->cll_descr.cld_mode == CLM_READ) {
1442                 struct cl_lock *tmp;
1443                 tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj,
1444                                        osc_index(ops), lock, 1, 0);
1445                 if (tmp != NULL) {
1446                         cl_lock_put(env, tmp);
1447                         return CLP_GANG_OKAY;
1448                 }
1449         }
1450
1451         CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
1452         CL_PAGE_DEBUG(D_ERROR, env, ops->ops_cl.cpl_page, "\n");
1453         return CLP_GANG_ABORT;
1454 }
1455
1456 /**
1457  * Returns true iff there are pages under \a olck not protected by other
1458  * locks.
1459  */
1460 static bool osc_lock_has_pages(struct osc_lock *olck)
1461 {
1462         struct cl_lock       *lock;
1463         struct cl_lock_descr *descr;
1464         struct cl_object     *obj;
1465         struct osc_object    *oob;
1466         struct cl_env_nest    nest;
1467         struct cl_io         *io;
1468         struct lu_env        *env;
1469         bool                     has_pages;
1470         int                      rc;
1471
1472         env = cl_env_nested_get(&nest);
1473         if (IS_ERR(env))
1474                 return false;
1475
1476         obj   = olck->ols_cl.cls_obj;
1477         oob   = cl2osc(obj);
1478         io    = &oob->oo_debug_io;
1479         lock  = olck->ols_cl.cls_lock;
1480         descr = &lock->cll_descr;
1481
1482         mutex_lock(&oob->oo_debug_mutex);
1483         io->ci_obj = cl_object_top(obj);
1484         io->ci_ignore_layout = 1;
1485         rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1486         if (rc != 0)
1487                 GOTO(out, has_pages = false);
1488
1489         do {
1490                 rc = osc_page_gang_lookup(env, io, oob,
1491                                           descr->cld_start, descr->cld_end,
1492                                           check_cb, (void *)lock);
1493                 if (rc == CLP_GANG_ABORT)
1494                         break;
1495                 if (rc == CLP_GANG_RESCHED)
1496                         cond_resched();
1497         } while (rc != CLP_GANG_OKAY);
1498         has_pages = (rc == CLP_GANG_ABORT);
1499 out:
1500         cl_io_fini(env, io);
1501         mutex_unlock(&oob->oo_debug_mutex);
1502         cl_env_nested_put(&nest, env);
1503
1504         return has_pages;
1505 }
1506 #else
1507 static bool osc_lock_has_pages(struct osc_lock *olck)
1508 {
1509         return false;
1510 }
1511 #endif /* CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK */
1512
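/**
 * Implements cl_lock_operations::clo_delete() method for osc layer: drops
 * the hold on, and detaches from, the underlying ldlm lock when the cl_lock
 * is deleted. Glimpse locks hold nothing at this point, so they are skipped.
 */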
1513 static void osc_lock_delete(const struct lu_env *env,
1514                             const struct cl_lock_slice *slice)
1515 {
1516         struct osc_lock *olck;
1517
1518         olck = cl2osc_lock(slice);
1519         if (olck->ols_glimpse) {
1520                 LASSERT(!olck->ols_hold);
1521                 LASSERT(!olck->ols_lock);
1522                 return;
1523         }
1524
1525         LINVRNT(osc_lock_invariant(olck));
1526         LINVRNT(!osc_lock_has_pages(olck));
1527
1528         osc_lock_unhold(olck);
1529         osc_lock_detach(env, olck);
1530 }
1531
1532 /**
1533  * Implements cl_lock_operations::clo_state() method for osc layer.
1534  *
1535  * Maintains osc_lock::ols_owner field.
1536  *
1537  * This assumes that a lock always enters CLS_HELD (from some other state) in
1538  * the same IO context as the one that requested the lock. This should not be
1539  * a problem, because the context is by definition shared by all activity
1540  * pertaining to the same high-level IO.
1541  */
1542 static void osc_lock_state(const struct lu_env *env,
1543                            const struct cl_lock_slice *slice,
1544                            enum cl_lock_state state)
1545 {
1546         struct osc_lock *lock = cl2osc_lock(slice);
1547
1548         /*
1549          * XXX multiple io contexts can use the lock at the same time.
1550          */
1551         LINVRNT(osc_lock_invariant(lock));
1552         if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
1553                 struct osc_io *oio = osc_env_io(env);
1554
1555                 LASSERT(lock->ols_owner == NULL);
1556                 lock->ols_owner = oio;
1557         } else if (state != CLS_HELD)
1558                 lock->ols_owner = NULL;
1559 }
1560
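/**
 * Implements cl_lock_operations::clo_print() method for osc layer: prints
 * the ldlm lock pointer, flags, handle cookie, osc lock state, owner and the
 * LVB.
 */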
1561 static int osc_lock_print(const struct lu_env *env, void *cookie,
1562                           lu_printer_t p, const struct cl_lock_slice *slice)
1563 {
1564         struct osc_lock *lock = cl2osc_lock(slice);
1565
1566         /*
1567          * XXX print ldlm lock and einfo properly.
1568          */
1569         (*p)(env, cookie, "%p "LPX64" "LPX64" %d %p ",
1570              lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
1571              lock->ols_state, lock->ols_owner);
1572         osc_lvb_print(env, cookie, p, &lock->ols_lvb);
1573         return 0;
1574 }
1575
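/**
 * Implements cl_lock_operations::clo_fits_into() method for osc layer.
 *
 * Decides whether an existing osc lock can be matched against the request
 * described by \a need: CEF_NEVER requests and cancelled locks never match,
 * while CLM_PHANTOM (glimpse) and CEF_MUST requests add the state
 * restrictions explained in the comments below.
 */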
1576 static int osc_lock_fits_into(const struct lu_env *env,
1577                               const struct cl_lock_slice *slice,
1578                               const struct cl_lock_descr *need,
1579                               const struct cl_io *io)
1580 {
1581         struct osc_lock *ols = cl2osc_lock(slice);
1582
1583         if (need->cld_enq_flags & CEF_NEVER)
1584                 return 0;
1585
1586         if (ols->ols_state >= OLS_CANCELLED)
1587                 return 0;
1588
1589         if (need->cld_mode == CLM_PHANTOM) {
1590                 if (ols->ols_agl)
1591                         return !(ols->ols_state > OLS_RELEASED);
1592
1593                 /*
1594                  * Note: a QUEUED lock cannot be matched here, otherwise
1595                  * it might cause deadlocks.
1596                  * In the read process:
1597                  * P1: enqueues a read lock, creating sublock1.
1598                  * P2: enqueues a write lock, creating sublock2 (which
1599                  *     conflicts with sublock1).
1600                  * P1: the read lock is granted.
1601                  * P1: enqueues a glimpse lock (while holding sublock1 for
1602                  *     read), which matches sublock2 and waits for sublock2
1603                  *     to be granted. But sublock2 can never be granted,
1604                  *     because P1 will not release sublock1. Bang!
1605                  */
1606                 if (ols->ols_state < OLS_GRANTED ||
1607                     ols->ols_state > OLS_RELEASED)
1608                         return 0;
1609         } else if (need->cld_enq_flags & CEF_MUST) {
1610                 /*
1611                  * If the lock has never been enqueued, it cannot be
1612                  * matched, because the enqueue process brings in the
1613                  * information needed to determine things such as
1614                  * locklessness, CEF_MUST, etc.
1615                  */
1616                 if (ols->ols_state < OLS_UPCALL_RECEIVED &&
1617                     ols->ols_locklessable)
1618                         return 0;
1619         }
1620         return 1;
1621 }
1622
1623 static const struct cl_lock_operations osc_lock_ops = {
1624         .clo_fini    = osc_lock_fini,
1625         .clo_enqueue = osc_lock_enqueue,
1626         .clo_wait    = osc_lock_wait,
1627         .clo_unuse   = osc_lock_unuse,
1628         .clo_use     = osc_lock_use,
1629         .clo_delete  = osc_lock_delete,
1630         .clo_state   = osc_lock_state,
1631         .clo_cancel  = osc_lock_cancel,
1632         .clo_print   = osc_lock_print,
1633         .clo_fits_into = osc_lock_fits_into,
1634 };
1635
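/**
 * Implements cl_lock_operations::clo_unuse() method for lockless osc locks:
 * instead of being cached, a lockless lock is cancelled and deleted as soon
 * as it is no longer in use.
 */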
1636 static int osc_lock_lockless_unuse(const struct lu_env *env,
1637                                    const struct cl_lock_slice *slice)
1638 {
1639         struct osc_lock *ols = cl2osc_lock(slice);
1640         struct cl_lock *lock = slice->cls_lock;
1641
1642         LASSERT(ols->ols_state == OLS_GRANTED);
1643         LINVRNT(osc_lock_invariant(ols));
1644
1645         cl_lock_cancel(env, lock);
1646         cl_lock_delete(env, lock);
1647         return 0;
1648 }
1649
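/**
 * Implements cl_lock_operations::clo_cancel() method for lockless osc locks:
 * with no ldlm lock to cancel, it only flushes the covered pages and moves
 * the lock to OLS_CANCELLED.
 */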
1650 static void osc_lock_lockless_cancel(const struct lu_env *env,
1651                                      const struct cl_lock_slice *slice)
1652 {
1653         struct osc_lock   *ols  = cl2osc_lock(slice);
1654         int result;
1655
1656         result = osc_lock_flush(ols, 0);
1657         if (result)
1658                 CERROR("Pages for lockless lock %p were not purged(%d)\n",
1659                        ols, result);
1660         ols->ols_state = OLS_CANCELLED;
1661 }
1662
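/**
 * Implements cl_lock_operations::clo_wait() method for lockless osc locks:
 * there is nothing to wait for, so the enqueue result recorded in cll_error
 * is returned directly.
 */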
1663 static int osc_lock_lockless_wait(const struct lu_env *env,
1664                                   const struct cl_lock_slice *slice)
1665 {
1666         struct osc_lock *olck = cl2osc_lock(slice);
1667         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1668
1669         LINVRNT(osc_lock_invariant(olck));
1670         LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);
1671
1672         return lock->cll_error;
1673 }
1674
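/**
 * Implements cl_lock_operations::clo_state() method for lockless osc locks:
 * when the lock becomes CLS_HELD, remembers the owning IO and marks that IO
 * as lockless if the lock is taken on the IO's own object.
 */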
1675 static void osc_lock_lockless_state(const struct lu_env *env,
1676                                     const struct cl_lock_slice *slice,
1677                                     enum cl_lock_state state)
1678 {
1679         struct osc_lock *lock = cl2osc_lock(slice);
1680
1681         LINVRNT(osc_lock_invariant(lock));
1682         if (state == CLS_HELD) {
1683                 struct osc_io *oio  = osc_env_io(env);
1684
1685                 LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
1686                 lock->ols_owner = oio;
1687
1688                 /* Set the io to be lockless if this lock is taken on the
1689                  * io's own (host) object. */
1690                 if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
1691                         oio->oi_lockless = 1;
1692         }
1693 }
1694
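/**
 * Implements cl_lock_operations::clo_fits_into() method for lockless osc
 * locks: a lockless lock can only be matched by CEF_NEVER requests coming
 * from the IO that owns it.
 */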
1695 static int osc_lock_lockless_fits_into(const struct lu_env *env,
1696                                        const struct cl_lock_slice *slice,
1697                                        const struct cl_lock_descr *need,
1698                                        const struct cl_io *io)
1699 {
1700         struct osc_lock *lock = cl2osc_lock(slice);
1701
1702         if (!(need->cld_enq_flags & CEF_NEVER))
1703                 return 0;
1704
1705         /* A lockless lock should only be used by its owning io; see b=22147. */
1706         return (lock->ols_owner == osc_env_io(env));
1707 }
1708
1709 static const struct cl_lock_operations osc_lock_lockless_ops = {
1710         .clo_fini      = osc_lock_fini,
1711         .clo_enqueue   = osc_lock_enqueue,
1712         .clo_wait      = osc_lock_lockless_wait,
1713         .clo_unuse     = osc_lock_lockless_unuse,
1714         .clo_state     = osc_lock_lockless_state,
1715         .clo_fits_into = osc_lock_lockless_fits_into,
1716         .clo_cancel    = osc_lock_lockless_cancel,
1717         .clo_print     = osc_lock_print
1718 };
1719
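/**
 * Allocates and initializes the osc slice of a new cl_lock: builds the ldlm
 * enqueue info, translates cl enqueue flags into LDLM_FL_* flags (marking
 * AGL and glimpse locks), adds the slice to the cl_lock and, unless CEF_MUST
 * was requested, tries to downgrade the lock to a lockless one.
 */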
1720 int osc_lock_init(const struct lu_env *env,
1721                   struct cl_object *obj, struct cl_lock *lock,
1722                   const struct cl_io *unused)
1723 {
1724         struct osc_lock *clk;
1725         int result;
1726
1727         OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, GFP_NOFS);
1728         if (clk != NULL) {
1729                 __u32 enqflags = lock->cll_descr.cld_enq_flags;
1730
1731                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
1732                 clk->ols_state = OLS_NEW;
1733
1734                 clk->ols_flags = osc_enq2ldlm_flags(enqflags);
1735                 clk->ols_agl = !!(enqflags & CEF_AGL);
1736                 if (clk->ols_agl)
1737                         clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
1738                 if (clk->ols_flags & LDLM_FL_HAS_INTENT)
1739                         clk->ols_glimpse = 1;
1740
1741                 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
1742
1743                 if (!(enqflags & CEF_MUST))
1744                         /* try to convert this lock to a lockless lock */
1745                         osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
1746                 if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
1747                         clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
1748
1749                 LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags "LPX64,
1750                                 lock, clk, clk->ols_flags);
1751
1752                 result = 0;
1753         } else
1754                 result = -ENOMEM;
1755         return result;
1756 }
1757
1758 /** @} osc */