Whamcloud - gitweb
8ef14135422117b24e44f2374563babca33f5134
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 /** \addtogroup lov lov @{ */
42
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #include "lov_cl_internal.h"
46
47 /*****************************************************************************
48  *
49  * Layout operations.
50  *
51  */
52
53 struct lov_layout_operations {
54         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
55                         struct lov_object *lov,
56                         const struct cl_object_conf *conf,
57                         union lov_layout_state *state);
58         void (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
59                            union lov_layout_state *state);
60         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
61                          union lov_layout_state *state);
62         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
63                             union lov_layout_state *state);
64         int  (*llo_print)(const struct lu_env *env, void *cookie,
65                           lu_printer_t p, const struct lu_object *o);
66         struct cl_page *(*llo_page_init)(const struct lu_env *env,
67                                          struct cl_object *obj,
68                                          struct cl_page *page,
69                                          cfs_page_t *vmpage);
70         int  (*llo_lock_init)(const struct lu_env *env,
71                               struct cl_object *obj, struct cl_lock *lock,
72                               const struct cl_io *io);
73         int  (*llo_io_init)(const struct lu_env *env,
74                             struct cl_object *obj, struct cl_io *io);
75         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
76                             struct cl_attr *attr);
77 };
78
79 /*****************************************************************************
80  *
81  * Lov object layout operations.
82  *
83  */
84
85 static void lov_install_empty(const struct lu_env *env,
86                               struct lov_object *lov,
87                               union  lov_layout_state *state)
88 {
89         /*
90          * File without objects.
91          */
92 }
93
94 static int lov_init_empty(const struct lu_env *env,
95                           struct lov_device *dev, struct lov_object *lov,
96                           const struct cl_object_conf *conf,
97                           union  lov_layout_state *state)
98 {
99         return 0;
100 }
101
102 static void lov_install_raid0(const struct lu_env *env,
103                               struct lov_object *lov,
104                               union  lov_layout_state *state)
105 {
106         lov->u = *state;
107 }
108
109 static void oinfo_get_fid(const struct lov_oinfo *oinfo, struct lu_fid *fid)
110 {
111         __u64 idx = oinfo->loi_id;
112
113         /* See idif definition in wiki:CMD3_interoperability_architecture */
114
115         LASSERT(oinfo->loi_gr < 1ULL << 16);
116         LASSERT(oinfo->loi_id < 1ULL << 49);
117         ENTRY;
118
119         /*
120          * Now that the fid of stripe is not unique now, ost_idx have to
121          * be used to make it unique. This is ok because the stripe fids
122          * are just used in client side(to locate the objects). -jay
123          */
124         fid->f_seq = ((__u64)oinfo->loi_ost_idx) << 32 |
125                      oinfo->loi_gr << 16 | idx >> 32;
126         fid->f_oid = idx; /* truncated to 32 bits by assignment */
127         fid->f_ver = 0;
128         EXIT;
129 }
130
131 static struct cl_object *lov_sub_find(const struct lu_env *env,
132                                       struct cl_device *dev,
133                                       const struct lu_fid *fid,
134                                       const struct cl_object_conf *conf)
135 {
136         struct lu_object *o;
137
138         ENTRY;
139         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
140         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
141         RETURN(lu2cl(o));
142 }
143
144 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
145                         struct cl_object *stripe,
146                         struct lov_layout_raid0 *r0, int idx)
147 {
148         struct cl_object_header *hdr;
149         struct cl_object_header *subhdr;
150         struct cl_object_header *parent;
151         struct lov_oinfo        *oinfo;
152         int result;
153
154         hdr    = cl_object_header(lov2cl(lov));
155         subhdr = cl_object_header(stripe);
156         parent = subhdr->coh_parent;
157
158         oinfo = r0->lo_lsm->lsm_oinfo[idx];
159         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" gr: "LPU64
160                " idx: %d gen: %d\n",
161                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
162                PFID(&hdr->coh_lu.loh_fid), hdr,
163                oinfo->loi_id, oinfo->loi_gr,
164                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
165
166         if (parent == NULL) {
167                 subhdr->coh_parent = hdr;
168                 subhdr->coh_nesting = hdr->coh_nesting + 1;
169                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
170                 r0->lo_sub[idx] = cl2lovsub(stripe);
171                 r0->lo_sub[idx]->lso_super = lov;
172                 r0->lo_sub[idx]->lso_index = idx;
173                 result = 0;
174         } else {
175                 CERROR("Stripe is already owned by other file (%i).\n", idx);
176                 LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
177                 LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
178                                 "old\n");
179                 LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n");
180                 cl_object_put(env, stripe);
181                 result = -EIO;
182         }
183         return result;
184 }
185
186 static int lov_init_raid0(const struct lu_env *env,
187                           struct lov_device *dev, struct lov_object *lov,
188                           const struct cl_object_conf *conf,
189                           union  lov_layout_state *state)
190 {
191         int result;
192         int i;
193
194         struct cl_object        *stripe;
195         struct lov_thread_info  *lti     = lov_env_info(env);
196         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
197         struct lov_stripe_md    *lsm     = conf->u.coc_md->lsm;
198         struct lu_fid           *ofid    = &lti->lti_fid;
199         struct lov_layout_raid0 *r0      = &state->raid0;
200
201         ENTRY;
202         r0->lo_nr  = conf->u.coc_md->lsm->lsm_stripe_count;
203         r0->lo_lsm = conf->u.coc_md->lsm;
204         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
205
206         OBD_ALLOC(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
207         if (r0->lo_sub != NULL) {
208                 result = 0;
209                 subconf->coc_inode = conf->coc_inode;
210                 /*
211                  * Create stripe cl_objects.
212                  */
213                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
214                         struct cl_device *subdev;
215                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
216                         int ost_idx = oinfo->loi_ost_idx;
217
218                         oinfo_get_fid(oinfo, ofid);
219                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
220                         subconf->u.coc_oinfo = oinfo;
221                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
222                         stripe = lov_sub_find(env, subdev, ofid, subconf);
223                         if (!IS_ERR(stripe))
224                                 result = lov_init_sub(env, lov, stripe, r0, i);
225                         else
226                                 result = PTR_ERR(stripe);
227                 }
228         } else
229                 result = -ENOMEM;
230         RETURN(result);
231 }
232
233 static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
234                              union lov_layout_state *state)
235 {
236         LASSERT(lov->lo_type == LLT_EMPTY);
237 }
238
239 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
240                                struct lovsub_object *los, int idx)
241 {
242         struct cl_object        *sub;
243         struct lov_layout_raid0 *r0;
244         struct lu_site          *site;
245         cfs_waitlink_t          *waiter;
246
247         r0  = &lov->u.raid0;
248         sub = lovsub2cl(los);
249         LASSERT(r0->lo_sub[idx] == los);
250
251         cl_object_kill(env, sub);
252         /* release a reference to the sub-object and ... */
253         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
254         cl_object_put(env, sub);
255
256         /* ... wait until it is actually destroyed---sub-object clears its
257          * ->lo_sub[] slot in lovsub_object_fini() */
258         if (r0->lo_sub[idx] == los) {
259                 waiter = &lov_env_info(env)->lti_waiter;
260                 site   = sub->co_lu.lo_dev->ld_site;
261                 cfs_waitlink_init(waiter);
262                 cfs_waitq_add(&site->ls_marche_funebre, waiter);
263                 set_current_state(CFS_TASK_UNINT);
264
265                 while (r0->lo_sub[idx] == los)
266                         /* this wait-queue is signaled at the end of
267                          * lu_object_free(). */
268                         cfs_waitq_wait(waiter, CFS_TASK_UNINT);
269                 cfs_waitq_del(&site->ls_marche_funebre, waiter);
270         }
271         LASSERT(r0->lo_sub[idx] == NULL);
272 }
273
274 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
275                              union lov_layout_state *state)
276 {
277         struct lov_layout_raid0 *r0 = &state->raid0;
278         int                      i;
279
280         ENTRY;
281         if (r0->lo_sub != NULL) {
282                 for (i = 0; i < r0->lo_nr; ++i) {
283                         struct lovsub_object *los = r0->lo_sub[i];
284
285                         if (los != NULL)
286                                 /*
287                                  * If top-level object is to be evicted from
288                                  * the cache, so are its sub-objects.
289                                  */
290                                 lov_subobject_kill(env, lov, los, i);
291                 }
292         }
293         EXIT;
294 }
295
296 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
297                            union lov_layout_state *state)
298 {
299         LASSERT(lov->lo_type == LLT_EMPTY);
300 }
301
302 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
303                            union lov_layout_state *state)
304 {
305         struct lov_layout_raid0 *r0 = &state->raid0;
306
307         ENTRY;
308         if (r0->lo_sub != NULL) {
309                 OBD_FREE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
310                 r0->lo_sub = NULL;
311         }
312         EXIT;
313 }
314
315 static int lov_print_empty(const struct lu_env *env, void *cookie,
316                            lu_printer_t p, const struct lu_object *o)
317 {
318         (*p)(env, cookie, "empty\n");
319         return 0;
320 }
321
322 static int lov_print_raid0(const struct lu_env *env, void *cookie,
323                            lu_printer_t p, const struct lu_object *o)
324 {
325         struct lov_object       *lov = lu2lov(o);
326         struct lov_layout_raid0 *r0  = lov_r0(lov);
327         int i;
328
329         (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr);
330         for (i = 0; i < r0->lo_nr; ++i) {
331                 struct lu_object *sub;
332
333                 if (r0->lo_sub[i] != NULL) {
334                         sub = lovsub2lu(r0->lo_sub[i]);
335                         lu_object_print(env, cookie, p, sub);
336                 } else
337                         (*p)(env, cookie, "sub %d absent\n", i);
338         }
339         return 0;
340 }
341
342 /**
343  * Implements cl_object_operations::coo_attr_get() method for an object
344  * without stripes (LLT_EMPTY layout type).
345  *
346  * The only attributes this layer is authoritative in this case is
347  * cl_attr::cat_blocks---it's 0.
348  */
349 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
350                               struct cl_attr *attr)
351 {
352         attr->cat_blocks = 0;
353         return 0;
354 }
355
356 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
357                               struct cl_attr *attr)
358 {
359         struct lov_object       *lov = cl2lov(obj);
360         struct lov_layout_raid0 *r0 = lov_r0(lov);
361         struct lov_stripe_md    *lsm = lov->u.raid0.lo_lsm;
362         struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
363         __u64                    kms;
364         int                      result = 0;
365
366         ENTRY;
367         if (!r0->lo_attr_valid) {
368                 /*
369                  * Fill LVB with attributes already initialized by the upper
370                  * layer.
371                  */
372                 cl_attr2lvb(lvb, attr);
373                 kms = attr->cat_kms;
374
375                 /*
376                  * XXX that should be replaced with a loop over sub-objects,
377                  * doing cl_object_attr_get() on them. But for now, let's
378                  * reuse old lov code.
379                  */
380
381                 /*
382                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
383                  * happy. It's not needed, because new code uses
384                  * ->coh_attr_guard spin-lock to protect consistency of
385                  * sub-object attributes.
386                  */
387                 lov_stripe_lock(lsm);
388                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
389                 lov_stripe_unlock(lsm);
390                 if (result == 0) {
391                         cl_lvb2attr(attr, lvb);
392                         attr->cat_kms = kms;
393                         r0->lo_attr_valid = 1;
394                         r0->lo_attr = *attr;
395                 }
396         } else
397                 *attr = r0->lo_attr;
398         RETURN(result);
399 }
400
401 const static struct lov_layout_operations lov_dispatch[] = {
402         [LLT_EMPTY] = {
403                 .llo_init      = lov_init_empty,
404                 .llo_delete    = lov_delete_empty,
405                 .llo_fini      = lov_fini_empty,
406                 .llo_install   = lov_install_empty,
407                 .llo_print     = lov_print_empty,
408                 .llo_page_init = lov_page_init_empty,
409                 .llo_lock_init = NULL,
410                 .llo_io_init   = lov_io_init_empty,
411                 .llo_getattr   = lov_attr_get_empty
412         },
413         [LLT_RAID0] = {
414                 .llo_init      = lov_init_raid0,
415                 .llo_delete    = lov_delete_raid0,
416                 .llo_fini      = lov_fini_raid0,
417                 .llo_install   = lov_install_raid0,
418                 .llo_print     = lov_print_raid0,
419                 .llo_page_init = lov_page_init_raid0,
420                 .llo_lock_init = lov_lock_init_raid0,
421                 .llo_io_init   = lov_io_init_raid0,
422                 .llo_getattr   = lov_attr_get_raid0
423         }
424 };
425
426
427 /**
428  * Performs a double-dispatch based on the layout type of an object.
429  */
430 #define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
431 ({                                                                      \
432         struct lov_object                      *__obj = (obj);          \
433         enum lov_layout_type                    __llt;                  \
434                                                                         \
435         __llt = __obj->lo_type;                                         \
436         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
437         lov_dispatch[__llt].op(__VA_ARGS__);                            \
438 })
439
440 #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
441 ({                                                                      \
442         struct lov_object                      *__obj = (obj);          \
443         int                                     __lock = !!(lock);      \
444         typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
445                                                                         \
446         __lock &= __obj->lo_owner != cfs_current();                     \
447         if (__lock)                                                     \
448                 down_read(&__obj->lo_type_guard);                       \
449         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
450         if (__lock)                                                     \
451                 up_read(&__obj->lo_type_guard);                         \
452         __result;                                                       \
453 })
454
455 /**
456  * Performs a locked double-dispatch based on the layout type of an object.
457  */
458 #define LOV_2DISPATCH(obj, op, ...)                     \
459         LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
460
461 #define LOV_2DISPATCH_VOID(obj, op, ...)                                \
462 do {                                                                    \
463         struct lov_object                      *__obj = (obj);          \
464         enum lov_layout_type                    __llt;                  \
465                                                                         \
466         if (__obj->lo_owner != cfs_current())                           \
467                 down_read(&__obj->lo_type_guard);                       \
468         __llt = __obj->lo_type;                                         \
469         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
470         lov_dispatch[__llt].op(__VA_ARGS__);                            \
471         if (__obj->lo_owner != cfs_current())                           \
472                 up_read(&__obj->lo_type_guard);                         \
473 } while (0)
474
475 static int lov_layout_change(const struct lu_env *env,
476                              struct lov_object *obj, enum lov_layout_type llt,
477                              const struct cl_object_conf *conf)
478 {
479         int result;
480         union lov_layout_state       *state = &lov_env_info(env)->lti_state;
481         const struct lov_layout_operations *old_ops;
482         const struct lov_layout_operations *new_ops;
483
484         LASSERT(0 <= obj->lo_type && obj->lo_type < ARRAY_SIZE(lov_dispatch));
485         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
486         ENTRY;
487
488         old_ops = &lov_dispatch[obj->lo_type];
489         new_ops = &lov_dispatch[llt];
490
491         result = new_ops->llo_init(env, lu2lov_dev(obj->lo_cl.co_lu.lo_dev),
492                                    obj, conf, state);
493         if (result == 0) {
494                 struct cl_object_header *hdr = cl_object_header(&obj->lo_cl);
495                 void                    *cookie;
496                 struct lu_env           *nested;
497                 int                      refcheck;
498
499                 cookie = cl_env_reenter();
500                 nested = cl_env_get(&refcheck);
501                 if (!IS_ERR(nested))
502                         cl_object_prune(nested, &obj->lo_cl);
503                 else
504                         result = PTR_ERR(nested);
505                 cl_env_put(nested, &refcheck);
506                 cl_env_reexit(cookie);
507
508                 old_ops->llo_fini(env, obj, &obj->u);
509                 LASSERT(list_empty(&hdr->coh_locks));
510                 LASSERT(hdr->coh_tree.rnode == NULL);
511                 LASSERT(hdr->coh_pages == 0);
512
513                 new_ops->llo_install(env, obj, state);
514                 obj->lo_type = llt;
515         } else
516                 new_ops->llo_fini(env, obj, state);
517         RETURN(result);
518 }
519
520 /*****************************************************************************
521  *
522  * Lov object operations.
523  *
524  */
525
526 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
527                     const struct lu_object_conf *conf)
528 {
529         struct lov_device            *dev   = lu2lov_dev(obj->lo_dev);
530         struct lov_object            *lov   = lu2lov(obj);
531         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
532         union  lov_layout_state      *set   = &lov_env_info(env)->lti_state;
533         const struct lov_layout_operations *ops;
534         int result;
535
536         ENTRY;
537         init_rwsem(&lov->lo_type_guard);
538
539         /* no locking is necessary, as object is being created */
540         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
541         ops = &lov_dispatch[lov->lo_type];
542         result = ops->llo_init(env, dev, lov, cconf, set);
543         if (result == 0)
544                 ops->llo_install(env, lov, set);
545         else
546                 ops->llo_fini(env, lov, set);
547         RETURN(result);
548 }
549
550 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
551                         const struct cl_object_conf *conf)
552 {
553         struct lov_object *lov = cl2lov(obj);
554         int result;
555
556         ENTRY;
557         /*
558          * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
559          */
560         LASSERT(lov->lo_owner != cfs_current());
561         down_write(&lov->lo_type_guard);
562         LASSERT(lov->lo_owner == NULL);
563         lov->lo_owner = cfs_current();
564         if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
565                 result = lov_layout_change(env, lov, LLT_RAID0, conf);
566         else
567                 result = -EOPNOTSUPP;
568         lov->lo_owner = NULL;
569         up_write(&lov->lo_type_guard);
570         RETURN(result);
571 }
572
573 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
574 {
575         struct lov_object *lov = lu2lov(obj);
576
577         ENTRY;
578         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
579         EXIT;
580 }
581
582 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
583 {
584         struct lov_object *lov = lu2lov(obj);
585
586         ENTRY;
587         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
588         lu_object_fini(obj);
589         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
590         EXIT;
591 }
592
593 static int lov_object_print(const struct lu_env *env, void *cookie,
594                             lu_printer_t p, const struct lu_object *o)
595 {
596         return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
597 }
598
599 struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
600                               struct cl_page *page, cfs_page_t *vmpage)
601 {
602         return LOV_2DISPATCH(cl2lov(obj),
603                              llo_page_init, env, obj, page, vmpage);
604 }
605
606 /**
607  * Implements cl_object_operations::clo_io_init() method for lov
608  * layer. Dispatches to the appropriate layout io initialization method.
609  */
610 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
611                 struct cl_io *io)
612 {
613         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
614         /*
615          * Do not take lock in case of CIT_MISC io, because
616          *
617          *     - if this is an io for a glimpse, then we don't care;
618          *
619          *     - if this not a glimpse (writepage or lock cancellation), then
620          *       layout change cannot happen because a page or a lock
621          *       already exist; and
622          *
623          *     - lock ordering (lock mutex nests within layout rw-semaphore)
624          *       is obeyed in case of lock cancellation.
625          */
626         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
627                                      io->ci_type != CIT_MISC, env, obj, io);
628 }
629
630 /**
631  * An implementation of cl_object_operations::clo_attr_get() method for lov
632  * layer. For raid0 layout this collects and merges attributes of all
633  * sub-objects.
634  */
635 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
636                         struct cl_attr *attr)
637 {
638         /* do not take lock, as this function is called under a
639          * spin-lock. Layout is protected from changing by ongoing IO. */
640         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
641 }
642
643 static int lov_attr_set(const struct lu_env *env, struct cl_object *obj,
644                         const struct cl_attr *attr, unsigned valid)
645 {
646         /*
647          * No dispatch is required here, as no layout implements this.
648          */
649         return 0;
650 }
651
652 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
653                   struct cl_lock *lock, const struct cl_io *io)
654 {
655         return LOV_2DISPATCH(cl2lov(obj), llo_lock_init, env, obj, lock, io);
656 }
657
658 static const struct cl_object_operations lov_ops = {
659         .coo_page_init = lov_page_init,
660         .coo_lock_init = lov_lock_init,
661         .coo_io_init   = lov_io_init,
662         .coo_attr_get  = lov_attr_get,
663         .coo_attr_set  = lov_attr_set,
664         .coo_conf_set  = lov_conf_set
665 };
666
667 static const struct lu_object_operations lov_lu_obj_ops = {
668         .loo_object_init      = lov_object_init,
669         .loo_object_delete    = lov_object_delete,
670         .loo_object_release   = NULL,
671         .loo_object_free      = lov_object_free,
672         .loo_object_print     = lov_object_print,
673         .loo_object_invariant = NULL
674 };
675
676 struct lu_object *lov_object_alloc(const struct lu_env *env,
677                                    const struct lu_object_header *_,
678                                    struct lu_device *dev)
679 {
680         struct lov_object *lov;
681         struct lu_object  *obj;
682
683         ENTRY;
684         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO);
685         if (lov != NULL) {
686                 obj = lov2lu(lov);
687                 lu_object_init(obj, NULL, dev);
688                 lov->lo_cl.co_ops = &lov_ops;
689                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
690                 /*
691                  * object io operation vector (cl_object::co_iop) is installed
692                  * later in lov_object_init(), as different vectors are used
693                  * for object with different layouts.
694                  */
695                 obj->lo_ops = &lov_lu_obj_ops;
696         } else
697                 obj = NULL;
698         RETURN(obj);
699 }
700
701 /** @} lov */