Whamcloud - gitweb
lov_object: simplify and document lov_object life-time rules.
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 /** \addtogroup lov lov @{ */
42
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #include "lov_cl_internal.h"
46
47 /*****************************************************************************
48  *
49  * Layout operations.
50  *
51  */
52
53 struct lov_layout_operations {
54         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
55                         struct lov_object *lov,
56                         const struct cl_object_conf *conf,
57                         union lov_layout_state *state);
58         void (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
59                            union lov_layout_state *state);
60         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
61                          union lov_layout_state *state);
62         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
63                             union lov_layout_state *state);
64         int  (*llo_print)(const struct lu_env *env, void *cookie,
65                           lu_printer_t p, const struct lu_object *o);
66         struct cl_page *(*llo_page_init)(const struct lu_env *env,
67                                          struct cl_object *obj,
68                                          struct cl_page *page,
69                                          cfs_page_t *vmpage);
70         int  (*llo_lock_init)(const struct lu_env *env,
71                               struct cl_object *obj, struct cl_lock *lock,
72                               const struct cl_io *io);
73         int  (*llo_io_init)(const struct lu_env *env,
74                             struct cl_object *obj, struct cl_io *io);
75         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
76                             struct cl_attr *attr);
77 };
78
79 /*****************************************************************************
80  *
81  * Lov object layout operations.
82  *
83  */
84
/**
 * llo_install() method for LLT_EMPTY layout: a file without objects has no
 * layout state, so there is nothing to install.
 */
static void lov_install_empty(const struct lu_env *env,
                              struct lov_object *lov,
                              union  lov_layout_state *state)
{
        /* Intentionally empty: file without objects. */
}
93
/**
 * llo_init() method for LLT_EMPTY layout. No state is prepared; always
 * succeeds.
 *
 * \retval 0 always
 */
static int lov_init_empty(const struct lu_env *env,
                          struct lov_device *dev, struct lov_object *lov,
                          const struct cl_object_conf *conf,
                          union  lov_layout_state *state)
{
        const int rc = 0;

