Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / osc / osc_lock.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_lock for OSC layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 /** \addtogroup osc osc @{ */
42
43 #define DEBUG_SUBSYSTEM S_OSC
44
45 #ifdef __KERNEL__
46 # include <libcfs/libcfs.h>
47 #else
48 # include <liblustre.h>
49 #endif
50 /* fid_build_reg_res_name() */
51 #include <lustre_fid.h>
52
53 #include "osc_cl_internal.h"
54
55 /*****************************************************************************
56  *
57  * Type conversions.
58  *
59  */
60
61 static const struct cl_lock_operations osc_lock_ops;
62 static const struct cl_lock_operations osc_lock_lockless_ops;
63 static void osc_lock_to_lockless(const struct lu_env *env,
64                                  struct osc_lock *ols, int force);
65
66 int osc_lock_is_lockless(const struct osc_lock *olck)
67 {
68         return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
69 }
70
71 /**
72  * Returns a weak pointer to the ldlm lock identified by a handle. Returned
73  * pointer cannot be dereferenced, as lock is not protected from concurrent
74  * reclaim. This function is a helper for osc_lock_invariant().
75  */
76 static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
77 {
78         struct ldlm_lock *lock;
79
80         lock = ldlm_handle2lock(handle);
81         if (lock != NULL)
82                 LDLM_LOCK_PUT(lock);
83         return lock;
84 }
85
86 /**
87  * Invariant that has to be true all of the time.
88  */
89 static int osc_lock_invariant(struct osc_lock *ols)
90 {
91         struct ldlm_lock *lock        = osc_handle_ptr(&ols->ols_handle);
92         struct ldlm_lock *olock       = ols->ols_lock;
93         int               handle_used = lustre_handle_is_used(&ols->ols_handle);
94
95         return
96                 ergo(osc_lock_is_lockless(ols),
97                      ols->ols_locklessable && ols->ols_lock == NULL)  ||
98                 (ergo(olock != NULL, handle_used) &&
99                  ergo(olock != NULL,
100                       olock->l_handle.h_cookie == ols->ols_handle.cookie) &&
101                  /*
102                   * Check that ->ols_handle and ->ols_lock are consistent, but
103                   * take into account that they are set at the different time.
104                   */
105                  ergo(handle_used,
106                       ergo(lock != NULL && olock != NULL, lock == olock) &&
107                       ergo(lock == NULL, olock == NULL)) &&
108                  ergo(ols->ols_state == OLS_CANCELLED,
109                       olock == NULL && !handle_used) &&
110                  /*
111                   * DLM lock is destroyed only after we have seen cancellation
112                   * ast.
113                   */
114                  ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
115                       !olock->l_destroyed) &&
116                  ergo(ols->ols_state == OLS_GRANTED,
117                       olock != NULL &&
118                       olock->l_req_mode == olock->l_granted_mode &&
119                       ols->ols_hold));
120 }
121
122 /*****************************************************************************
123  *
124  * Lock operations.
125  *
126  */
127
128 /**
129  * Breaks a link between osc_lock and dlm_lock.
130  */
131 static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
132 {
133         struct ldlm_lock *dlmlock;
134
135         spin_lock(&osc_ast_guard);
136         dlmlock = olck->ols_lock;
137         if (dlmlock == NULL) {
138                 spin_unlock(&osc_ast_guard);
139                 return;
140         }
141
142         olck->ols_lock = NULL;
143         /* wb(); --- for all who checks (ols->ols_lock != NULL) before
144          * call to osc_lock_detach() */
145         dlmlock->l_ast_data = NULL;
146         olck->ols_handle.cookie = 0ULL;
147         spin_unlock(&osc_ast_guard);
148
149         lock_res_and_lock(dlmlock);
150         if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
151                 struct cl_object *obj = olck->ols_cl.cls_obj;
152                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
153                 __u64 old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
154
155                 /* Update the kms. Need to loop all granted locks.
156                  * Not a problem for the client */
157                 attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
158                 unlock_res_and_lock(dlmlock);
159
160                 cl_object_attr_lock(obj);
161                 cl_object_attr_set(env, obj, attr, CAT_KMS);
162                 cl_object_attr_unlock(obj);
163         } else
164                 unlock_res_and_lock(dlmlock);
165
166         /* release a reference taken in osc_lock_upcall0(). */
167         lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
168         LDLM_LOCK_RELEASE(dlmlock);
169 }
170
171 static int osc_lock_unuse(const struct lu_env *env,
172                           const struct cl_lock_slice *slice)
173 {
174         struct osc_lock *ols = cl2osc_lock(slice);
175         int result;
176
177         LASSERT(ols->ols_state == OLS_GRANTED ||
178                 ols->ols_state == OLS_UPCALL_RECEIVED);
179         LINVRNT(osc_lock_invariant(ols));
180
181         if (ols->ols_glimpse) {
182                 LASSERT(ols->ols_hold == 0);
183                 return 0;
184         }
185         LASSERT(ols->ols_hold);
186
187         /*
188          * Move lock into OLS_RELEASED state before calling osc_cancel_base()
189          * so that possible synchronous cancellation (that always happens
190          * e.g., for liblustre) sees that lock is released.
191          */
192         ols->ols_state = OLS_RELEASED;
193         ols->ols_hold = 0;
194         result = osc_cancel_base(&ols->ols_handle, ols->ols_einfo.ei_mode);
195         ols->ols_has_ref = 0;
196         return result;
197 }
198
199 static void osc_lock_fini(const struct lu_env *env,
200                           struct cl_lock_slice *slice)
201 {
202         struct osc_lock  *ols = cl2osc_lock(slice);
203
204         LINVRNT(osc_lock_invariant(ols));
205         /*
206          * ->ols_hold can still be true at this point if, for example, a
207          * thread that requested a lock was killed (and released a reference
208          * to the lock), before reply from a server was received. In this case
209          * lock is destroyed immediately after upcall.
210          */
211         if (ols->ols_hold)
212                 osc_lock_unuse(env, slice);
213         LASSERT(ols->ols_lock == NULL);
214
215         OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
216 }
217
218 void osc_lock_build_res(const struct lu_env *env, const struct osc_object *obj,
219                         struct ldlm_res_id *resname)
220 {
221         const struct lu_fid *fid = lu_object_fid(&obj->oo_cl.co_lu);
222         if (0) {
223                 /*
224                  * In the perfect world of the future, where ost servers talk
225                  * idif-fids...
226                  */
227                 fid_build_reg_res_name(fid, resname);
228         } else {
229                 /*
230                  * In reality, where ost server expects ->lsm_object_id and
231                  * ->lsm_object_gr in rename.
232                  */
233                 osc_build_res_name(obj->oo_oinfo->loi_id, obj->oo_oinfo->loi_gr,
234                                    resname);
235         }
236 }
237
238 static void osc_lock_build_policy(const struct lu_env *env,
239                                   const struct cl_lock *lock,
240                                   ldlm_policy_data_t *policy)
241 {
242         const struct cl_lock_descr *d = &lock->cll_descr;
243
244         osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
245         policy->l_extent.gid = d->cld_gid;
246 }
247
248 static int osc_enq2ldlm_flags(__u32 enqflags)
249 {
250         int result = 0;
251
252         LASSERT((enqflags & ~CEF_MASK) == 0);
253
254         if (enqflags & CEF_NONBLOCK)
255                 result |= LDLM_FL_BLOCK_NOWAIT;
256         if (enqflags & CEF_ASYNC)
257                 result |= LDLM_FL_HAS_INTENT;
258         if (enqflags & CEF_DISCARD_DATA)
259                 result |= LDLM_AST_DISCARD_DATA;
260         return result;
261 }
262
263 /**
264  * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
265  * pointers. Initialized in osc_init().
266  */
267 spinlock_t osc_ast_guard;
268
269 static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
270 {
271         struct osc_lock *olck;
272
273         lock_res_and_lock(dlm_lock);
274         spin_lock(&osc_ast_guard);
275         olck = dlm_lock->l_ast_data;
276         if (olck != NULL) {
277                 struct cl_lock *lock = olck->ols_cl.cls_lock;
278                 /*
279                  * If osc_lock holds a reference on ldlm lock, return it even
280                  * when cl_lock is in CLS_FREEING state. This way
281                  *
282                  *         osc_ast_data_get(dlmlock) == NULL
283                  *
284                  * guarantees that all osc references on dlmlock were
285                  * released. osc_dlm_blocking_ast0() relies on that.
286                  */
287                 if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
288                         cl_lock_get_trust(lock);
289                         lu_ref_add_atomic(&lock->cll_reference,
290                                           "ast", cfs_current());
291                 } else
292                         olck = NULL;
293         }
294         spin_unlock(&osc_ast_guard);
295         unlock_res_and_lock(dlm_lock);
296         return olck;
297 }
298
299 static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
300 {
301         struct cl_lock *lock;
302
303         lock = olck->ols_cl.cls_lock;
304         lu_ref_del(&lock->cll_reference, "ast", cfs_current());
305         cl_lock_put(env, lock);
306 }
307
308 /**
309  * Updates object attributes from a lock value block (lvb) received together
310  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
311  * logic.
312  *
313  * This can be optimized to not update attributes when lock is a result of a
314  * local match.
315  *
316  * Called under lock and resource spin-locks.
317  */
318 static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
319                                 int rc)
320 {
321         struct ost_lvb    *lvb;
322         struct cl_object  *obj;
323         struct lov_oinfo  *oinfo;
324         struct cl_attr    *attr;
325         unsigned           valid;
326
327         ENTRY;
328
329         if (!(olck->ols_flags & LDLM_FL_LVB_READY)) {
330                 EXIT;
331                 return;
332         }
333
334         lvb   = &olck->ols_lvb;
335         obj   = olck->ols_cl.cls_obj;
336         oinfo = cl2osc(obj)->oo_oinfo;
337         attr  = &osc_env_info(env)->oti_attr;
338         valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
339         cl_lvb2attr(attr, lvb);
340
341         cl_object_attr_lock(obj);
342         if (rc == 0) {
343                 struct ldlm_lock  *dlmlock;
344                 __u64 size;
345
346                 dlmlock = olck->ols_lock;
347                 LASSERT(dlmlock != NULL);
348
349                 /* re-grab LVB from a dlm lock under DLM spin-locks. */
350                 *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
351                 size = lvb->lvb_size;
352                 /* Extend KMS up to the end of this lock and no further
353                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
354                 if (size > dlmlock->l_policy_data.l_extent.end)
355                         size = dlmlock->l_policy_data.l_extent.end + 1;
356                 if (size >= oinfo->loi_kms) {
357                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="LPU64
358                                    ", kms="LPU64, lvb->lvb_size, size);
359                         valid |= CAT_KMS;
360                         attr->cat_kms = size;
361                 } else {
362                         LDLM_DEBUG(dlmlock, "lock acquired, setting rss="
363                                    LPU64"; leaving kms="LPU64", end="LPU64,
364                                    lvb->lvb_size, oinfo->loi_kms,
365                                    dlmlock->l_policy_data.l_extent.end);
366                 }
367                 ldlm_lock_allow_match_locked(dlmlock);
368         } else if (rc == -ENAVAIL && olck->ols_glimpse) {
369                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
370                        " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
371         } else
372                 valid = 0;
373
374         if (valid != 0)
375                 cl_object_attr_set(env, obj, attr, valid);
376
377         cl_object_attr_unlock(obj);
378
379         EXIT;
380 }
381
382 /**
383  * Called when a lock is granted, from an upcall (when server returned a
384  * granted lock), or from completion AST, when server returned a blocked lock.
385  *
386  * Called under lock and resource spin-locks, that are released temporarily
387  * here.
388  */
389 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
390                              struct ldlm_lock *dlmlock, int rc)
391 {
392         struct ldlm_extent   *ext;
393         struct cl_lock       *lock;
394         struct cl_lock_descr *descr;
395
396         LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
397
398         ENTRY;
399         if (olck->ols_state != OLS_GRANTED) {
400                 lock  = olck->ols_cl.cls_lock;
401                 ext   = &dlmlock->l_policy_data.l_extent;
402                 descr = &osc_env_info(env)->oti_descr;
403                 descr->cld_obj = lock->cll_descr.cld_obj;
404
405                 /* XXX check that ->l_granted_mode is valid. */
406                 descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
407                 descr->cld_start = cl_index(descr->cld_obj, ext->start);
408                 descr->cld_end   = cl_index(descr->cld_obj, ext->end);
409                 descr->cld_gid   = ext->gid;
410                 /*
411                  * tell upper layers the extent of the lock that was actually
412                  * granted
413                  */
414                 olck->ols_state = OLS_GRANTED;
415                 osc_lock_lvb_update(env, olck, rc);
416
417                 /* release DLM spin-locks to allow cl_lock_{modify,signal}()
418                  * to take a semaphore on a parent lock. This is safe, because
419                  * spin-locks are needed to protect consistency of
420                  * dlmlock->l_*_mode and LVB, and we have finished processing
421                  * them. */
422                 unlock_res_and_lock(dlmlock);
423                 cl_lock_modify(env, lock, descr);
424                 cl_lock_signal(env, lock);
425                 LINVRNT(osc_lock_invariant(olck));
426                 lock_res_and_lock(dlmlock);
427         }
428         EXIT;
429 }
430
431 static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
432
433 {
434         struct ldlm_lock *dlmlock;
435
436         ENTRY;
437
438         dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
439         LASSERT(dlmlock != NULL);
440
441         lock_res_and_lock(dlmlock);
442         spin_lock(&osc_ast_guard);
443         LASSERT(dlmlock->l_ast_data == olck);
444         LASSERT(olck->ols_lock == NULL);
445         olck->ols_lock = dlmlock;
446         spin_unlock(&osc_ast_guard);
447
448         /*
449          * Lock might be not yet granted. In this case, completion ast
450          * (osc_ldlm_completion_ast()) comes later and finishes lock
451          * granting.
452          */
453         if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
454                 osc_lock_granted(env, olck, dlmlock, 0);
455         unlock_res_and_lock(dlmlock);
456
457         /*
458          * osc_enqueue_interpret() decrefs asynchronous locks, counter
459          * this.
460          */
461         ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
462         olck->ols_hold = olck->ols_has_ref = 1;
463
464         /* lock reference taken by ldlm_handle2lock_long() is owned by
465          * osc_lock and released in osc_lock_detach() */
466         lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
467 }
468
469 /**
470  * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
471  * received from a server, or after osc_enqueue_base() matched a local DLM
472  * lock.
473  */
474 static int osc_lock_upcall(void *cookie, int errcode)
475 {
476         struct osc_lock         *olck  = cookie;
477         struct cl_lock_slice    *slice = &olck->ols_cl;
478         struct cl_lock          *lock  = slice->cls_lock;
479         struct lu_env           *env;
480         struct cl_env_nest       nest;
481
482         ENTRY;
483         env = cl_env_nested_get(&nest);
484         if (!IS_ERR(env)) {
485                 int rc;
486
487                 cl_lock_mutex_get(env, lock);
488
489                 LASSERT(lock->cll_state >= CLS_QUEUING);
490                 if (olck->ols_state == OLS_ENQUEUED) {
491                         olck->ols_state = OLS_UPCALL_RECEIVED;
492                         rc = ldlm_error2errno(errcode);
493                 } else if (olck->ols_state == OLS_CANCELLED) {
494                         rc = -EIO;
495                 } else {
496                         CERROR("Impossible state: %i\n", olck->ols_state);
497                         LBUG();
498                 }
499                 if (rc) {
500                         struct ldlm_lock *dlmlock;
501
502                         dlmlock = ldlm_handle2lock(&olck->ols_handle);
503                         if (dlmlock != NULL) {
504                                 lock_res_and_lock(dlmlock);
505                                 spin_lock(&osc_ast_guard);
506                                 LASSERT(olck->ols_lock == NULL);
507                                 dlmlock->l_ast_data = NULL;
508                                 olck->ols_handle.cookie = 0ULL;
509                                 spin_unlock(&osc_ast_guard);
510                                 unlock_res_and_lock(dlmlock);
511                                 LDLM_LOCK_PUT(dlmlock);
512                         }
513                 } else {
514                         if (olck->ols_glimpse)
515                                 olck->ols_glimpse = 0;
516                         osc_lock_upcall0(env, olck);
517                 }
518
519                 /* Error handling, some errors are tolerable. */
520                 if (olck->ols_locklessable && rc == -EUSERS) {
521                         /* This is a tolerable error, turn this lock into
522                          * lockless lock.
523                          */
524                         osc_object_set_contended(cl2osc(slice->cls_obj));
525                         LASSERT(slice->cls_ops == &osc_lock_ops);
526
527                         /* Change this lock to ldlmlock-less lock. */
528                         osc_lock_to_lockless(env, olck, 1);
529                         olck->ols_state = OLS_GRANTED;
530                         rc = 0;
531                 } else if (olck->ols_glimpse && rc == -ENAVAIL) {
532                         osc_lock_lvb_update(env, olck, rc);
533                         cl_lock_delete(env, lock);
534                         /* Hide the error. */
535                         rc = 0;
536                 }
537
538                 if (rc == 0)
539                         /* on error, lock was signaled by cl_lock_error() */
540                         cl_lock_signal(env, lock);
541                 else
542                         cl_lock_error(env, lock, rc);
543
544                 cl_lock_mutex_put(env, lock);
545
546                 /* release cookie reference, acquired by osc_lock_enqueue() */
547                 lu_ref_del(&lock->cll_reference, "upcall", lock);
548                 cl_lock_put(env, lock);
549                 cl_env_nested_put(&nest, env);
550         } else
551                 /* should never happen, similar to osc_ldlm_blocking_ast(). */
552                 LBUG();
553         RETURN(errcode);
554 }
555
556 /**
557  * Core of osc_dlm_blocking_ast() logic.
558  */
559 static void osc_lock_blocking(const struct lu_env *env,
560                               struct ldlm_lock *dlmlock,
561                               struct osc_lock *olck, int blocking)
562 {
563         struct cl_lock *lock = olck->ols_cl.cls_lock;
564
565         LASSERT(olck->ols_lock == dlmlock);
566         CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
567         LASSERT(!osc_lock_is_lockless(olck));
568
569         if (olck->ols_hold)
570                 /*
571                  * Lock might be still addref-ed here, if e.g., blocking ast
572                  * is sent for a failed lock.
573                  */
574                 osc_lock_unuse(env, &olck->ols_cl);
575
576         if (blocking && olck->ols_state < OLS_BLOCKED)
577                 /*
578                  * Move osc_lock into OLS_BLOCKED before canceling the lock,
579                  * because it recursively re-enters osc_lock_blocking(), with
580                  * the state set to OLS_CANCELLED.
581                  */
582                 olck->ols_state = OLS_BLOCKED;
583         /*
584          * cancel and destroy lock at least once no matter how blocking ast is
585          * entered (see comment above osc_ldlm_blocking_ast() for use
586          * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
587          */
588         cl_lock_cancel(env, lock);
589         cl_lock_delete(env, lock);
590 }
591
592 /**
593  * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
594  * and ldlm_lock caches.
595  */
596 static int osc_dlm_blocking_ast0(const struct lu_env *env,
597                                  struct ldlm_lock *dlmlock,
598                                  void *data, int flag)
599 {
600         struct osc_lock *olck;
601         struct cl_lock  *lock;
602         int result;
603         int cancel;
604
605         LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);
606
607         cancel = 0;
608         olck = osc_ast_data_get(dlmlock);
609         if (olck != NULL) {
610                 lock = olck->ols_cl.cls_lock;
611                 cl_lock_mutex_get(env, lock);
612                 LINVRNT(osc_lock_invariant(olck));
613                 if (olck->ols_ast_wait) {
614                         /* wake up osc_lock_use() */
615                         cl_lock_signal(env, lock);
616                         olck->ols_ast_wait = 0;
617                 }
618                 /*
619                  * Lock might have been canceled while this thread was
620                  * sleeping for lock mutex, but olck is pinned in memory.
621                  */
622                 if (olck == dlmlock->l_ast_data) {
623                         /*
624                          * NOTE: DLM sends blocking AST's for failed locks
625                          *       (that are still in pre-OLS_GRANTED state)
626                          *       too, and they have to be canceled otherwise
627                          *       DLM lock is never destroyed and stuck in
628                          *       the memory.
629                          *
630                          *       Alternatively, ldlm_cli_cancel() can be
631                          *       called here directly for osc_locks with
632                          *       ols_state < OLS_GRANTED to maintain an
633                          *       invariant that ->clo_cancel() is only called
634                          *       for locks that were granted.
635                          */
636                         LASSERT(data == olck);
637                         osc_lock_blocking(env, dlmlock,
638                                           olck, flag == LDLM_CB_BLOCKING);
639                 } else
640                         cancel = 1;
641                 cl_lock_mutex_put(env, lock);
642                 osc_ast_data_put(env, olck);
643         } else
644                 /*
645                  * DLM lock exists, but there is no cl_lock attached to it.
646                  * This is a `normal' race. cl_object and its cl_lock's can be
647                  * removed by memory pressure, together with all pages.
648                  */
649                 cancel = (flag == LDLM_CB_BLOCKING);
650
651         if (cancel) {
652                 struct lustre_handle *lockh;
653
654                 lockh = &osc_env_info(env)->oti_handle;
655                 ldlm_lock2handle(dlmlock, lockh);
656                 result = ldlm_cli_cancel(lockh);
657         } else
658                 result = 0;
659         return result;
660 }
661
662 /**
663  * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
664  * some other lock, or is canceled. This function is installed as a
665  * ldlm_lock::l_blocking_ast() for client extent locks.
666  *
667  * Control flow is tricky, because ldlm uses the same call-back
668  * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
669  *
670  * \param dlmlock lock for which ast occurred.
671  *
672  * \param new description of a conflicting lock in case of blocking ast.
673  *
674  * \param data value of dlmlock->l_ast_data
675  *
676  * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
677  *             cancellation and blocking ast's.
678  *
679  * Possible use cases:
680  *
681  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
682  *       lock due to lock lru pressure, or explicit user request to purge
683  *       locks.
684  *
685  *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
686  *       us that dlmlock conflicts with another lock that some client is
687  *       enqueing. Lock is canceled.
688  *
689  *           - cl_lock_cancel() is called. osc_lock_cancel() calls
690  *             ldlm_cli_cancel() that calls
691  *
692  *                  dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
693  *
694  *             recursively entering osc_ldlm_blocking_ast().
695  *
696  *     - client cancels lock voluntary (e.g., as a part of early cancellation):
697  *
698  *           cl_lock_cancel()->
699  *             osc_lock_cancel()->
700  *               ldlm_cli_cancel()->
701  *                 dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
702  *
703  */
704 static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
705                                  struct ldlm_lock_desc *new, void *data,
706                                  int flag)
707 {
708         struct lu_env     *env;
709         struct cl_env_nest nest;
710         int                result;
711
712         /*
713          * This can be called in the context of outer IO, e.g.,
714          *
715          *     cl_enqueue()->...
716          *       ->osc_enqueue_base()->...
717          *         ->ldlm_prep_elc_req()->...
718          *           ->ldlm_cancel_callback()->...
719          *             ->osc_ldlm_blocking_ast()
720          *
721          * new environment has to be created to not corrupt outer context.
722          */
723         env = cl_env_nested_get(&nest);
724         if (!IS_ERR(env)) {
725                 result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
726                 cl_env_nested_put(&nest, env);
727         } else {
728                 result = PTR_ERR(env);
729                 /*
730                  * XXX This should never happen, as cl_lock is
731                  * stuck. Pre-allocated environment a la vvp_inode_fini_env
732                  * should be used.
733                  */
734                 LBUG();
735         }
736         if (result != 0) {
737                 if (result == -ENODATA)
738                         result = 0;
739                 else
740                         CERROR("BAST failed: %d\n", result);
741         }
742         return result;
743 }
744
745 static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
746                                    int flags, void *data)
747 {
748         struct cl_env_nest nest;
749         struct lu_env     *env;
750         struct osc_lock   *olck;
751         struct cl_lock    *lock;
752         int result;
753         int dlmrc;
754
755         /* first, do dlm part of the work */
756         dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
757         /* then, notify cl_lock */
758         env = cl_env_nested_get(&nest);
759         if (!IS_ERR(env)) {
760                 olck = osc_ast_data_get(dlmlock);
761                 if (olck != NULL) {
762                         lock = olck->ols_cl.cls_lock;
763                         cl_lock_mutex_get(env, lock);
764                         /*
765                          * ldlm_handle_cp_callback() copied LVB from request
766                          * to lock->l_lvb_data, store it in osc_lock.
767                          */
768                         LASSERT(dlmlock->l_lvb_data != NULL);
769                         lock_res_and_lock(dlmlock);
770                         olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
771                         if (olck->ols_lock == NULL)
772                                 /*
773                                  * upcall (osc_lock_upcall()) hasn't yet been
774                                  * called. Do nothing now, upcall will bind
775                                  * olck to dlmlock and signal the waiters.
776                                  *
777                                  * This maintains an invariant that osc_lock
778                                  * and ldlm_lock are always bound when
779                                  * osc_lock is in OLS_GRANTED state.
780                                  */
781                                 ;
782                         else if (dlmlock->l_granted_mode != LCK_MINMODE)
783                                 osc_lock_granted(env, olck, dlmlock, dlmrc);
784                         unlock_res_and_lock(dlmlock);
785                         if (dlmrc != 0)
786                                 cl_lock_error(env, lock, dlmrc);
787                         cl_lock_mutex_put(env, lock);
788                         osc_ast_data_put(env, olck);
789                         result = 0;
790                 } else
791                         result = -ELDLM_NO_LOCK_DATA;
792                 cl_env_nested_put(&nest, env);
793         } else
794                 result = PTR_ERR(env);
795         return dlmrc ?: result;
796 }
797
798 static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
799 {
800         struct ptlrpc_request  *req  = data;
801         struct osc_lock        *olck;
802         struct cl_lock         *lock;
803         struct cl_object       *obj;
804         struct cl_env_nest      nest;
805         struct lu_env          *env;
806         struct ost_lvb         *lvb;
807         struct req_capsule     *cap;
808         int                     result;
809
810         LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
811
812         env = cl_env_nested_get(&nest);
813         if (!IS_ERR(env)) {
814                 /*
815                  * osc_ast_data_get() has to go after environment is
816                  * allocated, because osc_ast_data() acquires a
817                  * reference to a lock, and it can only be released in
818                  * environment.
819                  */
820                 olck = osc_ast_data_get(dlmlock);
821                 if (olck != NULL) {
822                         lock = olck->ols_cl.cls_lock;
823                         cl_lock_mutex_get(env, lock);
824                         cap = &req->rq_pill;
825                         req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
826                         req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
827                                              sizeof *lvb);
828                         result = req_capsule_server_pack(cap);
829                         if (result == 0) {
830                                 lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
831                                 obj = lock->cll_descr.cld_obj;
832                                 result = cl_object_glimpse(env, obj, lvb);
833                         }
834                         cl_lock_mutex_put(env, lock);
835                         osc_ast_data_put(env, olck);
836                 } else {
837                         /*
838                          * These errors are normal races, so we don't want to
839                          * fill the console with messages by calling
840                          * ptlrpc_error()
841                          */
842                         lustre_pack_reply(req, 1, NULL, NULL);
843                         result = -ELDLM_NO_LOCK_DATA;
844                 }
845                 cl_env_nested_put(&nest, env);
846         } else
847                 result = PTR_ERR(env);
848         req->rq_status = result;
849         return result;
850 }
851
852 static unsigned long osc_lock_weigh(const struct lu_env *env,
853                                     const struct cl_lock_slice *slice)
854 {
855         /*
856          * don't need to grab coh_page_guard since we don't care the exact #
857          * of pages..
858          */
859         return cl_object_header(slice->cls_obj)->coh_pages;
860 }
861
862 /**
863  * Get the weight of dlm lock for early cancellation.
864  *
865  * XXX: it should return the pages covered by this \a dlmlock.
866  */
867 static unsigned long osc_ldlm_weigh_ast(struct ldlm_lock *dlmlock)
868 {
869         struct cl_env_nest       nest;
870         struct lu_env           *env;
871         struct osc_lock         *lock;
872         struct cl_lock          *cll;
873         unsigned long            weight;
874         ENTRY;
875
876         might_sleep();
877         /*
878          * osc_ldlm_weigh_ast has a complex context since it might be called
879          * because of lock canceling, or from user's input. We have to make
880          * a new environment for it. Probably it is implementation safe to use
881          * the upper context because cl_lock_put don't modify environment
882          * variables. But in case of ..
883          */
884         env = cl_env_nested_get(&nest);
885         if (IS_ERR(env))
886                 /* Mostly because lack of memory, tend to eliminate this lock*/
887                 RETURN(0);
888
889         LASSERT(dlmlock->l_resource->lr_type == LDLM_EXTENT);
890         lock = osc_ast_data_get(dlmlock);
891         if (lock == NULL) {
892                 /* cl_lock was destroyed because of memory pressure.
893                  * It is much reasonable to assign this type of lock
894                  * a lower cost.
895                  */
896                 GOTO(out, weight = 0);
897         }
898
899         cll = lock->ols_cl.cls_lock;
900         cl_lock_mutex_get(env, cll);
901         weight = cl_lock_weigh(env, cll);
902         cl_lock_mutex_put(env, cll);
903         osc_ast_data_put(env, lock);
904         EXIT;
905
906 out:
907         cl_env_nested_put(&nest, env);
908         return weight;
909 }
910
911 static void osc_lock_build_einfo(const struct lu_env *env,
912                                  const struct cl_lock *clock,
913                                  struct osc_lock *lock,
914                                  struct ldlm_enqueue_info *einfo)
915 {
916         enum cl_lock_mode mode;
917
918         mode = clock->cll_descr.cld_mode;
919         if (mode == CLM_PHANTOM)
920                 /*
921                  * For now, enqueue all glimpse locks in read mode. In the
922                  * future, client might choose to enqueue LCK_PW lock for
923                  * glimpse on a file opened for write.
924                  */
925                 mode = CLM_READ;
926
927         einfo->ei_type   = LDLM_EXTENT;
928         einfo->ei_mode   = osc_cl_lock2ldlm(mode);
929         einfo->ei_cb_bl  = osc_ldlm_blocking_ast;
930         einfo->ei_cb_cp  = osc_ldlm_completion_ast;
931         einfo->ei_cb_gl  = osc_ldlm_glimpse_ast;
932         einfo->ei_cb_wg  = osc_ldlm_weigh_ast;
933         einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
934 }
935
936 static int osc_lock_delete0(struct cl_lock *conflict)
937 {
938         struct cl_env_nest    nest;
939         struct lu_env        *env;
940         int    rc = 0;        
941
942         env = cl_env_nested_get(&nest);
943         if (!IS_ERR(env)) {
944                 cl_lock_delete(env, conflict);
945                 cl_env_nested_put(&nest, env);
946         } else
947                 rc = PTR_ERR(env);
948         return rc; 
949 }
950 /**
951  * Cancels \a conflict lock and waits until it reached CLS_FREEING state. This
952  * is called as a part of enqueuing to cancel conflicting locks early.
953  *
954  * \retval            0: success, \a conflict was cancelled and destroyed.
955  *
956  * \retval   CLO_REPEAT: \a conflict was cancelled, but \a lock mutex was
957  *                       released in the process. Repeat enqueing.
958  *
959  * \retval -EWOULDBLOCK: \a conflict cannot be cancelled immediately, and
960  *                       either \a lock is non-blocking, or current thread
961  *                       holds other locks, that prevent it from waiting
962  *                       for cancel to complete.
963  *
964  * \retval          -ve: other error, including -EINTR.
965  *
966  */
967 static int osc_lock_cancel_wait(const struct lu_env *env, struct cl_lock *lock,
968                                 struct cl_lock *conflict, int canwait)
969 {
970         int rc;
971
972         LASSERT(cl_lock_is_mutexed(lock));
973         LASSERT(cl_lock_is_mutexed(conflict));
974
975         rc = 0;
976         if (conflict->cll_state != CLS_FREEING) {
977                 cl_lock_cancel(env, conflict);
978                 rc = osc_lock_delete0(conflict);
979                 if (rc)
980                         return rc; 
981                 if (conflict->cll_flags & (CLF_CANCELPEND|CLF_DOOMED)) {
982                         rc = -EWOULDBLOCK;
983                         if (cl_lock_nr_mutexed(env) > 2)
984                                 /*
985                                  * If mutices of locks other than @lock and
986                                  * @scan are held by the current thread, it
987                                  * cannot wait on @scan state change in a
988                                  * dead-lock safe matter, so simply skip early
989                                  * cancellation in this case.
990                                  *
991                                  * This means that early cancellation doesn't
992                                  * work when there is even slight mutex
993                                  * contention, as top-lock's mutex is usually
994                                  * held at this time.
995                                  */
996                                 ;
997                         else if (canwait) {
998                                 /* Waiting for @scan to be destroyed */
999                                 cl_lock_mutex_put(env, lock);
1000                                 do {
1001                                         rc = cl_lock_state_wait(env, conflict);
1002                                 } while (!rc &&
1003                                          conflict->cll_state < CLS_FREEING);
1004                                 /* mutex was released, repeat enqueue. */
1005                                 rc = rc ?: CLO_REPEAT;
1006                                 cl_lock_mutex_get(env, lock);
1007                         }
1008                 }
1009                 LASSERT(ergo(!rc, conflict->cll_state == CLS_FREEING));
1010                 CDEBUG(D_INFO, "lock %p was %s freed now, rc (%d)\n",
1011                        conflict, rc ? "not":"", rc);
1012         }
1013         return rc;
1014 }
1015
1016 /**
1017  * Determine if the lock should be converted into a lockless lock.
1018  *
1019  * Steps to check:
1020  * - if the lock has an explicite requirment for a non-lockless lock;
1021  * - if the io lock request type ci_lockreq;
1022  * - send the enqueue rpc to ost to make the further decision;
1023  * - special treat to truncate lockless lock
1024  *
1025  *  Additional policy can be implemented here, e.g., never do lockless-io
1026  *  for large extents.
1027  */
1028 static void osc_lock_to_lockless(const struct lu_env *env,
1029                                  struct osc_lock *ols, int force)
1030 {
1031         struct cl_lock_slice *slice = &ols->ols_cl;
1032         struct cl_lock *lock        = slice->cls_lock;
1033
1034         LASSERT(ols->ols_state == OLS_NEW ||
1035                 ols->ols_state == OLS_UPCALL_RECEIVED);
1036
1037         if (force) {
1038                 ols->ols_locklessable = 1;
1039                 LASSERT(cl_lock_is_mutexed(lock));
1040                 slice->cls_ops = &osc_lock_lockless_ops;
1041         } else {
1042                 struct osc_io *oio     = osc_env_io(env);
1043                 struct cl_io  *io      = oio->oi_cl.cis_io;
1044                 struct cl_object *obj  = slice->cls_obj;
1045                 struct osc_object *oob = cl2osc(obj);
1046                 const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
1047                 struct obd_connect_data *ocd;
1048
1049                 LASSERT(io->ci_lockreq == CILR_MANDATORY ||
1050                         io->ci_lockreq == CILR_MAYBE ||
1051                         io->ci_lockreq == CILR_NEVER);
1052
1053                 ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
1054                 ols->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
1055                                 (io->ci_lockreq == CILR_MAYBE) &&
1056                                 (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
1057                 if (io->ci_lockreq == CILR_NEVER ||
1058                         /* lockless IO */
1059                     (ols->ols_locklessable && osc_object_is_contended(oob)) ||
1060                         /* lockless truncate */
1061                     (io->ci_type == CIT_TRUNC &&
1062                      (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
1063                       osd->od_lockless_truncate)) {
1064                         ols->ols_locklessable = 1;
1065                         slice->cls_ops = &osc_lock_lockless_ops;
1066                 }
1067         }
1068         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1069 }
1070
1071 /**
1072  * Cancel all conflicting locks and wait for them to be destroyed.
1073  *
1074  * This function is used for two purposes:
1075  *
1076  *     - early cancel all conflicting locks before starting IO, and
1077  *
1078  *     - guarantee that pages added to the page cache by lockless IO are never
1079  *       covered by locks other than lockless IO lock, and, hence, are not
1080  *       visible to other threads.
1081  */
1082 static int osc_lock_enqueue_wait(const struct lu_env *env,
1083                                  const struct osc_lock *olck)
1084 {
1085         struct cl_lock          *lock    = olck->ols_cl.cls_lock;
1086         struct cl_lock_descr    *descr   = &lock->cll_descr;
1087         struct cl_object_header *hdr     = cl_object_header(descr->cld_obj);
1088         struct cl_lock_closure  *closure = &osc_env_info(env)->oti_closure;
1089         struct cl_lock          *scan;
1090         struct cl_lock          *temp;
1091         int lockless                     = osc_lock_is_lockless(olck);
1092         int rc                           = 0;
1093         int canwait;
1094         int stop;
1095         ENTRY;
1096
1097         LASSERT(cl_lock_is_mutexed(lock));
1098         LASSERT(lock->cll_state == CLS_QUEUING);
1099
1100         /*
1101          * XXX This function could be sped up if we had asynchronous
1102          * cancellation.
1103          */
1104
1105         canwait =
1106                 !(olck->ols_flags & LDLM_FL_BLOCK_NOWAIT) &&
1107                 cl_lock_nr_mutexed(env) == 1;
1108         cl_lock_closure_init(env, closure, lock, canwait);
1109         spin_lock(&hdr->coh_lock_guard);
1110         list_for_each_entry_safe(scan, temp, &hdr->coh_locks, cll_linkage) {
1111                 if (scan == lock)
1112                         continue;
1113
1114                 if (scan->cll_state < CLS_QUEUING ||
1115                     scan->cll_state == CLS_FREEING ||
1116                     scan->cll_descr.cld_start > descr->cld_end ||
1117                     scan->cll_descr.cld_end < descr->cld_start)
1118                         continue;
1119
1120                 /* overlapped and living locks. */
1121
1122                 /* We're not supposed to give up group lock. */
1123                 if (scan->cll_descr.cld_mode == CLM_GROUP) {
1124                         LASSERT(descr->cld_mode != CLM_GROUP ||
1125                                 descr->cld_gid != scan->cll_descr.cld_gid);
1126                         continue;
1127                 }
1128
1129                 /* A tricky case for lockless pages:
1130                  * We need to cancel the compatible locks if we're enqueuing
1131                  * a lockless lock, for example:
1132                  * imagine that client has PR lock on [0, 1000], and thread T0
1133                  * is doing lockless IO in [500, 1500] region. Concurrent
1134                  * thread T1 can see lockless data in [500, 1000], which is
1135                  * wrong, because these data are possibly stale.
1136                  */
1137                 if (!lockless && cl_lock_compatible(scan, lock))
1138                         continue;
1139
1140                 /* Now @scan is conflicting with @lock, this means current
1141                  * thread have to sleep for @scan being destroyed. */
1142                 cl_lock_get_trust(scan);
1143                 if (&temp->cll_linkage != &hdr->coh_locks)
1144                         cl_lock_get_trust(temp);
1145                 spin_unlock(&hdr->coh_lock_guard);
1146                 lu_ref_add(&scan->cll_reference, "cancel-wait", lock);
1147
1148                 LASSERT(list_empty(&closure->clc_list));
1149                 rc = cl_lock_closure_build(env, scan, closure);
1150                 if (rc == 0) {
1151                         rc = osc_lock_cancel_wait(env, lock, scan, canwait);
1152                         cl_lock_disclosure(env, closure);
1153                         if (rc == -EWOULDBLOCK)
1154                                 rc = 0;
1155                 }
1156                 if (rc == CLO_REPEAT && !canwait)
1157                         /* cannot wait... no early cancellation. */
1158                         rc = 0;
1159
1160                 lu_ref_del(&scan->cll_reference, "cancel-wait", lock);
1161                 cl_lock_put(env, scan);
1162                 spin_lock(&hdr->coh_lock_guard);
1163                 /*
1164                  * Lock list could have been modified, while spin-lock was
1165                  * released. Check that it is safe to continue.
1166                  */
1167                 stop = list_empty(&temp->cll_linkage);
1168                 if (&temp->cll_linkage != &hdr->coh_locks)
1169                         cl_lock_put(env, temp);
1170                 if (stop || rc != 0)
1171                         break;
1172         }
1173         spin_unlock(&hdr->coh_lock_guard);
1174         cl_lock_closure_fini(closure);
1175         RETURN(rc);
1176 }
1177
1178 /**
1179  * Deadlock avoidance for osc_lock_enqueue(). Consider following scenario:
1180  *
1181  *     - Thread0: obtains PR:[0, 10]. Lock is busy.
1182  *
1183  *     - Thread1: enqueues PW:[5, 50]. Blocking ast is sent to
1184  *       PR:[0, 10], but cancellation of busy lock is postponed.
1185  *
1186  *     - Thread0: enqueue PR:[30, 40]. Lock is locally matched to
1187  *       PW:[5, 50], and thread0 waits for the lock completion never
1188  *       releasing PR:[0, 10]---deadlock.
1189  *
1190  * The second PR lock can be glimpse (it is to deal with that situation that
1191  * ll_glimpse_size() has second argument, preventing local match of
1192  * not-yet-granted locks, see bug 10295). Similar situation is possible in the
1193  * case of memory mapped user level buffer.
1194  *
1195  * To prevent this we can detect a situation when current "thread" or "io"
1196  * already holds a lock on this object and either add LDLM_FL_BLOCK_GRANTED to
1197  * the ols->ols_flags, or prevent local match with PW locks.
1198  */
1199 static int osc_deadlock_is_possible(const struct lu_env *env,
1200                                     struct cl_lock *lock)
1201 {
1202         struct cl_object        *obj;
1203         struct cl_object_header *head;
1204         struct cl_lock          *scan;
1205         struct osc_io           *oio;
1206
1207         int result;
1208
1209         ENTRY;
1210
1211         LASSERT(cl_lock_is_mutexed(lock));
1212
1213         oio  = osc_env_io(env);
1214         obj  = lock->cll_descr.cld_obj;
1215         head = cl_object_header(obj);
1216
1217         result = 0;
1218         spin_lock(&head->coh_lock_guard);
1219         list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
1220                 if (scan != lock) {
1221                         struct osc_lock *oscan;
1222
1223                         oscan = osc_lock_at(scan);
1224                         LASSERT(oscan != NULL);
1225                         if (oscan->ols_owner == oio) {
1226                                 result = 1;
1227                                 break;
1228                         }
1229                 }
1230         }
1231         spin_unlock(&head->coh_lock_guard);
1232         RETURN(result);
1233 }
1234
1235 /**
1236  * Implementation of cl_lock_operations::clo_enqueue() method for osc
1237  * layer. This initiates ldlm enqueue:
1238  *
1239  *     - checks for possible dead-lock conditions (osc_deadlock_is_possible());
1240  *
1241  *     - cancels conflicting locks early (osc_lock_enqueue_wait());
1242  *
1243  *     - calls osc_enqueue_base() to do actual enqueue.
1244  *
1245  * osc_enqueue_base() is supplied with an upcall function that is executed
1246  * when lock is received either after a local cached ldlm lock is matched, or
1247  * when a reply from the server is received.
1248  *
1249  * This function does not wait for the network communication to complete.
1250  */
1251 static int osc_lock_enqueue(const struct lu_env *env,
1252                             const struct cl_lock_slice *slice,
1253                             struct cl_io *_, __u32 enqflags)
1254 {
1255         struct osc_lock          *ols     = cl2osc_lock(slice);
1256         struct cl_lock           *lock    = ols->ols_cl.cls_lock;
1257         struct osc_object        *obj     = cl2osc(slice->cls_obj);
1258         struct osc_thread_info   *info    = osc_env_info(env);
1259         struct ldlm_res_id       *resname = &info->oti_resname;
1260         ldlm_policy_data_t       *policy  = &info->oti_policy;
1261         struct ldlm_enqueue_info *einfo   = &ols->ols_einfo;
1262         int result;
1263         ENTRY;
1264
1265         LASSERT(cl_lock_is_mutexed(lock));
1266         LASSERT(lock->cll_state == CLS_QUEUING);
1267         LASSERT(ols->ols_state == OLS_NEW);
1268
1269         osc_lock_build_res(env, obj, resname);
1270         osc_lock_build_policy(env, lock, policy);
1271         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
1272         if (osc_deadlock_is_possible(env, lock))
1273                 ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
1274         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
1275                 ols->ols_glimpse = 1;
1276
1277         result = osc_lock_enqueue_wait(env, ols);
1278         if (result == 0) {
1279                 if (!(enqflags & CEF_MUST))
1280                         /* try to convert this lock to a lockless lock */
1281                         osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
1282                 if (!osc_lock_is_lockless(ols)) {
1283                         if (ols->ols_locklessable)
1284                                 ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
1285
1286                         /* a reference for lock, passed as an upcall cookie */
1287                         cl_lock_get(lock);
1288                         lu_ref_add(&lock->cll_reference, "upcall", lock);
1289                         ols->ols_state = OLS_ENQUEUED;
1290
1291                         /*
1292                          * XXX: this is possible blocking point as
1293                          * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
1294                          * LDLM_CP_CALLBACK.
1295                          */
1296                         result = osc_enqueue_base(osc_export(obj), resname,
1297                                           &ols->ols_flags, policy,
1298                                           &ols->ols_lvb,
1299                                           obj->oo_oinfo->loi_kms_valid,
1300                                           osc_lock_upcall,
1301                                           ols, einfo, &ols->ols_handle,
1302                                           PTLRPCD_SET, 1);
1303                         if (result != 0) {
1304                                 lu_ref_del(&lock->cll_reference,
1305                                            "upcall", lock);
1306                                 cl_lock_put(env, lock);
1307                         }
1308                 } else {
1309                         ols->ols_state = OLS_GRANTED;
1310                 }
1311         }
1312         LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1313         RETURN(result);
1314 }
1315
1316 static int osc_lock_wait(const struct lu_env *env,
1317                          const struct cl_lock_slice *slice)
1318 {
1319         struct osc_lock *olck = cl2osc_lock(slice);
1320         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1321
1322         LINVRNT(osc_lock_invariant(olck));
1323         if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED)
1324                 return 0;
1325
1326         LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
1327                      lock->cll_error == 0, olck->ols_lock != NULL));
1328
1329         return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
1330 }
1331
1332 /**
1333  * An implementation of cl_lock_operations::clo_use() method that pins cached
1334  * lock.
1335  */
1336 static int osc_lock_use(const struct lu_env *env,
1337                         const struct cl_lock_slice *slice)
1338 {
1339         struct osc_lock *olck = cl2osc_lock(slice);
1340         int rc;
1341
1342         LASSERT(!olck->ols_hold);
1343         /*
1344          * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
1345          * flag is not set. This protects us from a concurrent blocking ast.
1346          */
1347         rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
1348         if (rc == 0) {
1349                 olck->ols_hold = olck->ols_has_ref = 1;
1350                 olck->ols_state = OLS_GRANTED;
1351         } else {
1352                 struct cl_lock *lock;
1353
1354                 /*
1355                  * Lock is being cancelled somewhere within
1356                  * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
1357                  * set, but osc_ldlm_blocking_ast() hasn't yet acquired
1358                  * cl_lock mutex.
1359                  */
1360                 lock = slice->cls_lock;
1361                 LASSERT(lock->cll_state == CLS_CACHED);
1362                 LASSERT(lock->cll_users > 0);
1363                 LASSERT(olck->ols_lock->l_flags & LDLM_FL_CBPENDING);
1364                 /* set a flag for osc_dlm_blocking_ast0() to signal the
1365                  * lock.*/
1366                 olck->ols_ast_wait = 1;
1367                 rc = CLO_WAIT;
1368         }
1369         return rc;
1370 }
1371
1372 static int osc_lock_flush(struct osc_lock *ols, int discard)
1373 {
1374         struct cl_lock       *lock  = ols->ols_cl.cls_lock;
1375         struct cl_env_nest    nest;
1376         struct lu_env        *env;
1377         int result = 0;
1378
1379         env = cl_env_nested_get(&nest);
1380         if (!IS_ERR(env)) {
1381                 result = cl_lock_page_out(env, lock, discard);
1382                 cl_env_nested_put(&nest, env);
1383         } else
1384                 result = PTR_ERR(env);
1385         if (result == 0)
1386                 ols->ols_flush = 1;
1387         return result;
1388 }
1389
1390 /**
1391  * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
1392  * called (as part of cl_lock_cancel()) when lock is canceled either voluntary
1393  * (LRU pressure, early cancellation, umount, etc.) or due to the conflict
1394  * with some other lock some where in the cluster. This function does the
1395  * following:
1396  *
1397  *     - invalidates all pages protected by this lock (after sending dirty
1398  *       ones to the server, as necessary);
1399  *
1400  *     - decref's underlying ldlm lock;
1401  *
1402  *     - cancels ldlm lock (ldlm_cli_cancel()).
1403  */
1404 static void osc_lock_cancel(const struct lu_env *env,
1405                             const struct cl_lock_slice *slice)
1406 {
1407         struct cl_lock   *lock    = slice->cls_lock;
1408         struct osc_lock  *olck    = cl2osc_lock(slice);
1409         struct ldlm_lock *dlmlock = olck->ols_lock;
1410         int               result  = 0;
1411         int               discard;
1412
1413         LASSERT(cl_lock_is_mutexed(lock));
1414         LINVRNT(osc_lock_invariant(olck));
1415
1416         if (dlmlock != NULL) {
1417                 int do_cancel;
1418
1419                 discard = dlmlock->l_flags & LDLM_FL_DISCARD_DATA;
1420                 result = osc_lock_flush(olck, discard);
1421                 if (olck->ols_hold)
1422                         osc_lock_unuse(env, slice);
1423
1424                 lock_res_and_lock(dlmlock);
1425                 /* Now that we're the only user of dlm read/write reference,
1426                  * mostly the ->l_readers + ->l_writers should be zero.
1427                  * However, there is a corner case.
1428                  * See bug 18829 for details.*/
1429                 do_cancel = (dlmlock->l_readers == 0 &&
1430                              dlmlock->l_writers == 0);
1431                 dlmlock->l_flags |= LDLM_FL_CBPENDING;
1432                 unlock_res_and_lock(dlmlock);
1433                 if (do_cancel)
1434                         result = ldlm_cli_cancel(&olck->ols_handle);
1435                 if (result < 0)
1436                         CL_LOCK_DEBUG(D_ERROR, env, lock,
1437                                       "lock %p cancel failure with error(%d)\n",
1438                                       lock, result);
1439         }
1440         olck->ols_state = OLS_CANCELLED;
1441         osc_lock_detach(env, olck);
1442 }
1443
1444 void cl_lock_page_list_fixup(const struct lu_env *env,
1445                              struct cl_io *io, struct cl_lock *lock,
1446                              struct cl_page_list *queue);
1447
1448 #ifdef INVARIANT_CHECK
1449 /**
1450  * Returns true iff there are pages under \a olck not protected by other
1451  * locks.
1452  */
1453 static int osc_lock_has_pages(struct osc_lock *olck)
1454 {
1455         struct cl_lock       *lock;
1456         struct cl_lock_descr *descr;
1457         struct cl_object     *obj;
1458         struct osc_object    *oob;
1459         struct cl_page_list  *plist;
1460         struct cl_page       *page;
1461         struct cl_env_nest    nest;
1462         struct cl_io         *io;
1463         struct lu_env        *env;
1464         int                   result;
1465
1466         env = cl_env_nested_get(&nest);
1467         if (!IS_ERR(env)) {
1468                 obj   = olck->ols_cl.cls_obj;
1469                 oob   = cl2osc(obj);
1470                 io    = &oob->oo_debug_io;
1471                 lock  = olck->ols_cl.cls_lock;
1472                 descr = &lock->cll_descr;
1473                 plist = &osc_env_info(env)->oti_plist;
1474                 cl_page_list_init(plist);
1475
1476                 mutex_lock(&oob->oo_debug_mutex);
1477
1478                 io->ci_obj = cl_object_top(obj);
1479                 cl_io_init(env, io, CIT_MISC, io->ci_obj);
1480                 cl_page_gang_lookup(env, obj, io,
1481                                     descr->cld_start, descr->cld_end, plist);
1482                 cl_lock_page_list_fixup(env, io, lock, plist);
1483                 if (plist->pl_nr > 0) {
1484                         CL_LOCK_DEBUG(D_ERROR, env, lock, "still has pages\n");
1485                         cl_page_list_for_each(page, plist)
1486                                 CL_PAGE_DEBUG(D_ERROR, env, page, "\n");
1487                 }
1488                 result = plist->pl_nr > 0;
1489                 cl_page_list_disown(env, io, plist);
1490                 cl_page_list_fini(env, plist);
1491                 cl_io_fini(env, io);
1492                 mutex_unlock(&oob->oo_debug_mutex);
1493                 cl_env_nested_put(&nest, env);
1494         } else
1495                 result = 0;
1496         return result;
1497 }
1498 #else
1499 # define osc_lock_has_pages(olck) (0)
1500 #endif /* INVARIANT_CHECK */
1501
1502 static void osc_lock_delete(const struct lu_env *env,
1503                             const struct cl_lock_slice *slice)
1504 {
1505         struct osc_lock *olck;
1506
1507         olck = cl2osc_lock(slice);
1508         LINVRNT(osc_lock_invariant(olck));
1509         LINVRNT(!osc_lock_has_pages(olck));
1510
1511         if (olck->ols_hold)
1512                 osc_lock_unuse(env, slice);
1513         osc_lock_detach(env, olck);
1514 }
1515
1516 /**
1517  * Implements cl_lock_operations::clo_state() method for osc layer.
1518  *
1519  * Maintains osc_lock::ols_owner field.
1520  *
1521  * This assumes that lock always enters CLS_HELD (from some other state) in
1522  * the same IO context as one that requested the lock. This should not be a
1523  * problem, because context is by definition shared by all activity pertaining
1524  * to the same high-level IO.
1525  */
1526 static void osc_lock_state(const struct lu_env *env,
1527                            const struct cl_lock_slice *slice,
1528                            enum cl_lock_state state)
1529 {
1530         struct osc_lock *lock = cl2osc_lock(slice);
1531         struct osc_io   *oio  = osc_env_io(env);
1532
1533         /*
1534          * XXX multiple io contexts can use the lock at the same time.
1535          */
1536         LINVRNT(osc_lock_invariant(lock));
1537         if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
1538                 LASSERT(lock->ols_owner == NULL);
1539                 lock->ols_owner = oio;
1540         } else if (state != CLS_HELD)
1541                 lock->ols_owner = NULL;
1542 }
1543
1544 static int osc_lock_print(const struct lu_env *env, void *cookie,
1545                           lu_printer_t p, const struct cl_lock_slice *slice)
1546 {
1547         struct osc_lock *lock = cl2osc_lock(slice);
1548
1549         /*
1550          * XXX print ldlm lock and einfo properly.
1551          */
1552         (*p)(env, cookie, "%p %08x "LPU64" %d %p ",
1553              lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
1554              lock->ols_state, lock->ols_owner);
1555         osc_lvb_print(env, cookie, p, &lock->ols_lvb);
1556         return 0;
1557 }
1558
1559 static const struct cl_lock_operations osc_lock_ops = {
1560         .clo_fini    = osc_lock_fini,
1561         .clo_enqueue = osc_lock_enqueue,
1562         .clo_wait    = osc_lock_wait,
1563         .clo_unuse   = osc_lock_unuse,
1564         .clo_use     = osc_lock_use,
1565         .clo_delete  = osc_lock_delete,
1566         .clo_state   = osc_lock_state,
1567         .clo_cancel  = osc_lock_cancel,
1568         .clo_weigh   = osc_lock_weigh,
1569         .clo_print   = osc_lock_print
1570 };
1571
1572 static int osc_lock_lockless_enqueue(const struct lu_env *env,
1573                                      const struct cl_lock_slice *slice,
1574                                      struct cl_io *_, __u32 enqflags)
1575 {
1576         LBUG();
1577         return 0;
1578 }
1579
1580 static int osc_lock_lockless_unuse(const struct lu_env *env,
1581                                    const struct cl_lock_slice *slice)
1582 {
1583         struct osc_lock *ols = cl2osc_lock(slice);
1584         struct cl_lock *lock = slice->cls_lock;
1585
1586         LASSERT(ols->ols_state == OLS_GRANTED);
1587         LINVRNT(osc_lock_invariant(ols));
1588
1589         cl_lock_cancel(env, lock);
1590         cl_lock_delete(env, lock);
1591         return 0;
1592 }
1593
1594 static void osc_lock_lockless_cancel(const struct lu_env *env,
1595                                      const struct cl_lock_slice *slice)
1596 {
1597         struct osc_lock   *ols  = cl2osc_lock(slice);
1598         int result;
1599
1600         result = osc_lock_flush(ols, 0);
1601         if (result)
1602                 CERROR("Pages for lockless lock %p were not purged(%d)\n",
1603                        ols, result);
1604         ols->ols_state = OLS_CANCELLED;
1605 }
1606
1607 static int osc_lock_lockless_wait(const struct lu_env *env,
1608                                   const struct cl_lock_slice *slice)
1609 {
1610         struct osc_lock *olck = cl2osc_lock(slice);
1611         struct cl_lock  *lock = olck->ols_cl.cls_lock;
1612
1613         LINVRNT(osc_lock_invariant(olck));
1614         LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);
1615
1616         return lock->cll_error;
1617 }
1618
1619 static void osc_lock_lockless_state(const struct lu_env *env,
1620                                     const struct cl_lock_slice *slice,
1621                                     enum cl_lock_state state)
1622 {
1623         struct osc_lock *lock = cl2osc_lock(slice);
1624         struct osc_io   *oio  = osc_env_io(env);
1625
1626         LINVRNT(osc_lock_invariant(lock));
1627         if (state == CLS_HELD) {
1628                 LASSERT(lock->ols_owner == NULL);
1629                 lock->ols_owner = oio;
1630
1631                 /* set the io to be lockless if this lock is for io's
1632                  * host object */
1633                 if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
1634                         oio->oi_lockless = 1;
1635         } else
1636                 lock->ols_owner = NULL;
1637 }
1638
1639 static int osc_lock_lockless_fits_into(const struct lu_env *env,
1640                                        const struct cl_lock_slice *slice,
1641                                        const struct cl_lock_descr *need,
1642                                        const struct cl_io *io)
1643 {
1644         return 0;
1645 }
1646
1647 static const struct cl_lock_operations osc_lock_lockless_ops = {
1648         .clo_fini      = osc_lock_fini,
1649         .clo_enqueue   = osc_lock_lockless_enqueue,
1650         .clo_wait      = osc_lock_lockless_wait,
1651         .clo_unuse     = osc_lock_lockless_unuse,
1652         .clo_state     = osc_lock_lockless_state,
1653         .clo_fits_into = osc_lock_lockless_fits_into,
1654         .clo_cancel    = osc_lock_lockless_cancel,
1655         .clo_print     = osc_lock_print
1656 };
1657
1658 int osc_lock_init(const struct lu_env *env,
1659                   struct cl_object *obj, struct cl_lock *lock,
1660                   const struct cl_io *_)
1661 {
1662         struct osc_lock *clk;
1663         int result;
1664
1665         OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
1666         if (clk != NULL) {
1667                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
1668                 clk->ols_state = OLS_NEW;
1669                 cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
1670                 result = 0;
1671         } else
1672                 result = -ENOMEM;
1673         return result;
1674 }
1675
1676
1677 /** @} osc */