lustre/lov/lovsub_lock.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOVSUB layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov lov @{ */

/*****************************************************************************
 *
 * Lovsub lock operations.
 *
 */

static void lovsub_lock_fini(const struct lu_env *env,
                             struct cl_lock_slice *slice)
{
        struct lovsub_lock   *lsl;

        ENTRY;
        lsl = cl2lovsub_lock(slice);
        LASSERT(list_empty(&lsl->lss_parents));
        OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
        EXIT;
}

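/*
 * Takes a reference on the parent top-lock of @lov and acquires its mutex.
 * Must be paired with lovsub_parent_unlock() below.
 */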
static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_get(parent);
        lu_ref_add(&parent->cll_reference, "lovsub-parent", cfs_current());
        cl_lock_mutex_get(env, parent);
        EXIT;
}

static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_mutex_put(env, parent);
        lu_ref_del(&parent->cll_reference, "lovsub-parent", cfs_current());
        cl_lock_put(env, parent);
        EXIT;
}

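/*
 * Propagates a state change of sub-lock @lovsub to a single parent top-lock
 * @lov. Returns 1 iff the sub-lock mutex had to be released to break a
 * deadlock, in which case the caller must re-acquire it and re-scan the
 * list of parents.
 */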
static int lovsub_lock_state_one(const struct lu_env *env,
                                 const struct lovsub_lock *lovsub,
                                 struct lov_lock *lov)
{
        struct cl_lock *parent;
        struct cl_lock *child;
        int             restart = 0;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        child  = lovsub->lss_cl.cls_lock;

        if (lovsub->lss_active != parent) {
                lovsub_parent_lock(env, lov);
                if (child->cll_error != 0 && parent->cll_error == 0) {
                        /*
                         * This is a deadlock case:
                         * cl_lock_error(for the parent lock)
                         *   -> cl_lock_delete
                         *     -> lov_lock_delete
                         *       -> cl_lock_enclosure
                         *         -> cl_lock_mutex_try(for the child lock)
                         */
                        cl_lock_mutex_put(env, child);
                        cl_lock_error(env, parent, child->cll_error);
                        restart = 1;
                } else {
                        cl_lock_signal(env, parent);
                }
                lovsub_parent_unlock(env, lov);
        }
        RETURN(restart);
}

/**
 * Implements the cl_lock_operations::clo_state() method for the lovsub
 * layer, which is called whenever the sub-lock state changes. Propagates
 * the state change to the top-locks.
 */
static void lovsub_lock_state(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              enum cl_lock_state state)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock_link *scan;
        int                   restart = 0;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        do {
                restart = 0;
                list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                        restart = lovsub_lock_state_one(env, sub,
                                                        scan->lll_super);
                        if (restart) {
                                cl_lock_mutex_get(env, slice->cls_lock);
                                break;
                        }
                }
        } while (restart);
        EXIT;
}

/**
 * Implementation of cl_lock_operations::clo_weigh() estimating lock weight
 * by asking the parent lock.
 */
static unsigned long lovsub_lock_weigh(const struct lu_env *env,
                                       const struct cl_lock_slice *slice)
{
        struct lovsub_lock *lock = cl2lovsub_lock(slice);
        struct lov_lock    *lov;
        unsigned long       dumbbell;

        ENTRY;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));

        if (!list_empty(&lock->lss_parents)) {
                /*
                 * It is not clear whether all parents have to be asked and
                 * their estimations summed, or whether it is enough to ask
                 * one. For the current usages, one is always enough.
                 */
                lov = container_of(lock->lss_parents.next,
                                   struct lov_lock_link, lll_list)->lll_super;

                lovsub_parent_lock(env, lov);
                dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
                lovsub_parent_unlock(env, lov);
        } else
                dumbbell = 0;

        RETURN(dumbbell);
}

/**
 * Maps start/end offsets within a stripe to offsets within the whole file.
 */
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
                                  struct lov_object *obj,
                                  int stripe, struct cl_lock_descr *out)
{
        struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
        pgoff_t size; /* stripe size in pages */
        pgoff_t skip; /* how many pages in every stripe are occupied by
                       * "other" stripes */
        pgoff_t start;
        pgoff_t end;

        ENTRY;
        start = in->cld_start;
        end   = in->cld_end;

        /*
         * XXX join file support.
         */
        if (lsm->lsm_stripe_count > 1) {
                size = cl_index(lov2cl(obj), lsm->lsm_stripe_size);
                skip = (lsm->lsm_stripe_count - 1) * size;

                /* XXX overflow check here? */
                start += start/size * skip + stripe * size;

                if (end != CL_PAGE_EOF) {
                        end += end/size * skip + stripe * size;
                        /*
                         * And check for overflow...
                         */
                        if (end < in->cld_end)
                                end = CL_PAGE_EOF;
                }
        }
        out->cld_start = start;
        out->cld_end   = end;
        EXIT;
}

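/*
 * A worked example of the mapping above, with illustrative numbers only:
 * for lsm_stripe_count == 4 and a stripe size of 256 pages, skip ==
 * (4 - 1) * 256 == 768.  Stripe-relative page 300 of stripe 1 lies in the
 * second stripe-sized chunk of that stripe, and maps to file page
 * 300 + 300/256 * 768 + 1 * 256 == 1324, i.e., page 44 of the file's sixth
 * stripe-sized chunk, which indeed belongs to stripe 1 (5 mod 4 == 1).
 */
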
/**
 * Adjusts the parent lock extent when a sub-lock is attached to a parent.
 * This is called in two ways:
 *
 *     - as part of receive call-back, when the server returns a granted
 *       extent to the client, and
 *
 *     - when a top-lock finds an existing sub-lock in the cache.
 *
 * Note that the lock mode is not propagated to the parent: i.e., if a
 * CLM_READ top-lock matches a CLM_WRITE sub-lock, the top-lock stays
 * CLM_READ.
 */
int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
                       struct lovsub_lock *sublock,
                       const struct cl_lock_descr *d, int idx)
{
        struct cl_lock       *parent;
        struct cl_lock       *child;
        struct lovsub_object *subobj;
        struct cl_lock_descr *pd;
        struct cl_lock_descr *parent_descr;
        int                   result;

        parent       = lov->lls_cl.cls_lock;
        parent_descr = &parent->cll_descr;
        LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));

        child  = sublock->lss_cl.cls_lock;
        subobj = cl2lovsub(sublock->lss_cl.cls_obj);
        pd     = &lov_env_info(env)->lti_ldescr;

        pd->cld_obj  = parent_descr->cld_obj;
        pd->cld_mode = parent_descr->cld_mode;
        lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
        lov->lls_sub[idx].sub_got = *d;
        /*
         * Notify the top-lock about the modification, if the lock
         * description changes materially.
         */
        if (!cl_lock_ext_match(parent_descr, pd))
                result = cl_lock_modify(env, parent, pd);
        else
                result = 0;
        return result;
}

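/*
 * Implements cl_lock_operations::clo_modify(): propagates a change of the
 * sub-lock description to all parent top-locks.
 */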
static int lovsub_lock_modify(const struct lu_env *env,
                              const struct cl_lock_slice *s,
                              const struct cl_lock_descr *d)
{
        struct lovsub_lock   *lock   = cl2lovsub_lock(s);
        struct lov_lock_link *scan;
        struct lov_lock      *lov;
        int result                   = 0;

        ENTRY;

        LASSERT(cl_lock_mode_match(d->cld_mode,
                                   s->cls_lock->cll_descr.cld_mode));
        list_for_each_entry(scan, &lock->lss_parents, lll_list) {
                int rc;

                lov = scan->lll_super;
                lovsub_parent_lock(env, lov);
                rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
                lovsub_parent_unlock(env, lov);
                result = result ?: rc;
        }
        RETURN(result);
}

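/*
 * Implements cl_lock_operations::clo_closure(): adds all parent top-locks
 * of the sub-lock to @closure.
 */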
static int lovsub_lock_closure(const struct lu_env *env,
                               const struct cl_lock_slice *slice,
                               struct cl_lock_closure *closure)
{
        struct lovsub_lock   *sub;
        struct cl_lock       *parent;
        struct lov_lock_link *scan;
        int                   result;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        sub    = cl2lovsub_lock(slice);
        result = 0;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                parent = scan->lll_super->lls_cl.cls_lock;
                result = cl_lock_closure_build(env, parent, closure);
                if (result != 0)
                        break;
        }
        RETURN(result);
}

/**
 * A helper function for lovsub_lock_delete() that deals with a given parent
 * top-lock.
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
                                  struct cl_lock *child, struct lov_lock *lov)
{
        struct cl_lock *parent;
        int             result;
        ENTRY;

        parent = lov->lls_cl.cls_lock;
        result = 0;

        switch (parent->cll_state) {
        case CLS_NEW:
        case CLS_QUEUING:
        case CLS_ENQUEUED:
        case CLS_FREEING:
                cl_lock_signal(env, parent);
                break;
        case CLS_UNLOCKING:
                /*
                 * Here lies a problem: a sub-lock is canceled while the
                 * top-lock is being unlocked. The top-lock cannot be moved
                 * into the CLS_NEW state, because unlocking has to succeed
                 * eventually by placing the lock into CLS_CACHED (or failing
                 * it), see cl_unuse_try(). Nor can the top-lock be left in
                 * the CLS_CACHED state, because lov maintains an invariant
                 * that all sub-locks exist in CLS_CACHED (this allows a
                 * cached top-lock to be reused immediately). Nor can we wait
                 * for the top-lock state to change, because this can be
                 * synchronous to the current thread.
                 *
                 * We know for sure that lov_lock_unuse() will be called at
                 * least one more time to finish un-using, so leave a mark on
                 * the top-lock, that will be seen by the next call to
                 * lov_lock_unuse().
                 */
                lov->lls_unuse_race = 1;
                break;
        case CLS_CACHED:
                /*
                 * If a sub-lock is canceled, move its top-lock into the
                 * CLS_NEW state to preserve the invariant that a top-lock in
                 * CLS_CACHED is immediately ready for re-use (i.e., has all
                 * sub-locks), and so that the next attempt to re-use the
                 * top-lock enqueues the missing sub-lock.
                 */
                cl_lock_state_set(env, parent, CLS_NEW);
                /*
                 * If the last sub-lock is canceled, destroy the top-lock
                 * (which is now `empty') proactively.
                 */
                if (lov->lls_nr_filled == 0) {
                        /* ... but unfortunately, this cannot be done easily,
                         * as cancellation of a top-lock might acquire mutexes
                         * of its other sub-locks, violating lock ordering,
                         * see cl_lock_{cancel,delete}() preconditions.
                         *
                         * To work around this, the mutex of this sub-lock is
                         * released, the top-lock is destroyed, and the
                         * sub-lock mutex is acquired again. The list of
                         * parents has to be re-scanned from the beginning
                         * after this.
                         *
                         * Only do this if no mutexes other than those on
                         * @child and @parent are held by the current thread.
                         *
                         * TODO: The locking model here is too complex,
                         * because the lock may be canceled and deleted
                         * voluntarily:
                         *    cl_lock_request
                         *      -> osc_lock_enqueue_wait
                         *        -> osc_lock_cancel_wait
                         *          -> cl_lock_delete
                         *            -> lovsub_lock_delete
                         *              -> cl_lock_cancel/delete
                         *                -> ...
                         *
                         * The better choice is to spawn a kernel thread for
                         * this purpose. -jay
                         */
                        if (cl_lock_nr_mutexed(env) == 2) {
                                cl_lock_mutex_put(env, child);
                                cl_lock_cancel(env, parent);
                                cl_lock_delete(env, parent);
                                result = 1;
                        }
                }
                break;
        case CLS_HELD:
        default:
                CERROR("Impossible state: %i\n", parent->cll_state);
                LBUG();
        }

        RETURN(result);
}

/**
 * An implementation of cl_lock_operations::clo_delete() method. This is
 * invoked in a "bottom-to-top" delete, when lock destruction starts from the
 * sub-lock (e.g., as a result of the ldlm lock LRU policy).
 */
static void lovsub_lock_delete(const struct lu_env *env,
                               const struct cl_lock_slice *slice)
{
        struct cl_lock     *child = slice->cls_lock;
        struct lovsub_lock *sub   = cl2lovsub_lock(slice);
        int restart;

        LASSERT(cl_lock_is_mutexed(child));

        ENTRY;
        /*
         * Destruction of a sub-lock might take multiple iterations, because
         * when the last sub-lock of a given top-lock is deleted, the
         * top-lock is canceled proactively, and this requires releasing the
         * sub-lock mutex. Once the sub-lock mutex has been released, the
         * list of its parents has to be re-scanned from the beginning.
         */
        do {
                struct lov_lock      *lov;
                struct lov_lock_link *scan;
                struct lov_lock_link *temp;
                struct lov_lock_sub  *subdata;

                restart = 0;
                list_for_each_entry_safe(scan, temp,
                                         &sub->lss_parents, lll_list) {
                        lov     = scan->lll_super;
                        subdata = &lov->lls_sub[scan->lll_idx];
                        lovsub_parent_lock(env, lov);
                        subdata->sub_got = subdata->sub_descr;
                        lov_lock_unlink(env, scan, sub);
                        restart = lovsub_lock_delete_one(env, child, lov);
                        lovsub_parent_unlock(env, lov);

                        if (restart) {
                                cl_lock_mutex_get(env, child);
                                break;
                        }
                }
        } while (restart);
        EXIT;
}

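/*
 * Implements cl_lock_operations::clo_print(): prints the lock descriptions
 * of all parent top-locks of the sub-lock.
 */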
static int lovsub_lock_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock      *lov;
        struct lov_lock_link *scan;

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                lov = scan->lll_super;
                (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
                if (lov != NULL)
                        cl_lock_descr_print(env, cookie, p,
                                            &lov->lls_cl.cls_lock->cll_descr);
                (*p)(env, cookie, "] ");
        }
        return 0;
}

static const struct cl_lock_operations lovsub_lock_ops = {
        .clo_fini    = lovsub_lock_fini,
        .clo_state   = lovsub_lock_state,
        .clo_delete  = lovsub_lock_delete,
        .clo_modify  = lovsub_lock_modify,
        .clo_closure = lovsub_lock_closure,
        .clo_weigh   = lovsub_lock_weigh,
        .clo_print   = lovsub_lock_print
};

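/*
 * Allocates a lovsub lock slice and adds it to @lock with lovsub_lock_ops.
 * Returns 0 on success and -ENOMEM if the slice cannot be allocated.
 */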
int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
                     struct cl_lock *lock, const struct cl_io *io)
{
        struct lovsub_lock *lsk;
        int result;

        ENTRY;
        OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, CFS_ALLOC_IO);
        if (lsk != NULL) {
                CFS_INIT_LIST_HEAD(&lsk->lss_parents);
                cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
                result = 0;
        } else
                result = -ENOMEM;
        RETURN(result);
}

/** @} lov */