Whamcloud - gitweb
4c15b9fbffa25753e8949ef344e8679b41f85335
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * Implementation of cl_object for LOV layer.
39  *
40  *   Author: Nikita Danilov <nikita.danilov@sun.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #include "lov_cl_internal.h"
46
47 /** \addtogroup lov
48  *  @{
49  */
50
51 /*****************************************************************************
52  *
53  * Layout operations.
54  *
55  */
56
57 struct lov_layout_operations {
58         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
59                         struct lov_object *lov,
60                         const struct cl_object_conf *conf,
61                         union lov_layout_state *state);
62         void (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
63                            union lov_layout_state *state);
64         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
65                          union lov_layout_state *state);
66         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
67                             union lov_layout_state *state);
68         int  (*llo_print)(const struct lu_env *env, void *cookie,
69                           lu_printer_t p, const struct lu_object *o);
70         struct cl_page *(*llo_page_init)(const struct lu_env *env,
71                                          struct cl_object *obj,
72                                          struct cl_page *page,
73                                          cfs_page_t *vmpage);
74         int  (*llo_lock_init)(const struct lu_env *env,
75                               struct cl_object *obj, struct cl_lock *lock,
76                               const struct cl_io *io);
77         int  (*llo_io_init)(const struct lu_env *env,
78                             struct cl_object *obj, struct cl_io *io);
79         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
80                             struct cl_attr *attr);
81 };
82
83 /*****************************************************************************
84  *
85  * Lov object layout operations.
86  *
87  */
88
89 static void lov_install_empty(const struct lu_env *env,
90                               struct lov_object *lov,
91                               union  lov_layout_state *state)
92 {
93         /*
94          * File without objects.
95          */
96 }
97
98 static int lov_init_empty(const struct lu_env *env,
99                           struct lov_device *dev, struct lov_object *lov,
100                           const struct cl_object_conf *conf,
101                           union  lov_layout_state *state)
102 {
103         return 0;
104 }
105
106 static void lov_install_raid0(const struct lu_env *env,
107                               struct lov_object *lov,
108                               union  lov_layout_state *state)
109 {
110         lov->u = *state;
111 }
112
113 static struct cl_object *lov_sub_find(const struct lu_env *env,
114                                       struct cl_device *dev,
115                                       const struct lu_fid *fid,
116                                       const struct cl_object_conf *conf)
117 {
118         struct lu_object *o;
119
120         ENTRY;
121         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
122         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
123         RETURN(lu2cl(o));
124 }
125
126 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
127                         struct cl_object *stripe,
128                         struct lov_layout_raid0 *r0, int idx)
129 {
130         struct cl_object_header *hdr;
131         struct cl_object_header *subhdr;
132         struct cl_object_header *parent;
133         struct lov_oinfo        *oinfo;
134         int result;
135
136         hdr    = cl_object_header(lov2cl(lov));
137         subhdr = cl_object_header(stripe);
138         parent = subhdr->coh_parent;
139
140         oinfo = r0->lo_lsm->lsm_oinfo[idx];
141         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64
142                " idx: %d gen: %d\n",
143                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
144                PFID(&hdr->coh_lu.loh_fid), hdr,
145                oinfo->loi_id, oinfo->loi_seq,
146                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
147
148         if (parent == NULL) {
149                 subhdr->coh_parent = hdr;
150                 subhdr->coh_nesting = hdr->coh_nesting + 1;
151                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
152                 r0->lo_sub[idx] = cl2lovsub(stripe);
153                 r0->lo_sub[idx]->lso_super = lov;
154                 r0->lo_sub[idx]->lso_index = idx;
155                 result = 0;
156         } else {
157                 CERROR("Stripe is already owned by other file (%d).\n", idx);
158                 LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
159                 LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
160                                 "old\n");
161                 LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n");
162                 cl_object_put(env, stripe);
163                 result = -EIO;
164         }
165         return result;
166 }
167
168 static int lov_init_raid0(const struct lu_env *env,
169                           struct lov_device *dev, struct lov_object *lov,
170                           const struct cl_object_conf *conf,
171                           union  lov_layout_state *state)
172 {
173         int result;
174         int i;
175
176         struct cl_object        *stripe;
177         struct lov_thread_info  *lti     = lov_env_info(env);
178         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
179         struct lov_stripe_md    *lsm     = conf->u.coc_md->lsm;
180         struct lu_fid           *ofid    = &lti->lti_fid;
181         struct lov_layout_raid0 *r0      = &state->raid0;
182
183         ENTRY;
184         r0->lo_nr  = conf->u.coc_md->lsm->lsm_stripe_count;
185         r0->lo_lsm = conf->u.coc_md->lsm;
186         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
187
188         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
189         if (r0->lo_sub != NULL) {
190                 result = 0;
191                 subconf->coc_inode = conf->coc_inode;
192                 cfs_spin_lock_init(&r0->lo_sub_lock);
193                 /*
194                  * Create stripe cl_objects.
195                  */
196                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
197                         struct cl_device *subdev;
198                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
199                         int ost_idx = oinfo->loi_ost_idx;
200
201                         fid_ostid_unpack(ofid, &oinfo->loi_oi,
202                                          oinfo->loi_ost_idx);
203                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
204                         subconf->u.coc_oinfo = oinfo;
205                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
206                         stripe = lov_sub_find(env, subdev, ofid, subconf);
207                         if (!IS_ERR(stripe))
208                                 result = lov_init_sub(env, lov, stripe, r0, i);
209                         else
210                                 result = PTR_ERR(stripe);
211                 }
212         } else
213                 result = -ENOMEM;
214         RETURN(result);
215 }
216
217 static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
218                              union lov_layout_state *state)
219 {
220         LASSERT(lov->lo_type == LLT_EMPTY);
221 }
222
223 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
224                                struct lovsub_object *los, int idx)
225 {
226         struct cl_object        *sub;
227         struct lov_layout_raid0 *r0;
228         struct lu_site          *site;
229         struct lu_site_bkt_data *bkt;
230         cfs_waitlink_t          *waiter;
231
232         r0  = &lov->u.raid0;
233         LASSERT(r0->lo_sub[idx] == los);
234
235         sub  = lovsub2cl(los);
236         site = sub->co_lu.lo_dev->ld_site;
237         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
238
239         cl_object_kill(env, sub);
240         /* release a reference to the sub-object and ... */
241         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
242         cl_object_put(env, sub);
243
244         /* ... wait until it is actually destroyed---sub-object clears its
245          * ->lo_sub[] slot in lovsub_object_fini() */
246         if (r0->lo_sub[idx] == los) {
247                 waiter = &lov_env_info(env)->lti_waiter;
248                 cfs_waitlink_init(waiter);
249                 cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
250                 cfs_set_current_state(CFS_TASK_UNINT);
251                 while (1) {
252                         /* this wait-queue is signaled at the end of
253                          * lu_object_free(). */
254                         cfs_set_current_state(CFS_TASK_UNINT);
255                         cfs_spin_lock(&r0->lo_sub_lock);
256                         if (r0->lo_sub[idx] == los) {
257                                 cfs_spin_unlock(&r0->lo_sub_lock);
258                                 cfs_waitq_wait(waiter, CFS_TASK_UNINT);
259                         } else {
260                                 cfs_spin_unlock(&r0->lo_sub_lock);
261                                 cfs_set_current_state(CFS_TASK_RUNNING);
262                                 break;
263                         }
264                 }
265                 cfs_waitq_del(&bkt->lsb_marche_funebre, waiter);
266         }
267         LASSERT(r0->lo_sub[idx] == NULL);
268 }
269
270 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
271                              union lov_layout_state *state)
272 {
273         struct lov_layout_raid0 *r0 = &state->raid0;
274         int                      i;
275
276         ENTRY;
277         if (r0->lo_sub != NULL) {
278                 for (i = 0; i < r0->lo_nr; ++i) {
279                         struct lovsub_object *los = r0->lo_sub[i];
280
281                         if (los != NULL)
282                                 /*
283                                  * If top-level object is to be evicted from
284                                  * the cache, so are its sub-objects.
285                                  */
286                                 lov_subobject_kill(env, lov, los, i);
287                 }
288         }
289         EXIT;
290 }
291
292 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
293                            union lov_layout_state *state)
294 {
295         LASSERT(lov->lo_type == LLT_EMPTY);
296 }
297
298 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
299                            union lov_layout_state *state)
300 {
301         struct lov_layout_raid0 *r0 = &state->raid0;
302
303         ENTRY;
304         if (r0->lo_sub != NULL) {
305                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
306                 r0->lo_sub = NULL;
307         }
308         EXIT;
309 }
310
311 static int lov_print_empty(const struct lu_env *env, void *cookie,
312                            lu_printer_t p, const struct lu_object *o)
313 {
314         (*p)(env, cookie, "empty\n");
315         return 0;
316 }
317
318 static int lov_print_raid0(const struct lu_env *env, void *cookie,
319                            lu_printer_t p, const struct lu_object *o)
320 {
321         struct lov_object       *lov = lu2lov(o);
322         struct lov_layout_raid0 *r0  = lov_r0(lov);
323         int i;
324
325         (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr);
326         for (i = 0; i < r0->lo_nr; ++i) {
327                 struct lu_object *sub;
328
329                 if (r0->lo_sub[i] != NULL) {
330                         sub = lovsub2lu(r0->lo_sub[i]);
331                         lu_object_print(env, cookie, p, sub);
332                 } else
333                         (*p)(env, cookie, "sub %d absent\n", i);
334         }
335         return 0;
336 }
337
338 /**
339  * Implements cl_object_operations::coo_attr_get() method for an object
340  * without stripes (LLT_EMPTY layout type).
341  *
342  * The only attributes this layer is authoritative in this case is
343  * cl_attr::cat_blocks---it's 0.
344  */
345 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
346                               struct cl_attr *attr)
347 {
348         attr->cat_blocks = 0;
349         return 0;
350 }
351
352 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
353                               struct cl_attr *attr)
354 {
355         struct lov_object       *lov = cl2lov(obj);
356         struct lov_layout_raid0 *r0 = lov_r0(lov);
357         struct lov_stripe_md    *lsm = lov->u.raid0.lo_lsm;
358         struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
359         __u64                    kms;
360         int                      result = 0;
361
362         ENTRY;
363         if (!r0->lo_attr_valid) {
364                 /*
365                  * Fill LVB with attributes already initialized by the upper
366                  * layer.
367                  */
368                 cl_attr2lvb(lvb, attr);
369                 kms = attr->cat_kms;
370
371                 /*
372                  * XXX that should be replaced with a loop over sub-objects,
373                  * doing cl_object_attr_get() on them. But for now, let's
374                  * reuse old lov code.
375                  */
376
377                 /*
378                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
379                  * happy. It's not needed, because new code uses
380                  * ->coh_attr_guard spin-lock to protect consistency of
381                  * sub-object attributes.
382                  */
383                 lov_stripe_lock(lsm);
384                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
385                 lov_stripe_unlock(lsm);
386                 if (result == 0) {
387                         cl_lvb2attr(attr, lvb);
388                         attr->cat_kms = kms;
389                         r0->lo_attr_valid = 1;
390                         r0->lo_attr = *attr;
391                 }
392         } else
393                 *attr = r0->lo_attr;
394         RETURN(result);
395 }
396
397 const static struct lov_layout_operations lov_dispatch[] = {
398         [LLT_EMPTY] = {
399                 .llo_init      = lov_init_empty,
400                 .llo_delete    = lov_delete_empty,
401                 .llo_fini      = lov_fini_empty,
402                 .llo_install   = lov_install_empty,
403                 .llo_print     = lov_print_empty,
404                 .llo_page_init = lov_page_init_empty,
405                 .llo_lock_init = NULL,
406                 .llo_io_init   = lov_io_init_empty,
407                 .llo_getattr   = lov_attr_get_empty
408         },
409         [LLT_RAID0] = {
410                 .llo_init      = lov_init_raid0,
411                 .llo_delete    = lov_delete_raid0,
412                 .llo_fini      = lov_fini_raid0,
413                 .llo_install   = lov_install_raid0,
414                 .llo_print     = lov_print_raid0,
415                 .llo_page_init = lov_page_init_raid0,
416                 .llo_lock_init = lov_lock_init_raid0,
417                 .llo_io_init   = lov_io_init_raid0,
418                 .llo_getattr   = lov_attr_get_raid0
419         }
420 };
421
422
423 /**
424  * Performs a double-dispatch based on the layout type of an object.
425  */
426 #define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
427 ({                                                                      \
428         struct lov_object                      *__obj = (obj);          \
429         enum lov_layout_type                    __llt;                  \
430                                                                         \
431         __llt = __obj->lo_type;                                         \
432         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
433         lov_dispatch[__llt].op(__VA_ARGS__);                            \
434 })
435
436 #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
437 ({                                                                      \
438         struct lov_object                      *__obj = (obj);          \
439         int                                     __lock = !!(lock);      \
440         typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
441                                                                         \
442         __lock &= __obj->lo_owner != cfs_current();                     \
443         if (__lock)                                                     \
444                 cfs_down_read(&__obj->lo_type_guard);                   \
445         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
446         if (__lock)                                                     \
447                 cfs_up_read(&__obj->lo_type_guard);                     \
448         __result;                                                       \
449 })
450
451 /**
452  * Performs a locked double-dispatch based on the layout type of an object.
453  */
454 #define LOV_2DISPATCH(obj, op, ...)                     \
455         LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
456
457 #define LOV_2DISPATCH_VOID(obj, op, ...)                                \
458 do {                                                                    \
459         struct lov_object                      *__obj = (obj);          \
460         enum lov_layout_type                    __llt;                  \
461                                                                         \
462         if (__obj->lo_owner != cfs_current())                           \
463                 cfs_down_read(&__obj->lo_type_guard);                   \
464         __llt = __obj->lo_type;                                         \
465         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
466         lov_dispatch[__llt].op(__VA_ARGS__);                            \
467         if (__obj->lo_owner != cfs_current())                           \
468                 cfs_up_read(&__obj->lo_type_guard);                     \
469 } while (0)
470
471 static int lov_layout_change(const struct lu_env *env,
472                              struct lov_object *obj, enum lov_layout_type llt,
473                              const struct cl_object_conf *conf)
474 {
475         int result;
476         union lov_layout_state       *state = &lov_env_info(env)->lti_state;
477         const struct lov_layout_operations *old_ops;
478         const struct lov_layout_operations *new_ops;
479
480         LASSERT(0 <= obj->lo_type && obj->lo_type < ARRAY_SIZE(lov_dispatch));
481         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
482         ENTRY;
483
484         old_ops = &lov_dispatch[obj->lo_type];
485         new_ops = &lov_dispatch[llt];
486
487         result = new_ops->llo_init(env, lu2lov_dev(obj->lo_cl.co_lu.lo_dev),
488                                    obj, conf, state);
489         if (result == 0) {
490                 struct cl_object_header *hdr = cl_object_header(&obj->lo_cl);
491                 void                    *cookie;
492                 struct lu_env           *nested;
493                 int                      refcheck;
494
495                 cookie = cl_env_reenter();
496                 nested = cl_env_get(&refcheck);
497                 if (!IS_ERR(nested))
498                         cl_object_prune(nested, &obj->lo_cl);
499                 else
500                         result = PTR_ERR(nested);
501                 cl_env_put(nested, &refcheck);
502                 cl_env_reexit(cookie);
503
504                 old_ops->llo_fini(env, obj, &obj->u);
505                 LASSERT(cfs_list_empty(&hdr->coh_locks));
506                 LASSERT(hdr->coh_tree.rnode == NULL);
507                 LASSERT(hdr->coh_pages == 0);
508
509                 new_ops->llo_install(env, obj, state);
510                 obj->lo_type = llt;
511         } else
512                 new_ops->llo_fini(env, obj, state);
513         RETURN(result);
514 }
515
516 /*****************************************************************************
517  *
518  * Lov object operations.
519  *
520  */
521
522 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
523                     const struct lu_object_conf *conf)
524 {
525         struct lov_device            *dev   = lu2lov_dev(obj->lo_dev);
526         struct lov_object            *lov   = lu2lov(obj);
527         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
528         union  lov_layout_state      *set   = &lov_env_info(env)->lti_state;
529         const struct lov_layout_operations *ops;
530         int result;
531
532         ENTRY;
533         cfs_init_rwsem(&lov->lo_type_guard);
534
535         /* no locking is necessary, as object is being created */
536         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
537         ops = &lov_dispatch[lov->lo_type];
538         result = ops->llo_init(env, dev, lov, cconf, set);
539         if (result == 0)
540                 ops->llo_install(env, lov, set);
541         else
542                 ops->llo_fini(env, lov, set);
543         RETURN(result);
544 }
545
546 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
547                         const struct cl_object_conf *conf)
548 {
549         struct lov_object *lov = cl2lov(obj);
550         int result;
551
552         ENTRY;
553         /*
554          * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
555          */
556         LASSERT(lov->lo_owner != cfs_current());
557         cfs_down_write(&lov->lo_type_guard);
558         LASSERT(lov->lo_owner == NULL);
559         lov->lo_owner = cfs_current();
560         if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
561                 result = lov_layout_change(env, lov, LLT_RAID0, conf);
562         else
563                 result = -EOPNOTSUPP;
564         lov->lo_owner = NULL;
565         cfs_up_write(&lov->lo_type_guard);
566         RETURN(result);
567 }
568
569 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
570 {
571         struct lov_object *lov = lu2lov(obj);
572
573         ENTRY;
574         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
575         EXIT;
576 }
577
578 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
579 {
580         struct lov_object *lov = lu2lov(obj);
581
582         ENTRY;
583         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
584         lu_object_fini(obj);
585         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
586         EXIT;
587 }
588
589 static int lov_object_print(const struct lu_env *env, void *cookie,
590                             lu_printer_t p, const struct lu_object *o)
591 {
592         return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
593 }
594
595 struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
596                               struct cl_page *page, cfs_page_t *vmpage)
597 {
598         return LOV_2DISPATCH(cl2lov(obj),
599                              llo_page_init, env, obj, page, vmpage);
600 }
601
602 /**
603  * Implements cl_object_operations::clo_io_init() method for lov
604  * layer. Dispatches to the appropriate layout io initialization method.
605  */
606 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
607                 struct cl_io *io)
608 {
609         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
610         /*
611          * Do not take lock in case of CIT_MISC io, because
612          *
613          *     - if this is an io for a glimpse, then we don't care;
614          *
615          *     - if this not a glimpse (writepage or lock cancellation), then
616          *       layout change cannot happen because a page or a lock
617          *       already exist; and
618          *
619          *     - lock ordering (lock mutex nests within layout rw-semaphore)
620          *       is obeyed in case of lock cancellation.
621          */
622         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
623                                      io->ci_type != CIT_MISC, env, obj, io);
624 }
625
626 /**
627  * An implementation of cl_object_operations::clo_attr_get() method for lov
628  * layer. For raid0 layout this collects and merges attributes of all
629  * sub-objects.
630  */
631 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
632                         struct cl_attr *attr)
633 {
634         /* do not take lock, as this function is called under a
635          * spin-lock. Layout is protected from changing by ongoing IO. */
636         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
637 }
638
639 static int lov_attr_set(const struct lu_env *env, struct cl_object *obj,
640                         const struct cl_attr *attr, unsigned valid)
641 {
642         /*
643          * No dispatch is required here, as no layout implements this.
644          */
645         return 0;
646 }
647
648 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
649                   struct cl_lock *lock, const struct cl_io *io)
650 {
651         return LOV_2DISPATCH(cl2lov(obj), llo_lock_init, env, obj, lock, io);
652 }
653
654 static const struct cl_object_operations lov_ops = {
655         .coo_page_init = lov_page_init,
656         .coo_lock_init = lov_lock_init,
657         .coo_io_init   = lov_io_init,
658         .coo_attr_get  = lov_attr_get,
659         .coo_attr_set  = lov_attr_set,
660         .coo_conf_set  = lov_conf_set
661 };
662
663 static const struct lu_object_operations lov_lu_obj_ops = {
664         .loo_object_init      = lov_object_init,
665         .loo_object_delete    = lov_object_delete,
666         .loo_object_release   = NULL,
667         .loo_object_free      = lov_object_free,
668         .loo_object_print     = lov_object_print,
669         .loo_object_invariant = NULL
670 };
671
672 struct lu_object *lov_object_alloc(const struct lu_env *env,
673                                    const struct lu_object_header *unused,
674                                    struct lu_device *dev)
675 {
676         struct lov_object *lov;
677         struct lu_object  *obj;
678
679         ENTRY;
680         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO);
681         if (lov != NULL) {
682                 obj = lov2lu(lov);
683                 lu_object_init(obj, NULL, dev);
684                 lov->lo_cl.co_ops = &lov_ops;
685                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
686                 /*
687                  * object io operation vector (cl_object::co_iop) is installed
688                  * later in lov_object_init(), as different vectors are used
689                  * for object with different layouts.
690                  */
691                 obj->lo_ops = &lov_lu_obj_ops;
692         } else
693                 obj = NULL;
694         RETURN(obj);
695 }
696
697 /** @} lov */