        return rc;
}
101
102 static void lov_install_raid0(const struct lu_env *env,
103                               struct lov_object *lov,
104                               union  lov_layout_state *state)
105 {
106         lov->u = *state;
107 }
108
109 static void oinfo_get_fid(const struct lov_oinfo *oinfo, struct lu_fid *fid)
110 {
111         __u64 idx = oinfo->loi_id;
112
113         /* See idif definition in wiki:CMD3_interoperability_architecture */
114
115         LASSERT(oinfo->loi_gr < 1ULL << 16);
116         LASSERT(oinfo->loi_id < 1ULL << 49);
117         ENTRY;
118
119         /*
120          * Now that the fid of stripe is not unique now, ost_idx have to
121          * be used to make it unique. This is ok because the stripe fids
122          * are just used in client side(to locate the objects). -jay
123          */
124         fid->f_seq = ((__u64)oinfo->loi_ost_idx) << 32 |
125                      oinfo->loi_gr << 16 | idx >> 32;
126         fid->f_oid = idx; /* truncated to 32 bits by assignment */
127         fid->f_ver = 0;
128         EXIT;
129 }
130
131 static struct cl_object *lov_sub_find(const struct lu_env *env,
132                                       struct cl_device *dev,
133                                       const struct lu_fid *fid,
134                                       const struct cl_object_conf *conf)
135 {
136         struct lu_object *o;
137
138         ENTRY;
139         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
140         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
141         RETURN(lu2cl(o));
142 }
143
144 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
145                         struct cl_object *stripe,
146                         struct lov_layout_raid0 *r0, int idx)
147 {
148         struct cl_object_header *hdr;
149         struct cl_object_header *subhdr;
150         struct cl_object_header *parent;
151         struct lov_oinfo        *oinfo;
152         int result;
153
154         hdr    = cl_object_header(lov2cl(lov));
155         subhdr = cl_object_header(stripe);
156         parent = subhdr->coh_parent;
157
158         oinfo = r0->lo_lsm->lsm_oinfo[idx];
159         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" gr: "LPU64
160                " idx: %d gen: %d\n",
161                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
162                PFID(&hdr->coh_lu.loh_fid), hdr,
163                oinfo->loi_id, oinfo->loi_gr,
164                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
165
166         if (parent == NULL) {
167                 subhdr->coh_parent = hdr;
168                 subhdr->coh_nesting = hdr->coh_nesting + 1;
169                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
170                 r0->lo_sub[idx] = cl2lovsub(stripe);
171                 r0->lo_sub[idx]->lso_super = lov;
172                 r0->lo_sub[idx]->lso_index = idx;
173                 result = 0;
174         } else {
175                 CERROR("Stripe is already owned by other file (%i).\n", idx);
176                 LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
177                 LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
178                                 "old\n");
179                 LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n");
180                 cl_object_put(env, stripe);
181                 result = -EIO;
182         }
183         return result;
184 }
185
186 static int lov_init_raid0(const struct lu_env *env,
187                           struct lov_device *dev, struct lov_object *lov,
188                           const struct cl_object_conf *conf,
189                           union  lov_layout_state *state)
190 {
191         int result;
192         int i;
193
194         struct cl_object        *stripe;
195         struct lov_thread_info  *lti     = lov_env_info(env);
196         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
197         struct lov_stripe_md    *lsm     = conf->u.coc_md->lsm;
198         struct lu_fid           *ofid    = &lti->lti_fid;
199         struct lov_layout_raid0 *r0      = &state->raid0;
200
201         ENTRY;
202         r0->lo_nr  = conf->u.coc_md->lsm->lsm_stripe_count;
203         r0->lo_lsm = conf->u.coc_md->lsm;
204         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
205
206         OBD_ALLOC(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
207         if (r0->lo_sub != NULL) {
208                 result = 0;
209                 subconf->coc_inode = conf->coc_inode;
210                 /*
211                  * Create stripe cl_objects.
212                  */
213                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
214                         struct cl_device *subdev;
215                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
216                         int ost_idx = oinfo->loi_ost_idx;
217
218                         oinfo_get_fid(oinfo, ofid);
219                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
220                         subconf->u.coc_oinfo = oinfo;
221                         stripe = lov_sub_find(env, subdev, ofid, subconf);
222                         if (!IS_ERR(stripe))
223                                 result = lov_init_sub(env, lov, stripe, r0, i);
224                         else
225                                 result = PTR_ERR(stripe);
226                 }
227         } else
228                 result = -ENOMEM;
229         RETURN(result);
230 }
231
232 static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
233                              union lov_layout_state *state)
234 {
235         LASSERT(lov->lo_type == LLT_EMPTY);
236 }
237
238 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
239                                struct lovsub_object *los, int idx)
240 {
241         struct cl_object        *sub;
242         struct lov_layout_raid0 *r0;
243         struct lu_site          *site;
244         cfs_waitlink_t          *waiter;
245
246         r0  = &lov->u.raid0;
247         sub = lovsub2cl(los);
248         LASSERT(r0->lo_sub[idx] == los);
249
250         cl_object_kill(env, sub);
251         /* release a reference to the sub-object and ... */
252         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
253         cl_object_put(env, sub);
254
255         /* ... wait until it is actually destroyed---sub-object clears its
256          * ->lo_sub[] slot in lovsub_object_fini() */
257         if (r0->lo_sub[idx] == los) {
258                 waiter = &lov_env_info(env)->lti_waiter;
259                 site   = sub->co_lu.lo_dev->ld_site;
260                 cfs_waitlink_init(waiter);
261                 cfs_waitq_add(&site->ls_marche_funebre, waiter);
262                 set_current_state(CFS_TASK_UNINT);
263
264                 while (r0->lo_sub[idx] == los)
265                         /* this wait-queue is signaled at the end of
266                          * lu_object_free(). */
267                         cfs_waitq_wait(waiter, CFS_TASK_UNINT);
268                 cfs_waitq_del(&site->ls_marche_funebre, waiter);
269         }
270         LASSERT(r0->lo_sub[idx] == NULL);
271 }
272
273 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
274                              union lov_layout_state *state)
275 {
276         struct lov_layout_raid0 *r0 = &state->raid0;
277         int                      i;
278
279         ENTRY;
280         if (r0->lo_sub != NULL) {
281                 for (i = 0; i < r0->lo_nr; ++i) {
282                         struct lovsub_object *los = r0->lo_sub[i];
283
284                         if (los != NULL)
285                                 /*
286                                  * If top-level object is to be evicted from
287                                  * the cache, so are its sub-objects.
288                                  */
289                                 lov_subobject_kill(env, lov, los, i);
290                 }
291         }
292         EXIT;
293 }
294
295 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
296                            union lov_layout_state *state)
297 {
298         LASSERT(lov->lo_type == LLT_EMPTY);
299 }
300
301 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
302                            union lov_layout_state *state)
303 {
304         struct lov_layout_raid0 *r0 = &state->raid0;
305
306         ENTRY;
307         if (r0->lo_sub != NULL) {
308                 OBD_FREE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
309                 r0->lo_sub = NULL;
310         }
311         EXIT;
312 }
313
314 static int lov_print_empty(const struct lu_env *env, void *cookie,
315                            lu_printer_t p, const struct lu_object *o)
316 {
317         (*p)(env, cookie, "empty\n");
318         return 0;
319 }
320
321 static int lov_print_raid0(const struct lu_env *env, void *cookie,
322                            lu_printer_t p, const struct lu_object *o)
323 {
324         struct lov_object       *lov = lu2lov(o);
325         struct lov_layout_raid0 *r0  = lov_r0(lov);
326         int i;
327
328         (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr);
329         for (i = 0; i < r0->lo_nr; ++i) {
330                 struct lu_object *sub;
331
332                 if (r0->lo_sub[i] != NULL) {
333                         sub = lovsub2lu(r0->lo_sub[i]);
334                         lu_object_print(env, cookie, p, sub);
335                 } else
336                         (*p)(env, cookie, "sub %d absent\n", i);
337         }
338         return 0;
339 }
340
341 /**
342  * Implements cl_object_operations::coo_attr_get() method for an object
343  * without stripes (LLT_EMPTY layout type).
344  *
345  * The only attributes this layer is authoritative in this case is
346  * cl_attr::cat_blocks---it's 0.
347  */
348 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
349                               struct cl_attr *attr)
350 {
351         attr->cat_blocks = 0;
352         return 0;
353 }
354
355 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
356                               struct cl_attr *attr)
357 {
358         struct lov_object       *lov = cl2lov(obj);
359         struct lov_layout_raid0 *r0 = lov_r0(lov);
360         struct lov_stripe_md    *lsm = lov->u.raid0.lo_lsm;
361         struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
362         __u64                    kms;
363         int                      result = 0;
364
365         ENTRY;
366         if (!r0->lo_attr_valid) {
367                 /*
368                  * Fill LVB with attributes already initialized by the upper
369                  * layer.
370                  */
371                 cl_attr2lvb(lvb, attr);
372                 kms = attr->cat_kms;
373
374                 /*
375                  * XXX that should be replaced with a loop over sub-objects,
376                  * doing cl_object_attr_get() on them. But for now, let's
377                  * reuse old lov code.
378                  */
379
380                 /*
381                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
382                  * happy. It's not needed, because new code uses
383                  * ->coh_attr_guard spin-lock to protect consistency of
384                  * sub-object attributes.
385                  */
386                 lov_stripe_lock(lsm);
387                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
388                 lov_stripe_unlock(lsm);
389                 if (result == 0) {
390                         cl_lvb2attr(attr, lvb);
391                         attr->cat_kms = kms;
392                         r0->lo_attr_valid = 1;
393                         r0->lo_attr = *attr;
394                 }
395         } else
396                 *attr = r0->lo_attr;
397         RETURN(result);
398 }
399
400 const static struct lov_layout_operations lov_dispatch[] = {
401         [LLT_EMPTY] = {
402                 .llo_init      = lov_init_empty,
403                 .llo_delete    = lov_delete_empty,
404                 .llo_fini      = lov_fini_empty,
405                 .llo_install   = lov_install_empty,
406                 .llo_print     = lov_print_empty,
407                 .llo_page_init = lov_page_init_empty,
408                 .llo_lock_init = NULL,
409                 .llo_io_init   = lov_io_init_empty,
410                 .llo_getattr   = lov_attr_get_empty
411         },
412         [LLT_RAID0] = {
413                 .llo_init      = lov_init_raid0,
414                 .llo_delete    = lov_delete_raid0,
415                 .llo_fini      = lov_fini_raid0,
416                 .llo_install   = lov_install_raid0,
417                 .llo_print     = lov_print_raid0,
418                 .llo_page_init = lov_page_init_raid0,
419                 .llo_lock_init = lov_lock_init_raid0,
420                 .llo_io_init   = lov_io_init_raid0,
421                 .llo_getattr   = lov_attr_get_raid0
422         }
423 };
424
425
/**
 * Performs a double-dispatch based on the layout type of an object: calls
 * method \a op of the lov_dispatch[] entry selected by the object's
 * lo_type, without taking lo_type_guard. Evaluates to the value returned
 * by the method.
 */
