/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOVSUB layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"

/** \addtogroup lov
 *  @{
 */

/*****************************************************************************
 *
 * Lovsub lock operations.
 *
 */

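/**
 * Implements the cl_lock_operations::clo_fini() method: frees the lovsub
 * lock slice, which by then must have no remaining links to parent
 * top-locks.
 */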
static void lovsub_lock_fini(const struct lu_env *env,
                             struct cl_lock_slice *slice)
{
        struct lovsub_lock   *lsl;

        ENTRY;
        lsl = cl2lovsub_lock(slice);
        LASSERT(cfs_list_empty(&lsl->lss_parents));
        OBD_SLAB_FREE_PTR(lsl, lovsub_lock_kmem);
        EXIT;
}

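/**
 * Takes a reference on, and acquires the mutex of, the parent top-lock of
 * \a lov. Paired with lovsub_parent_unlock().
 */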
static void lovsub_parent_lock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_get(parent);
        lu_ref_add(&parent->cll_reference, "lovsub-parent", cfs_current());
        cl_lock_mutex_get(env, parent);
        EXIT;
}

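/**
 * Releases the parent top-lock mutex and the reference taken by
 * lovsub_parent_lock().
 */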
static void lovsub_parent_unlock(const struct lu_env *env, struct lov_lock *lov)
{
        struct cl_lock *parent;

        ENTRY;
        parent = lov->lls_cl.cls_lock;
        cl_lock_mutex_put(env, lov->lls_cl.cls_lock);
        lu_ref_del(&parent->cll_reference, "lovsub-parent", cfs_current());
        cl_lock_put(env, parent);
        EXIT;
}

/**
 * Implements the cl_lock_operations::clo_state() method for the lovsub
 * layer, which is called whenever the sub-lock state changes. Propagates
 * the state change to the top-locks.
 */
static void lovsub_lock_state(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              enum cl_lock_state state)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock_link *scan;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        cfs_list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                struct lov_lock *lov    = scan->lll_super;
                struct cl_lock  *parent = lov->lls_cl.cls_lock;

                if (sub->lss_active != parent) {
                        lovsub_parent_lock(env, lov);
                        cl_lock_signal(env, parent);
                        lovsub_parent_unlock(env, lov);
                }
        }
        EXIT;
}

/**
 * Implements the cl_lock_operations::clo_weigh() method, estimating the
 * lock weight by asking the parent lock.
 */
static unsigned long lovsub_lock_weigh(const struct lu_env *env,
                                       const struct cl_lock_slice *slice)
{
        struct lovsub_lock *lock = cl2lovsub_lock(slice);
        struct lov_lock    *lov;
        unsigned long       dumbbell;

        ENTRY;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));

        if (!cfs_list_empty(&lock->lss_parents)) {
                /*
                 * It is not clear whether all parents have to be asked and
                 * their estimations summed, or whether asking one is
                 * enough. For the current usages, one is always enough.
                 */
                lov = container_of(lock->lss_parents.next,
                                   struct lov_lock_link, lll_list)->lll_super;

                lovsub_parent_lock(env, lov);
                dumbbell = cl_lock_weigh(env, lov->lls_cl.cls_lock);
                lovsub_parent_unlock(env, lov);
        } else
                dumbbell = 0;

        RETURN(dumbbell);
}

/**
 * Maps start/end offsets within a stripe to offsets within a file.
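 *
 * A worked example with illustrative numbers: for a layout with
 * lsm_stripe_count == 3 and a stripe size of 256 pages, skip == 512, and
 * a start offset of page 10 within stripe 1 maps to file page
 * 10 + (10 / 256) * 512 + 1 * 256 == 266.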
 */
static void lovsub_lock_descr_map(const struct cl_lock_descr *in,
                                  struct lov_object *obj,
                                  int stripe, struct cl_lock_descr *out)
{
        struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
        pgoff_t size; /* stripe size in pages */
        pgoff_t skip; /* how many pages in every stripe are occupied by
                       * "other" stripes */
        pgoff_t start;
        pgoff_t end;

        ENTRY;
        start = in->cld_start;
        end   = in->cld_end;

        if (lsm->lsm_stripe_count > 1) {
                size = cl_index(lov2cl(obj), lsm->lsm_stripe_size);
                skip = (lsm->lsm_stripe_count - 1) * size;

                /* XXX overflow check here? */
                start += start/size * skip + stripe * size;

                if (end != CL_PAGE_EOF) {
                        end += end/size * skip + stripe * size;
                        /*
                         * And check for overflow...
                         */
                        if (end < in->cld_end)
                                end = CL_PAGE_EOF;
                }
        }
        out->cld_start = start;
        out->cld_end   = end;
        EXIT;
}

/**
 * Adjusts the parent lock extent when a sub-lock is attached to a parent.
 * This is called in two ways:
 *
 *     - as part of the receive call-back, when the server returns a granted
 *       extent to the client, and
 *
 *     - when a top-lock finds an existing sub-lock in the cache.
 *
 * Note that the lock mode is not propagated to the parent: i.e., if a
 * CLM_READ top-lock matches a CLM_WRITE sub-lock, the top-lock remains
 * CLM_READ.
 */
int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
                       struct lovsub_lock *sublock,
                       const struct cl_lock_descr *d, int idx)
{
        struct cl_lock       *parent;
        struct lovsub_object *subobj;
        struct cl_lock_descr *pd;
        struct cl_lock_descr *parent_descr;
        int                   result;

        parent       = lov->lls_cl.cls_lock;
        parent_descr = &parent->cll_descr;
        LASSERT(cl_lock_mode_match(d->cld_mode, parent_descr->cld_mode));

        subobj = cl2lovsub(sublock->lss_cl.cls_obj);
        pd     = &lov_env_info(env)->lti_ldescr;

        pd->cld_obj  = parent_descr->cld_obj;
        pd->cld_mode = parent_descr->cld_mode;
        pd->cld_gid  = parent_descr->cld_gid;
        lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
        lov->lls_sub[idx].sub_got = *d;
        /*
         * Notify the top-lock about the modification, if the lock
         * description changes materially.
         */
        if (!cl_lock_ext_match(parent_descr, pd))
                result = cl_lock_modify(env, parent, pd);
        else
                result = 0;
        return result;
}

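/**
 * Implements the cl_lock_operations::clo_modify() method: applies the new
 * lock description \a d to every parent top-lock of this sub-lock through
 * lov_sublock_modify(), returning the first non-zero result, if any.
 */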
static int lovsub_lock_modify(const struct lu_env *env,
                              const struct cl_lock_slice *s,
                              const struct cl_lock_descr *d)
{
        struct lovsub_lock   *lock   = cl2lovsub_lock(s);
        struct lov_lock_link *scan;
        struct lov_lock      *lov;
        int result                   = 0;

        ENTRY;

        LASSERT(cl_lock_mode_match(d->cld_mode,
                                   s->cls_lock->cll_descr.cld_mode));
        cfs_list_for_each_entry(scan, &lock->lss_parents, lll_list) {
                int rc;

                lov = scan->lll_super;
                lovsub_parent_lock(env, lov);
                rc = lov_sublock_modify(env, lov, lock, d, scan->lll_idx);
                lovsub_parent_unlock(env, lov);
                result = result ?: rc;
        }
        RETURN(result);
}

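/**
 * Implements the cl_lock_operations::clo_closure() method: adds every
 * parent top-lock of this sub-lock to \a closure, stopping at the first
 * failure of cl_lock_closure_build().
 */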
static int lovsub_lock_closure(const struct lu_env *env,
                               const struct cl_lock_slice *slice,
                               struct cl_lock_closure *closure)
{
        struct lovsub_lock   *sub;
        struct cl_lock       *parent;
        struct lov_lock_link *scan;
        int                   result;

        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
        ENTRY;

        sub    = cl2lovsub_lock(slice);
        result = 0;

        cfs_list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                parent = scan->lll_super->lls_cl.cls_lock;
                result = cl_lock_closure_build(env, parent, closure);
                if (result != 0)
                        break;
        }
        RETURN(result);
}

/**
 * A helper function for lovsub_lock_delete() that deals with a given parent
 * top-lock.
 */
static int lovsub_lock_delete_one(const struct lu_env *env,
                                  struct cl_lock *child, struct lov_lock *lov)
{
        struct cl_lock *parent;
        int             result;
        ENTRY;

        parent = lov->lls_cl.cls_lock;
        if (parent->cll_error)
                RETURN(0);

        result = 0;
        switch (parent->cll_state) {
        case CLS_QUEUING:
        case CLS_FREEING:
                cl_lock_signal(env, parent);
                break;
        case CLS_INTRANSIT:
                /*
                 * Here lies a problem: a sub-lock is canceled while the
                 * top-lock is being unlocked. The top-lock cannot be moved
                 * into the CLS_NEW state, because unlocking has to succeed
                 * eventually by placing the lock into CLS_CACHED (or failing
                 * it), see cl_unuse_try(). Nor can the top-lock be left in
                 * the CLS_CACHED state, because lov maintains the invariant
                 * that all sub-locks exist in CLS_CACHED (this allows a
                 * cached top-lock to be reused immediately). Nor can we wait
                 * for the top-lock state to change, because this change can
                 * be synchronous to the current thread.
                 *
                 * We know for sure that lov_lock_unuse() will be called at
                 * least one more time to finish un-using, so leave a mark on
                 * the top-lock that will be seen by the next call to
                 * lov_lock_unuse().
                 */
                if (cl_lock_is_intransit(parent))
                        lov->lls_cancel_race = 1;
                break;
        case CLS_CACHED:
                /*
                 * If a sub-lock is canceled, move its top-lock into the
                 * CLS_NEW state to preserve the invariant that a top-lock in
                 * CLS_CACHED is immediately ready for re-use (i.e., has all
                 * sub-locks), and so that the next attempt to re-use the
                 * top-lock enqueues the missing sub-lock.
                 */
                cl_lock_state_set(env, parent, CLS_NEW);
                /* fall through */
        case CLS_NEW:
                /*
                 * If the last sub-lock is canceled, destroy the top-lock
                 * (which is now `empty') proactively.
                 */
                if (lov->lls_nr_filled == 0) {
                        /* ... but unfortunately, this cannot be done easily,
                         * as cancellation of a top-lock might acquire mutexes
                         * of its other sub-locks, violating lock ordering,
                         * see cl_lock_{cancel,delete}() preconditions.
                         *
                         * To work around this, the mutex of this sub-lock is
                         * released, the top-lock is destroyed, and the
                         * sub-lock mutex is acquired again. The list of
                         * parents has to be re-scanned from the beginning
                         * after this.
                         *
                         * Only do this if no mutexes other than those on
                         * @child and @parent are held by the current thread.
                         *
                         * TODO: The locking model here is too complex,
                         * because the lock may be canceled and deleted
                         * voluntarily:
                         *    cl_lock_request
                         *      -> osc_lock_enqueue_wait
                         *        -> osc_lock_cancel_wait
                         *          -> cl_lock_delete
                         *            -> lovsub_lock_delete
                         *              -> cl_lock_cancel/delete
                         *                -> ...
                         *
                         * A better choice would be to spawn a kernel thread
                         * for this purpose. -jay
                         */
                        if (cl_lock_nr_mutexed(env) == 2) {
                                cl_lock_mutex_put(env, child);
                                cl_lock_cancel(env, parent);
                                cl_lock_delete(env, parent);
                                result = 1;
                        }
                }
                break;
        case CLS_ENQUEUED:
        case CLS_HELD:
                CL_LOCK_DEBUG(D_ERROR, env, parent, "Delete CLS_HELD lock\n");
        default:
                CERROR("Impossible state: %d\n", parent->cll_state);
                LBUG();
                break;
        }

        RETURN(result);
}

/**
 * An implementation of the cl_lock_operations::clo_delete() method. This is
 * invoked in a "bottom-to-top" delete, when lock destruction starts from the
 * sub-lock (e.g., as a result of the ldlm lock LRU policy).
 */
static void lovsub_lock_delete(const struct lu_env *env,
                               const struct cl_lock_slice *slice)
{
        struct cl_lock     *child = slice->cls_lock;
        struct lovsub_lock *sub   = cl2lovsub_lock(slice);
        int restart;

        LASSERT(cl_lock_is_mutexed(child));

        ENTRY;
        /*
         * Destruction of a sub-lock might take multiple iterations, because
         * when the last sub-lock of a given top-lock is deleted, the
         * top-lock is canceled proactively, and this requires the sub-lock
         * mutex to be released. Once the sub-lock mutex has been released,
         * the list of its parents has to be re-scanned from the beginning.
         */
        do {
                struct lov_lock      *lov;
                struct lov_lock_link *scan;
                struct lov_lock_link *temp;
                struct lov_lock_sub  *subdata;

                restart = 0;
                cfs_list_for_each_entry_safe(scan, temp,
                                             &sub->lss_parents, lll_list) {
                        lov     = scan->lll_super;
                        subdata = &lov->lls_sub[scan->lll_idx];
                        lovsub_parent_lock(env, lov);
                        subdata->sub_got = subdata->sub_descr;
                        lov_lock_unlink(env, scan, sub);
                        restart = lovsub_lock_delete_one(env, child, lov);
                        lovsub_parent_unlock(env, lov);

                        if (restart) {
                                cl_lock_mutex_get(env, child);
                                break;
                        }
                }
        } while (restart);
        EXIT;
}

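/**
 * Implements the cl_lock_operations::clo_print() method: prints, for each
 * parent of this sub-lock, the sub-lock's index within that parent, the
 * parent lov_lock pointer, and the parent's lock extent.
 */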
static int lovsub_lock_print(const struct lu_env *env, void *cookie,
                             lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
        struct lov_lock      *lov;
        struct lov_lock_link *scan;

        cfs_list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                lov = scan->lll_super;
                (*p)(env, cookie, "[%d %p ", scan->lll_idx, lov);
                if (lov != NULL)
                        cl_lock_descr_print(env, cookie, p,
                                            &lov->lls_cl.cls_lock->cll_descr);
                (*p)(env, cookie, "] ");
        }
        return 0;
}

static const struct cl_lock_operations lovsub_lock_ops = {
        .clo_fini    = lovsub_lock_fini,
        .clo_state   = lovsub_lock_state,
        .clo_delete  = lovsub_lock_delete,
        .clo_modify  = lovsub_lock_modify,
        .clo_closure = lovsub_lock_closure,
        .clo_weigh   = lovsub_lock_weigh,
        .clo_print   = lovsub_lock_print
};

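/**
 * Constructor for lovsub lock slices: allocates a slice from
 * lovsub_lock_kmem, initializes its list of parent top-locks, and adds it
 * to \a lock. Returns -ENOMEM if the slab allocation fails.
 */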
int lovsub_lock_init(const struct lu_env *env, struct cl_object *obj,
                     struct cl_lock *lock, const struct cl_io *io)
{
        struct lovsub_lock *lsk;
        int result;

        ENTRY;
        OBD_SLAB_ALLOC_PTR_GFP(lsk, lovsub_lock_kmem, CFS_ALLOC_IO);
        if (lsk != NULL) {
                CFS_INIT_LIST_HEAD(&lsk->lss_parents);
                cl_lock_slice_add(lock, &lsk->lss_cl, obj, &lovsub_lock_ops);
                result = 0;
        } else
                result = -ENOMEM;
        RETURN(result);
}

/** @} lov */