Whamcloud - gitweb
b=20595
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 /** \addtogroup lov lov @{ */
42
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #include "lov_cl_internal.h"
46
47 /*****************************************************************************
48  *
49  * Layout operations.
50  *
51  */
52
53 struct lov_layout_operations {
54         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
55                         struct lov_object *lov,
56                         const struct cl_object_conf *conf,
57                         union lov_layout_state *state);
58         void (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
59                            union lov_layout_state *state);
60         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
61                          union lov_layout_state *state);
62         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
63                             union lov_layout_state *state);
64         int  (*llo_print)(const struct lu_env *env, void *cookie,
65                           lu_printer_t p, const struct lu_object *o);
66         struct cl_page *(*llo_page_init)(const struct lu_env *env,
67                                          struct cl_object *obj,
68                                          struct cl_page *page,
69                                          cfs_page_t *vmpage);
70         int  (*llo_lock_init)(const struct lu_env *env,
71                               struct cl_object *obj, struct cl_lock *lock,
72                               const struct cl_io *io);
73         int  (*llo_io_init)(const struct lu_env *env,
74                             struct cl_object *obj, struct cl_io *io);
75         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
76                             struct cl_attr *attr);
77 };
78
79 /*****************************************************************************
80  *
81  * Lov object layout operations.
82  *
83  */
84
85 static void lov_install_empty(const struct lu_env *env,
86                               struct lov_object *lov,
87                               union  lov_layout_state *state)
88 {
89         /*
90          * File without objects.
91          */
92 }
93
94 static int lov_init_empty(const struct lu_env *env,
95                           struct lov_device *dev, struct lov_object *lov,
96                           const struct cl_object_conf *conf,
97                           union  lov_layout_state *state)
98 {
99         return 0;
100 }
101
102 static void lov_install_raid0(const struct lu_env *env,
103                               struct lov_object *lov,
104                               union  lov_layout_state *state)
105 {
106         lov->u = *state;
107 }
108
109 static void oinfo_get_fid(const struct lov_oinfo *oinfo, struct lu_fid *fid)
110 {
111         __u64 idx = oinfo->loi_id;
112
113         /* See idif definition in wiki:CMD3_interoperability_architecture */
114
115         LASSERT(oinfo->loi_gr < 1ULL << 16);
116         LASSERT(oinfo->loi_id < 1ULL << 49);
117         ENTRY;
118
119         /*
120          * Now that the fid of stripe is not unique now, ost_idx have to
121          * be used to make it unique. This is ok because the stripe fids
122          * are just used in client side(to locate the objects). -jay
123          */
124         fid->f_seq = ((__u64)oinfo->loi_ost_idx) << 32 |
125                      oinfo->loi_gr << 16 | idx >> 32;
126         fid->f_oid = idx; /* truncated to 32 bits by assignment */
127         fid->f_ver = 0;
128         EXIT;
129 }
130
131 static struct cl_object *lov_sub_find(const struct lu_env *env,
132                                       struct cl_device *dev,
133                                       const struct lu_fid *fid,
134                                       const struct cl_object_conf *conf)
135 {
136         struct lu_object *o;
137
138         ENTRY;
139         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
140         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
141         RETURN(lu2cl(o));
142 }
143
144 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
145                         struct cl_object *stripe,
146                         struct lov_layout_raid0 *r0, int idx)
147 {
148         struct cl_object_header *hdr;
149         struct cl_object_header *subhdr;
150         struct cl_object_header *parent;
151         struct lov_oinfo        *oinfo;
152         int result;
153
154         hdr    = cl_object_header(lov2cl(lov));
155         subhdr = cl_object_header(stripe);
156         parent = subhdr->coh_parent;
157
158         oinfo = r0->lo_lsm->lsm_oinfo[idx];
159         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" gr: "LPU64
160                " idx: %d gen: %d\n",
161                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
162                PFID(&hdr->coh_lu.loh_fid), hdr,
163                oinfo->loi_id, oinfo->loi_gr,
164                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
165
166         if (parent == NULL) {
167                 subhdr->coh_parent = hdr;
168                 subhdr->coh_nesting = hdr->coh_nesting + 1;
169                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
170                 r0->lo_sub[idx] = cl2lovsub(stripe);
171                 r0->lo_sub[idx]->lso_super = lov;
172                 r0->lo_sub[idx]->lso_index = idx;
173                 result = 0;
174         } else {
175                 CERROR("Stripe is already owned by other file (%i).\n", idx);
176                 LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
177                 LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
178                                 "old\n");
179                 LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n");
180                 cl_object_put(env, stripe);
181                 result = -EIO;
182         }
183         return result;
184 }
185
186 static int lov_init_raid0(const struct lu_env *env,
187                           struct lov_device *dev, struct lov_object *lov,
188                           const struct cl_object_conf *conf,
189                           union  lov_layout_state *state)
190 {
191         int result;
192         int i;
193
194         struct cl_object        *stripe;
195         struct lov_thread_info  *lti     = lov_env_info(env);
196         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
197         struct lov_stripe_md    *lsm     = conf->u.coc_md->lsm;
198         struct lu_fid           *ofid    = &lti->lti_fid;
199         struct lov_layout_raid0 *r0      = &state->raid0;
200
201         ENTRY;
202         r0->lo_nr  = conf->u.coc_md->lsm->lsm_stripe_count;
203         r0->lo_lsm = conf->u.coc_md->lsm;
204         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
205
206         OBD_ALLOC(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
207         if (r0->lo_sub != NULL) {
208                 result = 0;
209                 subconf->coc_inode = conf->coc_inode;
210                 /*
211                  * Create stripe cl_objects.
212                  */
213                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
214                         struct cl_device *subdev;
215                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
216                         int ost_idx = oinfo->loi_ost_idx;
217
218                         oinfo_get_fid(oinfo, ofid);
219                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
220                         subconf->u.coc_oinfo = oinfo;
221                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
222                         stripe = lov_sub_find(env, subdev, ofid, subconf);
223                         if (!IS_ERR(stripe))
224                                 result = lov_init_sub(env, lov, stripe, r0, i);
225                         else
226                                 result = PTR_ERR(stripe);
227                 }
228         } else
229                 result = -ENOMEM;
230         RETURN(result);
231 }
232
233 static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
234                              union lov_layout_state *state)
235 {
236         LASSERT(lov->lo_type == LLT_EMPTY);
237 }
238
239 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
240                                struct lovsub_object *los, int idx)
241 {
242         struct cl_object        *sub;
243         struct lov_layout_raid0 *r0;
244         struct lu_site          *site;
245         cfs_waitlink_t          *waiter;
246
247         r0  = &lov->u.raid0;
248         LASSERT(r0->lo_sub[idx] == los);
249
250         sub  = lovsub2cl(los);
251         site = sub->co_lu.lo_dev->ld_site;
252
253         cl_object_kill(env, sub);
254         /* release a reference to the sub-object and ... */
255         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
256         cl_object_put(env, sub);
257
258         /* ... wait until it is actually destroyed---sub-object clears its
259          * ->lo_sub[] slot in lovsub_object_fini() */
260         if (r0->lo_sub[idx] == los) {
261                 waiter = &lov_env_info(env)->lti_waiter;
262                 cfs_waitlink_init(waiter);
263                 cfs_waitq_add(&site->ls_marche_funebre, waiter);
264                 set_current_state(CFS_TASK_UNINT);
265
266                 while (r0->lo_sub[idx] == los)
267                         /* this wait-queue is signaled at the end of
268                          * lu_object_free(). */
269                         cfs_waitq_wait(waiter, CFS_TASK_UNINT);
270                 cfs_waitq_del(&site->ls_marche_funebre, waiter);
271         }
272         LASSERT(r0->lo_sub[idx] == NULL);
273 }
274
275 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
276                              union lov_layout_state *state)
277 {
278         struct lov_layout_raid0 *r0 = &state->raid0;
279         int                      i;
280
281         ENTRY;
282         if (r0->lo_sub != NULL) {
283                 for (i = 0; i < r0->lo_nr; ++i) {
284                         struct lovsub_object *los = r0->lo_sub[i];
285
286                         if (los != NULL)
287                                 /*
288                                  * If top-level object is to be evicted from
289                                  * the cache, so are its sub-objects.
290                                  */
291                                 lov_subobject_kill(env, lov, los, i);
292                 }
293         }
294         EXIT;
295 }
296
297 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
298                            union lov_layout_state *state)
299 {
300         LASSERT(lov->lo_type == LLT_EMPTY);
301 }
302
303 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
304                            union lov_layout_state *state)
305 {
306         struct lov_layout_raid0 *r0 = &state->raid0;
307
308         ENTRY;
309         if (r0->lo_sub != NULL) {
310                 OBD_FREE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
311                 r0->lo_sub = NULL;
312         }
313         EXIT;
314 }
315
316 static int lov_print_empty(const struct lu_env *env, void *cookie,
317                            lu_printer_t p, const struct lu_object *o)
318 {
319         (*p)(env, cookie, "empty\n");
320         return 0;
321 }
322
323 static int lov_print_raid0(const struct lu_env *env, void *cookie,
324                            lu_printer_t p, const struct lu_object *o)
325 {
326         struct lov_object       *lov = lu2lov(o);
327         struct lov_layout_raid0 *r0  = lov_r0(lov);
328         int i;
329
330         (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr);
331         for (i = 0; i < r0->lo_nr; ++i) {
332                 struct lu_object *sub;
333
334                 if (r0->lo_sub[i] != NULL) {
335                         sub = lovsub2lu(r0->lo_sub[i]);
336                         lu_object_print(env, cookie, p, sub);
337                 } else
338                         (*p)(env, cookie, "sub %d absent\n", i);
339         }
340         return 0;
341 }
342
343 /**
344  * Implements cl_object_operations::coo_attr_get() method for an object
345  * without stripes (LLT_EMPTY layout type).
346  *
347  * The only attributes this layer is authoritative in this case is
348  * cl_attr::cat_blocks---it's 0.
349  */
350 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
351                               struct cl_attr *attr)
352 {
353         attr->cat_blocks = 0;
354         return 0;
355 }
356
357 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
358                               struct cl_attr *attr)
359 {
360         struct lov_object       *lov = cl2lov(obj);
361         struct lov_layout_raid0 *r0 = lov_r0(lov);
362         struct lov_stripe_md    *lsm = lov->u.raid0.lo_lsm;
363         struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
364         __u64                    kms;
365         int                      result = 0;
366
367         ENTRY;
368         if (!r0->lo_attr_valid) {
369                 /*
370                  * Fill LVB with attributes already initialized by the upper
371                  * layer.
372                  */
373                 cl_attr2lvb(lvb, attr);
374                 kms = attr->cat_kms;
375
376                 /*
377                  * XXX that should be replaced with a loop over sub-objects,
378                  * doing cl_object_attr_get() on them. But for now, let's
379                  * reuse old lov code.
380                  */
381
382                 /*
383                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
384                  * happy. It's not needed, because new code uses
385                  * ->coh_attr_guard spin-lock to protect consistency of
386                  * sub-object attributes.
387                  */
388                 lov_stripe_lock(lsm);
389                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
390                 lov_stripe_unlock(lsm);
391                 if (result == 0) {
392                         cl_lvb2attr(attr, lvb);
393                         attr->cat_kms = kms;
394                         r0->lo_attr_valid = 1;
395                         r0->lo_attr = *attr;
396                 }
397         } else
398                 *attr = r0->lo_attr;
399         RETURN(result);
400 }
401
402 const static struct lov_layout_operations lov_dispatch[] = {
403         [LLT_EMPTY] = {
404                 .llo_init      = lov_init_empty,
405                 .llo_delete    = lov_delete_empty,
406                 .llo_fini      = lov_fini_empty,
407                 .llo_install   = lov_install_empty,
408                 .llo_print     = lov_print_empty,
409                 .llo_page_init = lov_page_init_empty,
410                 .llo_lock_init = NULL,
411                 .llo_io_init   = lov_io_init_empty,
412                 .llo_getattr   = lov_attr_get_empty
413         },
414         [LLT_RAID0] = {
415                 .llo_init      = lov_init_raid0,
416                 .llo_delete    = lov_delete_raid0,
417                 .llo_fini      = lov_fini_raid0,
418                 .llo_install   = lov_install_raid0,
419                 .llo_print     = lov_print_raid0,
420                 .llo_page_init = lov_page_init_raid0,
421                 .llo_lock_init = lov_lock_init_raid0,
422                 .llo_io_init   = lov_io_init_raid0,
423                 .llo_getattr   = lov_attr_get_raid0
424         }
425 };
426
427
428 /**
429  * Performs a double-dispatch based on the layout type of an object.
430  */
431 #define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
432 ({                                                                      \
433         struct lov_object                      *__obj = (obj);          \
434         enum lov_layout_type                    __llt;                  \
435                                                                         \
436         __llt = __obj->lo_type;                                         \
437         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
438         lov_dispatch[__llt].op(__VA_ARGS__);                            \
439 })
440
441 #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
442 ({                                                                      \
443         struct lov_object                      *__obj = (obj);          \
444         int                                     __lock = !!(lock);      \
445         typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
446                                                                         \
447         __lock &= __obj->lo_owner != cfs_current();                     \
448         if (__lock)                                                     \
449                 down_read(&__obj->lo_type_guard);                       \
450         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
451         if (__lock)                                                     \
452                 up_read(&__obj->lo_type_guard);                         \
453         __result;                                                       \
454 })
455
456 /**
457  * Performs a locked double-dispatch based on the layout type of an object.
458  */
459 #define LOV_2DISPATCH(obj, op, ...)                     \
460         LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
461
462 #define LOV_2DISPATCH_VOID(obj, op, ...)                                \
463 do {                                                                    \
464         struct lov_object                      *__obj = (obj);          \
465         enum lov_layout_type                    __llt;                  \
466                                                                         \
467         if (__obj->lo_owner != cfs_current())                           \
468                 down_read(&__obj->lo_type_guard);                       \
469         __llt = __obj->lo_type;                                         \
470         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
471         lov_dispatch[__llt].op(__VA_ARGS__);                            \
472         if (__obj->lo_owner != cfs_current())                           \
473                 up_read(&__obj->lo_type_guard);                         \
474 } while (0)
475
476 static int lov_layout_change(const struct lu_env *env,
477                              struct lov_object *obj, enum lov_layout_type llt,
478                              const struct cl_object_conf *conf)
479 {
480         int result;
481         union lov_layout_state       *state = &lov_env_info(env)->lti_state;
482         const struct lov_layout_operations *old_ops;
483         const struct lov_layout_operations *new_ops;
484
485         LASSERT(0 <= obj->lo_type && obj->lo_type < ARRAY_SIZE(lov_dispatch));
486         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
487         ENTRY;
488
489         old_ops = &lov_dispatch[obj->lo_type];
490         new_ops = &lov_dispatch[llt];
491
492         result = new_ops->llo_init(env, lu2lov_dev(obj->lo_cl.co_lu.lo_dev),
493                                    obj, conf, state);
494         if (result == 0) {
495                 struct cl_object_header *hdr = cl_object_header(&obj->lo_cl);
496                 void                    *cookie;
497                 struct lu_env           *nested;
498                 int                      refcheck;
499
500                 cookie = cl_env_reenter();
501                 nested = cl_env_get(&refcheck);
502                 if (!IS_ERR(nested))
503                         cl_object_prune(nested, &obj->lo_cl);
504                 else
505                         result = PTR_ERR(nested);
506                 cl_env_put(nested, &refcheck);
507                 cl_env_reexit(cookie);
508
509                 old_ops->llo_fini(env, obj, &obj->u);
510                 LASSERT(list_empty(&hdr->coh_locks));
511                 LASSERT(hdr->coh_tree.rnode == NULL);
512                 LASSERT(hdr->coh_pages == 0);
513
514                 new_ops->llo_install(env, obj, state);
515                 obj->lo_type = llt;
516         } else
517                 new_ops->llo_fini(env, obj, state);
518         RETURN(result);
519 }
520
521 /*****************************************************************************
522  *
523  * Lov object operations.
524  *
525  */
526
527 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
528                     const struct lu_object_conf *conf)
529 {
530         struct lov_device            *dev   = lu2lov_dev(obj->lo_dev);
531         struct lov_object            *lov   = lu2lov(obj);
532         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
533         union  lov_layout_state      *set   = &lov_env_info(env)->lti_state;
534         const struct lov_layout_operations *ops;
535         int result;
536
537         ENTRY;
538         init_rwsem(&lov->lo_type_guard);
539
540         /* no locking is necessary, as object is being created */
541         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
542         ops = &lov_dispatch[lov->lo_type];
543         result = ops->llo_init(env, dev, lov, cconf, set);
544         if (result == 0)
545                 ops->llo_install(env, lov, set);
546         else
547                 ops->llo_fini(env, lov, set);
548         RETURN(result);
549 }
550
551 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
552                         const struct cl_object_conf *conf)
553 {
554         struct lov_object *lov = cl2lov(obj);
555         int result;
556
557         ENTRY;
558         /*
559          * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
560          */
561         LASSERT(lov->lo_owner != cfs_current());
562         down_write(&lov->lo_type_guard);
563         LASSERT(lov->lo_owner == NULL);
564         lov->lo_owner = cfs_current();
565         if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
566                 result = lov_layout_change(env, lov, LLT_RAID0, conf);
567         else
568                 result = -EOPNOTSUPP;
569         lov->lo_owner = NULL;
570         up_write(&lov->lo_type_guard);
571         RETURN(result);
572 }
573
574 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
575 {
576         struct lov_object *lov = lu2lov(obj);
577
578         ENTRY;
579         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
580         EXIT;
581 }
582
583 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
584 {
585         struct lov_object *lov = lu2lov(obj);
586
587         ENTRY;
588         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
589         lu_object_fini(obj);
590         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
591         EXIT;
592 }
593
594 static int lov_object_print(const struct lu_env *env, void *cookie,
595                             lu_printer_t p, const struct lu_object *o)
596 {
597         return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
598 }
599
600 struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
601                               struct cl_page *page, cfs_page_t *vmpage)
602 {
603         return LOV_2DISPATCH(cl2lov(obj),
604                              llo_page_init, env, obj, page, vmpage);
605 }
606
607 /**
608  * Implements cl_object_operations::clo_io_init() method for lov
609  * layer. Dispatches to the appropriate layout io initialization method.
610  */
611 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
612                 struct cl_io *io)
613 {
614         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
615         /*
616          * Do not take lock in case of CIT_MISC io, because
617          *
618          *     - if this is an io for a glimpse, then we don't care;
619          *
620          *     - if this not a glimpse (writepage or lock cancellation), then
621          *       layout change cannot happen because a page or a lock
622          *       already exist; and
623          *
624          *     - lock ordering (lock mutex nests within layout rw-semaphore)
625          *       is obeyed in case of lock cancellation.
626          */
627         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
628                                      io->ci_type != CIT_MISC, env, obj, io);
629 }
630
631 /**
632  * An implementation of cl_object_operations::clo_attr_get() method for lov
633  * layer. For raid0 layout this collects and merges attributes of all
634  * sub-objects.
635  */
636 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
637                         struct cl_attr *attr)
638 {
639         /* do not take lock, as this function is called under a
640          * spin-lock. Layout is protected from changing by ongoing IO. */
641         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
642 }
643
644 static int lov_attr_set(const struct lu_env *env, struct cl_object *obj,
645                         const struct cl_attr *attr, unsigned valid)
646 {
647         /*
648          * No dispatch is required here, as no layout implements this.
649          */
650         return 0;
651 }
652
653 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
654                   struct cl_lock *lock, const struct cl_io *io)
655 {
656         return LOV_2DISPATCH(cl2lov(obj), llo_lock_init, env, obj, lock, io);
657 }
658
659 static const struct cl_object_operations lov_ops = {
660         .coo_page_init = lov_page_init,
661         .coo_lock_init = lov_lock_init,
662         .coo_io_init   = lov_io_init,
663         .coo_attr_get  = lov_attr_get,
664         .coo_attr_set  = lov_attr_set,
665         .coo_conf_set  = lov_conf_set
666 };
667
668 static const struct lu_object_operations lov_lu_obj_ops = {
669         .loo_object_init      = lov_object_init,
670         .loo_object_delete    = lov_object_delete,
671         .loo_object_release   = NULL,
672         .loo_object_free      = lov_object_free,
673         .loo_object_print     = lov_object_print,
674         .loo_object_invariant = NULL
675 };
676
677 struct lu_object *lov_object_alloc(const struct lu_env *env,
678                                    const struct lu_object_header *unused,
679                                    struct lu_device *dev)
680 {
681         struct lov_object *lov;
682         struct lu_object  *obj;
683
684         ENTRY;
685         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO);
686         if (lov != NULL) {
687                 obj = lov2lu(lov);
688                 lu_object_init(obj, NULL, dev);
689                 lov->lo_cl.co_ops = &lov_ops;
690                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
691                 /*
692                  * object io operation vector (cl_object::co_iop) is installed
693                  * later in lov_object_init(), as different vectors are used
694                  * for object with different layouts.
695                  */
696                 obj->lo_ops = &lov_lu_obj_ops;
697         } else
698                 obj = NULL;
699         RETURN(obj);
700 }
701
702 /** @} lov */