#define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
({                                                                      \
        struct lov_object   *__obj = (obj);                             \
        enum lov_layout_type __llt = __obj->lo_type;                    \
                                                                        \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
})
438
/**
 * Like LOV_2DISPATCH_NOLOCK(), but when \a lock is true and the calling
 * thread is not the object's lo_owner, the dispatch is done with
 * lo_type_guard held for reading. Note that \a obj is expanded twice.
 */
#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
({                                                                      \
        struct lov_object                      *__obj = (obj);          \
        typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
        int                                     __lock;                 \
                                                                        \
        __lock = (lock) && __obj->lo_owner != cfs_current();            \
        if (__lock)                                                     \
                down_read(&__obj->lo_type_guard);                       \
        __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
        if (__lock)                                                     \
                up_read(&__obj->lo_type_guard);                         \
        __result;                                                       \
})
453
/**
 * Performs a locked double-dispatch based on the layout type of an object;
 * shorthand for LOV_2DISPATCH_MAYLOCK() with locking requested.
 */
#define LOV_2DISPATCH(obj, op, ...)                                     \
        LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
459
/**
 * Double-dispatch for layout methods returning void. lo_type_guard is held
 * for reading around the call unless the calling thread is the object's
 * lo_owner.
 */
#define LOV_2DISPATCH_VOID(obj, op, ...)                                \
do {                                                                    \
        struct lov_object   *__obj = (obj);                             \
        enum lov_layout_type __llt;                                     \
        int                  __lock;                                    \
                                                                        \
        __lock = __obj->lo_owner != cfs_current();                      \
        if (__lock)                                                     \
                down_read(&__obj->lo_type_guard);                       \
        __llt = __obj->lo_type;                                         \
        LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
        lov_dispatch[__llt].op(__VA_ARGS__);                            \
        if (__lock)                                                     \
                up_read(&__obj->lo_type_guard);                         \
} while (0)
473
474 static int lov_layout_change(const struct lu_env *env,
475                              struct lov_object *obj, enum lov_layout_type llt,
476                              const struct cl_object_conf *conf)
477 {
478         int result;
479         union lov_layout_state       *state = &lov_env_info(env)->lti_state;
480         const struct lov_layout_operations *old_ops;
481         const struct lov_layout_operations *new_ops;
482
483         LASSERT(0 <= obj->lo_type && obj->lo_type < ARRAY_SIZE(lov_dispatch));
484         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
485         ENTRY;
486
487         old_ops = &lov_dispatch[obj->lo_type];
488         new_ops = &lov_dispatch[llt];
489
490         result = new_ops->llo_init(env, lu2lov_dev(obj->lo_cl.co_lu.lo_dev),
491                                    obj, conf, state);
492         if (result == 0) {
493                 struct cl_object_header *hdr = cl_object_header(&obj->lo_cl);
494                 void                    *cookie;
495                 struct lu_env           *nested;
496                 int                      refcheck;
497
498                 cookie = cl_env_reenter();
499                 nested = cl_env_get(&refcheck);
500                 if (!IS_ERR(nested))
501                         cl_object_prune(nested, &obj->lo_cl);
502                 else
503                         result = PTR_ERR(nested);
504                 cl_env_put(nested, &refcheck);
505                 cl_env_reexit(cookie);
506
507                 old_ops->llo_fini(env, obj, &obj->u);
508                 LASSERT(list_empty(&hdr->coh_locks));
509                 LASSERT(hdr->coh_tree.rnode == NULL);
510                 LASSERT(hdr->coh_pages == 0);
511
512                 new_ops->llo_install(env, obj, state);
513                 obj->lo_type = llt;
514         } else
515                 new_ops->llo_fini(env, obj, state);
516         RETURN(result);
517 }
518
519 /*****************************************************************************
520  *
521  * Lov object operations.
522  *
523  */
524
525 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
526                     const struct lu_object_conf *conf)
527 {
528         struct lov_device            *dev   = lu2lov_dev(obj->lo_dev);
529         struct lov_object            *lov   = lu2lov(obj);
530         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
531         union  lov_layout_state      *set   = &lov_env_info(env)->lti_state;
532         const struct lov_layout_operations *ops;
533         int result;
534
535         ENTRY;
536         init_rwsem(&lov->lo_type_guard);
537
538         /* no locking is necessary, as object is being created */
539         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
540         ops = &lov_dispatch[lov->lo_type];
541         result = ops->llo_init(env, dev, lov, cconf, set);
542         if (result == 0)
543                 ops->llo_install(env, lov, set);
544         else
545                 ops->llo_fini(env, lov, set);
546         RETURN(result);
547 }
548
549 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
550                         const struct cl_object_conf *conf)
551 {
552         struct lov_object *lov = cl2lov(obj);
553         int result;
554
555         ENTRY;
556         /*
557          * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
558          */
559         LASSERT(lov->lo_owner != cfs_current());
560         down_write(&lov->lo_type_guard);
561         LASSERT(lov->lo_owner == NULL);
562         lov->lo_owner = cfs_current();
563         if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
564                 result = lov_layout_change(env, lov, LLT_RAID0, conf);
565         else
566                 result = -EOPNOTSUPP;
567         lov->lo_owner = NULL;
568         up_write(&lov->lo_type_guard);
569         RETURN(result);
570 }
571
572 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
573 {
574         struct lov_object *lov = lu2lov(obj);
575
576         ENTRY;
577         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
578         EXIT;
579 }
580
581 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
582 {
583         struct lov_object *lov = lu2lov(obj);
584
585         ENTRY;
586         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
587         lu_object_fini(obj);
588         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
589         EXIT;
590 }
591
592 static int lov_object_print(const struct lu_env *env, void *cookie,
593                             lu_printer_t p, const struct lu_object *o)
594 {
595         return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
596 }
597
598 struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
599                               struct cl_page *page, cfs_page_t *vmpage)
600 {
601         return LOV_2DISPATCH(cl2lov(obj),
602                              llo_page_init, env, obj, page, vmpage);
603 }
604
605 /**
606  * Implements cl_object_operations::clo_io_init() method for lov
607  * layer. Dispatches to the appropriate layout io initialization method.
608  */
609 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
610                 struct cl_io *io)
611 {
612         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
613         /*
614          * Do not take lock in case of CIT_MISC io, because
615          *
616          *     - if this is an io for a glimpse, then we don't care;
617          *
618          *     - if this not a glimpse (writepage or lock cancellation), then
619          *       layout change cannot happen because a page or a lock
620          *       already exist; and
621          *
622          *     - lock ordering (lock mutex nests within layout rw-semaphore)
623          *       is obeyed in case of lock cancellation.
624          */
625         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
626                                      io->ci_type != CIT_MISC, env, obj, io);
627 }
628
629 /**
630  * An implementation of cl_object_operations::clo_attr_get() method for lov
631  * layer. For raid0 layout this collects and merges attributes of all
632  * sub-objects.
633  */
634 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
635                         struct cl_attr *attr)
636 {
637         /* do not take lock, as this function is called under a
638          * spin-lock. Layout is protected from changing by ongoing IO. */
639         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
640 }
641
/**
 * Implements cl_object_operations::coo_attr_set() for the lov layer.
 * Nothing is stored at this layer.
 *
 * \retval 0 always
 */
