lustre/lov/lov_lock.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOV layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov lov @{ */

static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                               struct cl_lock *parent);

/*****************************************************************************
 *
 * Lov lock operations.
 *
 */

static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
                              struct cl_lock *sublock, int idx,
                              struct lov_lock_link *link)
{
        struct lovsub_lock *lsl;
        struct cl_lock     *parent = lck->lls_cl.cls_lock;
        int                 rc;

        LASSERT(cl_lock_is_mutexed(parent));
        LASSERT(cl_lock_is_mutexed(sublock));
        ENTRY;

        lsl = cl2sub_lock(sublock);
        /*
         * Check that the sub-lock does not already have a lock link to this
         * top-lock.
         */
        LASSERT(lov_lock_link_find(env, lck, lsl) == NULL);
        LASSERT(idx < lck->lls_nr);

        lck->lls_sub[idx].sub_lock = lsl;
        lck->lls_nr_filled++;
        LASSERT(lck->lls_nr_filled <= lck->lls_nr);
        list_add_tail(&link->lll_list, &lsl->lss_parents);
        link->lll_idx = idx;
        link->lll_super = lck;
        cl_lock_get(parent);
        lu_ref_add(&parent->cll_reference, "lov-child", sublock);
        lck->lls_sub[idx].sub_flags |= LSF_HELD;
        cl_lock_user_add(env, sublock);

        rc = lov_sublock_modify(env, lck, lsl, &sublock->cll_descr, idx);
        LASSERT(rc == 0); /* there is no way this can fail, currently */
        EXIT;
}

static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
                                         const struct cl_io *io,
                                         struct lov_lock *lck,
                                         int idx, struct lov_lock_link **out)
{
        struct cl_lock       *sublock;
        struct cl_lock       *parent;
        struct lov_lock_link *link;

        LASSERT(idx < lck->lls_nr);
        ENTRY;

        OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
        if (link != NULL) {
                struct lov_lock_sub  *sub;
                struct cl_lock_descr *descr;

                parent = lck->lls_cl.cls_lock;
                sub    = &lck->lls_sub[idx];
                descr  = &sub->sub_descr;

                /* XXX maybe sub-io? */
                sublock = cl_lock_hold(env, io, descr, "lov-parent", parent);
                if (!IS_ERR(sublock))
                        *out = link;
                else
                        OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
        } else
                sublock = ERR_PTR(-ENOMEM);
        RETURN(sublock);
}

static void lov_sublock_unlock(const struct lu_env *env,
                               struct lovsub_lock *lsl,
                               struct cl_lock_closure *closure)
{
        ENTRY;
        lsl->lss_active = NULL;
        cl_lock_disclosure(env, closure);
        EXIT;
}

static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl,
                            struct cl_lock_closure *closure)
{
        struct cl_lock *child;
        int             result;

        LASSERT(list_empty(&closure->clc_list));

        ENTRY;
        child = lsl->lss_cl.cls_lock;
        result = cl_lock_closure_build(env, child, closure);
        if (result == 0) {
                LASSERT(cl_lock_is_mutexed(child));
                lsl->lss_active = closure->clc_origin;
        }
        RETURN(result);
}

/**
 * Updates the result of a top-lock operation from a result of sub-lock
 * sub-operations. Top-operations like lov_lock_{enqueue,use,unuse}() iterate
 * over sub-locks and lov_subresult() is used to calculate the return value of
 * a top-operation. To this end, possible return values of sub-operations are
 * ordered as
 *
 *     - 0                  success
 *     - CLO_WAIT           wait for event
 *     - CLO_REPEAT         repeat top-operation
 *     - -ve                fundamental error
 *
 * Top-level return code can only go down through this list. CLO_REPEAT
 * overwrites CLO_WAIT, because the lock mutex was released and the sleeping
 * condition has to be rechecked by the upper layer.
 */
static int lov_subresult(int result, int rc)
{
        int result_rank;
        int rc_rank;

        LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
        LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
        CLASSERT(CLO_WAIT < CLO_REPEAT);

        ENTRY;

        /* calculate ranks in the ordering above */
        result_rank = result < 0 ? 1 + CLO_REPEAT : result;
        rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;

        if (result_rank < rc_rank)
                result = rc;
        RETURN(result);
}
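
/*
 * Worked example (illustrative only, relying on the 0 < CLO_WAIT < CLO_REPEAT
 * ordering asserted above): combining the running result with a new sub-lock
 * return code always keeps the "worse" of the two:
 *
 *     lov_subresult(0, CLO_WAIT)          == CLO_WAIT
 *     lov_subresult(CLO_WAIT, CLO_REPEAT) == CLO_REPEAT
 *     lov_subresult(CLO_REPEAT, -ENOMEM)  == -ENOMEM
 *
 * i.e., a fundamental error dominates everything, CLO_REPEAT dominates
 * CLO_WAIT, and success is reported only when every sub-operation succeeded.
 */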

/**
 * Creates sub-locks for a given lov_lock for the first time.
 *
 * Goes through all sub-objects of the top-object, and creates sub-locks on
 * every sub-object intersecting with the top-lock extent. This is complicated
 * by the fact that the top-lock (that is being created) can be accessed
 * concurrently through already created sub-locks (possibly shared with other
 * top-locks).
 */
static int lov_lock_sub_init(const struct lu_env *env,
                             struct lov_lock *lck, const struct cl_io *io)
{
        int result = 0;
        int i;
        int j;
        int nr;
        int stripe;
        int start_stripe;
        obd_off start;
        obd_off end;
        obd_off file_start;
        obd_off file_end;

        struct lov_object       *loo    = cl2lov(lck->lls_cl.cls_obj);
        struct lov_layout_raid0 *r0     = lov_r0(loo);
        struct cl_lock          *parent = lck->lls_cl.cls_lock;

        ENTRY;

        lck->lls_orig = parent->cll_descr;
        file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
        file_end   = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;

        start_stripe = lov_stripe_number(r0->lo_lsm, file_start);
        for (i = 0, nr = 0; i < r0->lo_nr; i++) {
                /*
                 * XXX for wide striping a smarter algorithm is desirable,
                 * breaking out of the loop early.
                 */
                stripe = (start_stripe + i) % r0->lo_nr;
                if (lov_stripe_intersects(r0->lo_lsm, stripe,
                                          file_start, file_end, &start, &end))
                        nr++;
        }
        LASSERT(nr > 0);
        OBD_ALLOC(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
        if (lck->lls_sub == NULL)
                RETURN(-ENOMEM);

        lck->lls_nr = nr;
        /*
         * First, fill in sub-lock descriptions in
         * lck->lls_sub[].sub_descr. They are used by lov_sublock_alloc()
         * (called below in this function, and by lov_lock_enqueue()) to
         * create sub-locks. At this moment, no other thread can access
         * the top-lock.
         */
        for (j = 0, nr = 0; j < i; ++j) {
                stripe = (start_stripe + j) % r0->lo_nr;
                if (lov_stripe_intersects(r0->lo_lsm, stripe,
                                          file_start, file_end, &start, &end)) {
                        struct cl_lock_descr *descr;

                        descr = &lck->lls_sub[nr].sub_descr;

                        LASSERT(descr->cld_obj == NULL);
                        descr->cld_obj   = lovsub2cl(r0->lo_sub[stripe]);
                        descr->cld_start = cl_index(descr->cld_obj, start);
                        descr->cld_end   = cl_index(descr->cld_obj, end);
                        descr->cld_mode  = parent->cll_descr.cld_mode;
                        lck->lls_sub[nr].sub_got = *descr;
                        lck->lls_sub[nr].sub_stripe = stripe;
                        nr++;
                }
        }
        LASSERT(nr == lck->lls_nr);
        /*
         * Then, create sub-locks. Once at least one sub-lock was created,
         * the top-lock can be reached by other threads.
         */
        for (i = 0; i < lck->lls_nr; ++i) {
                struct cl_lock       *sublock;
                struct lov_lock_link *link;

                if (lck->lls_sub[i].sub_lock == NULL) {
                        sublock = lov_sublock_alloc(env, io, lck, i, &link);
                        if (IS_ERR(sublock)) {
                                result = PTR_ERR(sublock);
                                break;
                        }
                        cl_lock_mutex_get(env, sublock);
                        cl_lock_mutex_get(env, parent);
                        /*
                         * Recheck under mutex that the sub-lock wasn't created
                         * concurrently, and that the top-lock is still alive.
                         */
                        if (lck->lls_sub[i].sub_lock == NULL &&
                            parent->cll_state < CLS_FREEING) {
                                lov_sublock_adopt(env, lck, sublock, i, link);
                                cl_lock_mutex_put(env, parent);
                        } else {
                                cl_lock_mutex_put(env, parent);
                                cl_lock_unhold(env, sublock,
                                               "lov-parent", parent);
                        }
                        cl_lock_mutex_put(env, sublock);
                }
        }
        /*
         * Some sub-locks can be missing at this point. This is not a problem,
         * because enqueue will create them anyway. The main duty of this
         * function is to fill in sub-lock descriptions in a race-free manner.
         */
        RETURN(result);
}
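
/*
 * Illustrative sketch (not part of the original code) of the extent
 * arithmetic above, assuming a RAID0 layout with two stripes (lo_nr == 2)
 * and a 1 MB stripe size:
 *
 *     top-lock extent (file offsets):   [0, 4 MB)
 *     stripe 0 holds file ranges        [0, 1 MB) and [2 MB, 3 MB)
 *     stripe 1 holds file ranges        [1 MB, 2 MB) and [3 MB, 4 MB)
 *
 * Both stripes intersect the top-lock extent, so nr == 2 and two sub-lock
 * descriptions are filled in. lov_stripe_intersects() reports the
 * intersection in stripe-object offsets, so each sub-lock description ends
 * up covering [0, 2 MB) of its own sub-object, converted to page indices by
 * cl_index().
 */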

static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
                               int i, int deluser, int rc)
{
        struct cl_lock *parent = lck->lls_cl.cls_lock;

        LASSERT(cl_lock_is_mutexed(parent));
        ENTRY;

        if (lck->lls_sub[i].sub_flags & LSF_HELD) {
                struct cl_lock *sublock;
                int dying;

                LASSERT(lck->lls_sub[i].sub_lock != NULL);
                sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
                LASSERT(cl_lock_is_mutexed(sublock));

                lck->lls_sub[i].sub_flags &= ~LSF_HELD;
                if (deluser)
                        cl_lock_user_del(env, sublock);
                /*
                 * If the last hold is released, and cancellation is pending
                 * for a sub-lock, release parent mutex, to avoid keeping it
                 * while sub-lock is being paged out.
                 */
                dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
                         (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
                        sublock->cll_holds == 1;
                if (dying)
                        cl_lock_mutex_put(env, parent);
                cl_lock_unhold(env, sublock, "lov-parent", parent);
                if (dying) {
                        cl_lock_mutex_get(env, parent);
                        rc = lov_subresult(rc, CLO_REPEAT);
                }
                /*
                 * From now on lck->lls_sub[i].sub_lock is a "weak" pointer,
                 * not backed by a reference on a
                 * sub-lock. lovsub_lock_delete() will clear
                 * lck->lls_sub[i].sub_lock under semaphores, just before
                 * sub-lock is destroyed.
                 */
        }
        RETURN(rc);
}

static void lov_sublock_hold(const struct lu_env *env, struct lov_lock *lck,
                             int i)
{
        struct cl_lock *parent = lck->lls_cl.cls_lock;

        LASSERT(cl_lock_is_mutexed(parent));
        ENTRY;

        if (!(lck->lls_sub[i].sub_flags & LSF_HELD)) {
                struct cl_lock *sublock;

                LASSERT(lck->lls_sub[i].sub_lock != NULL);
                sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
                LASSERT(cl_lock_is_mutexed(sublock));
                LASSERT(sublock->cll_state != CLS_FREEING);

                lck->lls_sub[i].sub_flags |= LSF_HELD;

                cl_lock_get_trust(sublock);
                cl_lock_hold_add(env, sublock, "lov-parent", parent);
                cl_lock_user_add(env, sublock);
                cl_lock_put(env, sublock);
        }
        EXIT;
}

static void lov_lock_fini(const struct lu_env *env,
                          struct cl_lock_slice *slice)
{
        struct lov_lock *lck;
        int i;

        ENTRY;
        lck = cl2lov_lock(slice);
        LASSERT(lck->lls_nr_filled == 0);
        if (lck->lls_sub != NULL) {
                for (i = 0; i < lck->lls_nr; ++i)
                        /*
                         * No sub-locks exist at this point, as each sub-lock
                         * holds a reference on its parent.
                         */
                        LASSERT(lck->lls_sub[i].sub_lock == NULL);
                OBD_FREE(lck->lls_sub, lck->lls_nr * sizeof lck->lls_sub[0]);
        }
        OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
        EXIT;
}

/**
 * Tries to advance a state machine of a given sub-lock toward enqueuing of
 * the top-lock.
 *
 * \retval 0 if state-transition can proceed
 * \retval -ve otherwise.
 */
static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
                                struct cl_lock *sublock,
                                struct cl_io *io, __u32 enqflags, int last)
{
        int result;

        ENTRY;
        /* first, try to enqueue a sub-lock ... */
        result = cl_enqueue_try(env, sublock, io, enqflags);
        if (sublock->cll_state == CLS_ENQUEUED)
                /* if it is enqueued, try to `wait' on it---maybe it's already
                 * granted */
                result = cl_wait_try(env, sublock);
        /*
         * If the CEF_ASYNC flag is set, then all sub-locks can be enqueued in
         * parallel; otherwise the enqueue has to wait until the sub-lock is
         * granted before proceeding to the next one.
         */
        if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD &&
            enqflags & CEF_ASYNC && !last)
                result = 0;
        RETURN(result);
}
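
/*
 * Example of the CEF_ASYNC short-circuit above (illustrative only): with
 * CEF_ASYNC set and three sub-locks, a CLO_WAIT returned for sub-lock 0 or 1
 * while that sub-lock is still in a state <= CLS_HELD is converted to 0, so
 * lov_lock_enqueue() keeps walking the remaining stripes; only the last
 * sub-lock's CLO_WAIT is propagated to the top-level enqueue.
 */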

/**
 * Helper function for lov_lock_enqueue() that creates a missing sub-lock.
 */
static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
                            struct cl_io *io, struct lov_lock *lck, int idx)
{
        struct lov_lock_link *link;
        struct cl_lock       *sublock;
        int                   result;

        LASSERT(parent->cll_depth == 1);
        cl_lock_mutex_put(env, parent);
        sublock = lov_sublock_alloc(env, io, lck, idx, &link);
        if (!IS_ERR(sublock))
                cl_lock_mutex_get(env, sublock);
        cl_lock_mutex_get(env, parent);

        if (!IS_ERR(sublock)) {
                if (parent->cll_state == CLS_QUEUING &&
                    lck->lls_sub[idx].sub_lock == NULL)
                        lov_sublock_adopt(env, lck, sublock, idx, link);
                else {
                        /* another thread allocated the sub-lock, or enqueue
                         * is no longer going on */
                        cl_lock_mutex_put(env, parent);
                        cl_lock_unhold(env, sublock, "lov-parent", parent);
                        cl_lock_mutex_get(env, parent);
                }
                cl_lock_mutex_put(env, sublock);
                result = CLO_REPEAT;
        } else
                result = PTR_ERR(sublock);
        return result;
}

/**
 * Implementation of cl_lock_operations::clo_enqueue() for lov layer. This
 * function is rather subtle, as it enqueues the top-lock (i.e., advances the
 * top-lock state machine from CLS_QUEUING to CLS_ENQUEUED state) by juggling
 * sub-lock state machines in the face of sub-lock sharing (by multiple
 * top-locks) and concurrent sub-lock cancellations.
 */
static int lov_lock_enqueue(const struct lu_env *env,
                            const struct cl_lock_slice *slice,
                            struct cl_io *io, __u32 enqflags)
{
        struct cl_lock         *lock    = slice->cls_lock;
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, lock);
        int i;
        int result;
        enum cl_lock_state minstate;

        ENTRY;

        for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock *sub;
                struct cl_lock *sublock;

                if (lock->cll_state != CLS_QUEUING) {
                        /*
                         * Lock might have left QUEUING state if a previous
                         * iteration released its mutex. Stop enqueueing in
                         * this case and let the upper layer decide what to do.
                         */
                        LASSERT(i > 0 && result != 0);
                        break;
                }

                sub = lck->lls_sub[i].sub_lock;
                /*
                 * Sub-lock might have been canceled while the top-lock was
                 * cached.
                 */
                if (sub == NULL) {
                        result = lov_sublock_fill(env, lock, io, lck, i);
                        /* lov_sublock_fill() released @lock mutex,
                         * restart. */
                        break;
                }
                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, sub, closure);
                if (rc == 0) {
                        lov_sublock_hold(env, lck, i);
                        rc = lov_lock_enqueue_one(env, lck, sublock, io,
                                                  enqflags,
                                                  i == lck->lls_nr - 1);
                        minstate = min(minstate, sublock->cll_state);
                        /*
                         * Don't hold a sub-lock in CLS_CACHED state, see
                         * description for lov_lock::lls_sub.
                         */
                        if (sublock->cll_state > CLS_HELD)
                                rc = lov_sublock_release(env, lck, i, 1, rc);
                        lov_sublock_unlock(env, sub, closure);
                }
                result = lov_subresult(result, rc);
                if (result < 0)
                        break;
        }
        cl_lock_closure_fini(closure);
        RETURN(result ?: minstate >= CLS_ENQUEUED ? 0 : CLO_WAIT);
}

static int lov_lock_unuse(const struct lu_env *env,
                          const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        int i;
        int result;

        ENTRY;

        for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock *sub;
                struct cl_lock *sublock;

                /* top-lock state cannot change concurrently, because a single
                 * thread (the one that released the last hold) carries
                 * unlocking through to completion. */
                LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
                sub = lck->lls_sub[i].sub_lock;
                if (sub == NULL)
                        continue;

                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, sub, closure);
                if (rc == 0) {
                        if (lck->lls_sub[i].sub_flags & LSF_HELD) {
                                LASSERT(sublock->cll_state == CLS_HELD);
                                rc = cl_unuse_try(env, sublock);
                                if (rc != CLO_WAIT)
                                        rc = lov_sublock_release(env, lck,
                                                                 i, 0, rc);
                        }
                        lov_sublock_unlock(env, sub, closure);
                }
                result = lov_subresult(result, rc);
                if (result < 0)
                        break;
        }
        if (result == 0 && lck->lls_unuse_race) {
                lck->lls_unuse_race = 0;
                result = -ESTALE;
        }
        cl_lock_closure_fini(closure);
        RETURN(result);
}

static int lov_lock_wait(const struct lu_env *env,
                         const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        enum cl_lock_state      minstate;
        int                     result;
        int                     i;

        ENTRY;

        for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock *sub;
                struct cl_lock *sublock;

                sub = lck->lls_sub[i].sub_lock;
                LASSERT(sub != NULL);
                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, sub, closure);
                if (rc == 0) {
                        LASSERT(sublock->cll_state >= CLS_ENQUEUED);
                        if (sublock->cll_state < CLS_HELD)
                                rc = cl_wait_try(env, sublock);
                        minstate = min(minstate, sublock->cll_state);
                        lov_sublock_unlock(env, sub, closure);
                }
                result = lov_subresult(result, rc);
                if (result < 0)
                        break;
        }
        cl_lock_closure_fini(closure);
        RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT);
}

static int lov_lock_use(const struct lu_env *env,
                        const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        int                     result;
        int                     i;

        LASSERT(slice->cls_lock->cll_state == CLS_CACHED);
        ENTRY;

        for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock *sub;
                struct cl_lock *sublock;

                if (slice->cls_lock->cll_state != CLS_CACHED) {
                        /* see comment in lov_lock_enqueue(). */
                        LASSERT(i > 0 && result != 0);
                        break;
                }
                /*
                 * If a sub-lock had been destroyed while the top-lock was in
                 * CLS_CACHED state, the top-lock would have been moved into
                 * CLS_NEW state, so all sub-locks have to be in place.
                 */
                sub = lck->lls_sub[i].sub_lock;
                LASSERT(sub != NULL);
                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, sub, closure);
                if (rc == 0) {
                        LASSERT(sublock->cll_state != CLS_FREEING);
                        lov_sublock_hold(env, lck, i);
                        if (sublock->cll_state == CLS_CACHED) {
                                rc = cl_use_try(env, sublock);
                                if (rc != 0)
                                        rc = lov_sublock_release(env, lck,
                                                                 i, 1, rc);
                        } else
                                rc = 0;
                        lov_sublock_unlock(env, sub, closure);
                }
                result = lov_subresult(result, rc);
                if (result < 0)
                        break;
        }
        cl_lock_closure_fini(closure);
        RETURN(result);
}

#if 0
static int lock_lock_multi_match()
{
        struct cl_lock          *lock    = slice->cls_lock;
        struct cl_lock_descr    *subneed = &lov_env_info(env)->lti_ldescr;
        struct lov_object       *loo     = cl2lov(lov->lls_cl.cls_obj);
        struct lov_layout_raid0 *r0      = lov_r0(loo);
        struct lov_lock_sub     *sub;
        struct cl_object        *subobj;
        obd_off  fstart;
        obd_off  fend;
        obd_off  start;
        obd_off  end;
        int i;

        fstart = cl_offset(need->cld_obj, need->cld_start);
        fend   = cl_offset(need->cld_obj, need->cld_end + 1) - 1;
        subneed->cld_mode = need->cld_mode;
        cl_lock_mutex_get(env, lock);
        for (i = 0; i < lov->lls_nr; ++i) {
                sub = &lov->lls_sub[i];
                if (sub->sub_lock == NULL)
                        continue;
                subobj = sub->sub_descr.cld_obj;
                if (!lov_stripe_intersects(r0->lo_lsm, sub->sub_stripe,
                                           fstart, fend, &start, &end))
                        continue;
                subneed->cld_start = cl_index(subobj, start);
                subneed->cld_end   = cl_index(subobj, end);
                subneed->cld_obj   = subobj;
                if (!cl_lock_ext_match(&sub->sub_got, subneed)) {
                        result = 0;
                        break;
                }
        }
        cl_lock_mutex_put(env, lock);
}
#endif

static int lov_is_same_stripe(struct lov_object *lov, int stripe,
                              const struct cl_lock_descr *descr)
{
        struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
        obd_off start;
        obd_off end;

        start = cl_offset(&lov->lo_cl, descr->cld_start);
        end   = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
        return
                end - start <= lsm->lsm_stripe_size &&
                stripe == lov_stripe_number(lsm, start) &&
                stripe == lov_stripe_number(lsm, end);
}
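
/*
 * Worked example for lov_is_same_stripe() (illustrative only), assuming a
 * 1 MB stripe size and three stripes:
 *
 *     extent [1 MB, 1.5 MB - 1]: length <= stripe size and both endpoints
 *     map to stripe 1, so the extent fits on stripe 1;
 *
 *     extent [0.5 MB, 1.5 MB - 1]: length <= stripe size, but the start maps
 *     to stripe 0 and the end to stripe 1, so the extent spans two stripes
 *     and the check fails.
 */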

/**
 * An implementation of cl_lock_operations::clo_fits_into() method.
 *
 * Checks whether a lock (given by \a slice) is suitable for \a
 * io. Multi-stripe locks can be used only for "quick" io, like truncate, or
 * O_APPEND write.
 *
 * \see ccc_lock_fits_into().
 */
static int lov_lock_fits_into(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              const struct cl_lock_descr *need,
                              const struct cl_io *io)
{
        struct lov_lock   *lov = cl2lov_lock(slice);
        struct lov_object *obj = cl2lov(slice->cls_obj);
        int result;

        LASSERT(cl_object_same(need->cld_obj, slice->cls_obj));
        LASSERT(lov->lls_nr > 0);

        ENTRY;

        if (lov->lls_nr == 1) {
                /*
                 * If a lock is on a single stripe, it's enough to check that
                 * the @need lock matches the actually granted stripe lock,
                 * and...
                 */
                result = cl_lock_ext_match(&lov->lls_sub[0].sub_got, need);
                if (result && lov_r0(obj)->lo_nr > 1)
                        /*
                         * ... @need is on the same stripe, if multiple
                         * stripes are possible at all for this object.
                         */
                        result = lov_is_same_stripe(cl2lov(slice->cls_obj),
                                                    lov->lls_sub[0].sub_stripe,
                                                    need);
        } else if (io->ci_type != CIT_TRUNC && io->ci_type != CIT_MISC &&
                   !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
                /*
                 * Multi-stripe locks are only suitable for `quick' IO and for
                 * glimpse.
                 */
                result = 0;
        else
                /*
                 * Most general case: multi-stripe existing lock, and
                 * (potentially) multi-stripe @need lock. Check that @need is
                 * covered by @lov's sub-locks.
                 *
                 * For now, ignore lock expansions made by the server, and
                 * match against original lock extent.
                 */
                result = cl_lock_ext_match(&lov->lls_orig, need);
        CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %i %i/%i: %i\n",
               PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
               lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
               result);
        RETURN(result);
}
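
/*
 * Summary of the decision above (restating the code, for readability):
 *
 *     existing lock     io / @need kind                  fits if
 *     ----------------  -------------------------------  ----------------------
 *     single stripe     any                              @need matches the
 *                                                        granted extent and
 *                                                        stays on that stripe
 *     multi-stripe      truncate, misc, append, glimpse  @need is covered by
 *                                                        the original extent
 *     multi-stripe      any other io                     never
 */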

void lov_lock_unlink(const struct lu_env *env,
                     struct lov_lock_link *link, struct lovsub_lock *sub)
{
        struct lov_lock *lck    = link->lll_super;
        struct cl_lock  *parent = lck->lls_cl.cls_lock;

        LASSERT(cl_lock_is_mutexed(parent));
        LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
        ENTRY;

        list_del_init(&link->lll_list);
        LASSERT(lck->lls_sub[link->lll_idx].sub_lock == sub);
        /* yank this sub-lock from parent's array */
        lck->lls_sub[link->lll_idx].sub_lock = NULL;
        LASSERT(lck->lls_nr_filled > 0);
        lck->lls_nr_filled--;
        lu_ref_del(&parent->cll_reference, "lov-child", sub->lss_cl.cls_lock);
        cl_lock_put(env, parent);
        OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
        EXIT;
}

struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
                                         struct lov_lock *lck,
                                         struct lovsub_lock *sub)
{
        struct lov_lock_link *scan;

        LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));
        ENTRY;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                if (scan->lll_super == lck)
                        RETURN(scan);
        }
        RETURN(NULL);
}

/**
 * An implementation of cl_lock_operations::clo_delete() method. This is
 * invoked for "top-to-bottom" delete, when lock destruction starts from the
 * top-lock, e.g., as a result of inode destruction.
 *
 * Unlinks top-lock from all its sub-locks. Sub-locks are not deleted here:
 * this is done separately elsewhere:
 *
 *     - for inode destruction, lov_object_delete() calls cl_object_kill() for
 *       each sub-object, purging its locks;
 *
 *     - in other cases (e.g., a fatal error with a top-lock) sub-locks are
 *       left in the cache.
 */
static void lov_lock_delete(const struct lu_env *env,
                            const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        int i;

        LASSERT(slice->cls_lock->cll_state == CLS_FREEING);
        ENTRY;

        for (i = 0; i < lck->lls_nr; ++i) {
                struct lovsub_lock *lsl;
                struct cl_lock *sublock;
                int rc;

                lsl = lck->lls_sub[i].sub_lock;
                if (lsl == NULL)
                        continue;

                sublock = lsl->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, lsl, closure);
                if (rc == 0) {
                        if (lck->lls_sub[i].sub_flags & LSF_HELD)
                                lov_sublock_release(env, lck, i, 1, 0);
                        if (sublock->cll_state < CLS_FREEING) {
                                struct lov_lock_link *link;

                                link = lov_lock_link_find(env, lck, lsl);
                                LASSERT(link != NULL);
                                lov_lock_unlink(env, link, lsl);
                                LASSERT(lck->lls_sub[i].sub_lock == NULL);
                        }
                        lov_sublock_unlock(env, lsl, closure);
                } else if (rc == CLO_REPEAT) {
                        --i; /* repeat with this lock */
                } else {
                        CL_LOCK_DEBUG(D_ERROR, env, sublock,
                                      "Cannot get sub-lock for delete: %i\n",
                                      rc);
                }
        }
        cl_lock_closure_fini(closure);
        EXIT;
}

static int lov_lock_print(const struct lu_env *env, void *cookie,
                          lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lov_lock *lck = cl2lov_lock(slice);
        int              i;

        (*p)(env, cookie, "%d\n", lck->lls_nr);
        for (i = 0; i < lck->lls_nr; ++i) {
                struct lov_lock_sub *sub;

                sub = &lck->lls_sub[i];
                (*p)(env, cookie, "    %d %x: ", i, sub->sub_flags);
                if (sub->sub_lock != NULL)
                        cl_lock_print(env, cookie, p,
                                      sub->sub_lock->lss_cl.cls_lock);
                else
                        (*p)(env, cookie, "---\n");
        }
        return 0;
}

static const struct cl_lock_operations lov_lock_ops = {
        .clo_fini      = lov_lock_fini,
        .clo_enqueue   = lov_lock_enqueue,
        .clo_wait      = lov_lock_wait,
        .clo_use       = lov_lock_use,
        .clo_unuse     = lov_lock_unuse,
        .clo_fits_into = lov_lock_fits_into,
        .clo_delete    = lov_lock_delete,
        .clo_print     = lov_lock_print
};

int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
                        struct cl_lock *lock, const struct cl_io *io)
{
        struct lov_lock *lck;
        int result;

        ENTRY;
        OBD_SLAB_ALLOC_PTR(lck, lov_lock_kmem);
        if (lck != NULL) {
                cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
                result = lov_lock_sub_init(env, lck, io);
        } else
                result = -ENOMEM;
        RETURN(result);
}
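
/*
 * Note (not in the original source): lov_lock_init_raid0() is the lock
 * initializer used for RAID0-striped lov objects. It attaches the LOV slice
 * to the composite cl_lock via cl_lock_slice_add() and then fills in the
 * per-stripe sub-lock descriptions with lov_lock_sub_init(); if the slab
 * allocation fails, -ENOMEM is returned and top-lock creation fails.
 */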

static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                               struct cl_lock *parent)
{
        struct cl_lock_closure *closure;

        closure = &lov_env_info(env)->lti_closure;
        LINVRNT(list_empty(&closure->clc_list));
        cl_lock_closure_init(env, closure, parent, 1);
        return closure;
}


/** @} lov */