static int lov_attr_set(const struct lu_env *env, struct cl_object *obj,
                        const struct cl_attr *attr, unsigned valid)
{
        /*
         * No dispatch is required here, as no layout implements this.
         */
        const int rc = 0;

        return rc;
}
650
651 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
652                   struct cl_lock *lock, const struct cl_io *io)
653 {
654         return LOV_2DISPATCH(cl2lov(obj), llo_lock_init, env, obj, lock, io);
655 }
656
657 static const struct cl_object_operations lov_ops = {
658         .coo_page_init = lov_page_init,
659         .coo_lock_init = lov_lock_init,
660         .coo_io_init   = lov_io_init,
661         .coo_attr_get  = lov_attr_get,
662         .coo_attr_set  = lov_attr_set,
663         .coo_conf_set  = lov_conf_set
664 };
665
666 static const struct lu_object_operations lov_lu_obj_ops = {
667         .loo_object_init      = lov_object_init,
668         .loo_object_delete    = lov_object_delete,
669         .loo_object_release   = NULL,
670         .loo_object_free      = lov_object_free,
671         .loo_object_print     = lov_object_print,
672         .loo_object_invariant = NULL
673 };
674
675 struct lu_object *lov_object_alloc(const struct lu_env *env,
676                                    const struct lu_object_header *_,
677                                    struct lu_device *dev)
678 {
679         struct lov_object *lov;
680         struct lu_object  *obj;
681
682         ENTRY;
683         OBD_SLAB_ALLOC_PTR(lov, lov_object_kmem);
684         if (lov != NULL) {
685                 obj = lov2lu(lov);
686                 lu_object_init(obj, NULL, dev);
687                 lov->lo_cl.co_ops = &lov_ops;
688                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
689                 /*
690                  * object io operation vector (cl_object::co_iop) is installed
691                  * later in lov_object_init(), as different vectors are used
692                  * for object with different layouts.
693                  */
694                 obj->lo_ops = &lov_lu_obj_ops;
695         } else
696                 obj = NULL;
697         RETURN(obj);
698 }
699
700 /** @} lov */