Whamcloud - gitweb
LU-169 lov: add lsm refcounting
[fs/lustre-release.git] / lustre / lov / lov_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Implementation of cl_object for LOV layer.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #include "lov_cl_internal.h"
45 #include <lustre_debug.h>
46
47 /** \addtogroup lov
48  *  @{
49  */
50
51 /*****************************************************************************
52  *
53  * Layout operations.
54  *
55  */
56
57 struct lov_layout_operations {
58         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
59                         struct lov_object *lov,
60                         const struct cl_object_conf *conf,
61                         union lov_layout_state *state);
62         void (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
63                            union lov_layout_state *state);
64         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
65                          union lov_layout_state *state);
66         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
67                             union lov_layout_state *state);
68         int  (*llo_print)(const struct lu_env *env, void *cookie,
69                           lu_printer_t p, const struct lu_object *o);
70         struct cl_page *(*llo_page_init)(const struct lu_env *env,
71                                          struct cl_object *obj,
72                                          struct cl_page *page,
73                                          cfs_page_t *vmpage);
74         int  (*llo_lock_init)(const struct lu_env *env,
75                               struct cl_object *obj, struct cl_lock *lock,
76                               const struct cl_io *io);
77         int  (*llo_io_init)(const struct lu_env *env,
78                             struct cl_object *obj, struct cl_io *io);
79         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
80                             struct cl_attr *attr);
81 };
82
83 /*****************************************************************************
84  *
85  * Lov object layout operations.
86  *
87  */
88
89 static void lov_install_empty(const struct lu_env *env,
90                               struct lov_object *lov,
91                               union  lov_layout_state *state)
92 {
93         /*
94          * File without objects.
95          */
96 }
97
98 static int lov_init_empty(const struct lu_env *env,
99                           struct lov_device *dev, struct lov_object *lov,
100                           const struct cl_object_conf *conf,
101                           union  lov_layout_state *state)
102 {
103         return 0;
104 }
105
106 static void lov_install_raid0(const struct lu_env *env,
107                               struct lov_object *lov,
108                               union  lov_layout_state *state)
109 {
110         lov->u = *state;
111 }
112
113 static struct cl_object *lov_sub_find(const struct lu_env *env,
114                                       struct cl_device *dev,
115                                       const struct lu_fid *fid,
116                                       const struct cl_object_conf *conf)
117 {
118         struct lu_object *o;
119
120         ENTRY;
121         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
122         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
123         RETURN(lu2cl(o));
124 }
125
126 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
127                         struct cl_object *stripe,
128                         struct lov_layout_raid0 *r0, int idx)
129 {
130         struct cl_object_header *hdr;
131         struct cl_object_header *subhdr;
132         struct cl_object_header *parent;
133         struct lov_oinfo        *oinfo;
134         int result;
135
136         hdr    = cl_object_header(lov2cl(lov));
137         subhdr = cl_object_header(stripe);
138         parent = subhdr->coh_parent;
139
140         oinfo = r0->lo_lsm->lsm_oinfo[idx];
141         CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64
142                " idx: %d gen: %d\n",
143                PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
144                PFID(&hdr->coh_lu.loh_fid), hdr,
145                oinfo->loi_id, oinfo->loi_seq,
146                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
147
148         if (parent == NULL) {
149                 subhdr->coh_parent = hdr;
150                 subhdr->coh_nesting = hdr->coh_nesting + 1;
151                 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
152                 r0->lo_sub[idx] = cl2lovsub(stripe);
153                 r0->lo_sub[idx]->lso_super = lov;
154                 r0->lo_sub[idx]->lso_index = idx;
155                 result = 0;
156         } else {
157                 CERROR("Stripe is already owned by other file (%d).\n", idx);
158                 LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
159                 LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
160                                 "old\n");
161                 LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n");
162                 cl_object_put(env, stripe);
163                 result = -EIO;
164         }
165         return result;
166 }
167
168 static int lov_init_raid0(const struct lu_env *env,
169                           struct lov_device *dev, struct lov_object *lov,
170                           const struct cl_object_conf *conf,
171                           union  lov_layout_state *state)
172 {
173         int result;
174         int i;
175
176         struct cl_object        *stripe;
177         struct lov_thread_info  *lti     = lov_env_info(env);
178         struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
179         struct lov_stripe_md    *lsm     = conf->u.coc_md->lsm;
180         struct lu_fid           *ofid    = &lti->lti_fid;
181         struct lov_layout_raid0 *r0      = &state->raid0;
182
183         ENTRY;
184
185         if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
186                 dump_lsm(D_ERROR, lsm);
187                 LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
188                          LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
189         }
190
191         r0->lo_lsm = lsm_addref(lsm);
192         r0->lo_nr  = lsm->lsm_stripe_count;
193         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
194
195         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
196         if (r0->lo_sub != NULL) {
197                 result = 0;
198                 subconf->coc_inode = conf->coc_inode;
199                 cfs_spin_lock_init(&r0->lo_sub_lock);
200                 /*
201                  * Create stripe cl_objects.
202                  */
203                 for (i = 0; i < r0->lo_nr && result == 0; ++i) {
204                         struct cl_device *subdev;
205                         struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
206                         int ost_idx = oinfo->loi_ost_idx;
207
208                         fid_ostid_unpack(ofid, &oinfo->loi_oi,
209                                          oinfo->loi_ost_idx);
210                         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
211                         subconf->u.coc_oinfo = oinfo;
212                         LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
213                         stripe = lov_sub_find(env, subdev, ofid, subconf);
214                         if (!IS_ERR(stripe))
215                                 result = lov_init_sub(env, lov, stripe, r0, i);
216                         else
217                                 result = PTR_ERR(stripe);
218                 }
219         } else
220                 result = -ENOMEM;
221         RETURN(result);
222 }
223
224 static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
225                              union lov_layout_state *state)
226 {
227         LASSERT(lov->lo_type == LLT_EMPTY);
228 }
229
230 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
231                                struct lovsub_object *los, int idx)
232 {
233         struct cl_object        *sub;
234         struct lov_layout_raid0 *r0;
235         struct lu_site          *site;
236         struct lu_site_bkt_data *bkt;
237         cfs_waitlink_t          *waiter;
238
239         r0  = &lov->u.raid0;
240         LASSERT(r0->lo_sub[idx] == los);
241
242         sub  = lovsub2cl(los);
243         site = sub->co_lu.lo_dev->ld_site;
244         bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
245
246         cl_object_kill(env, sub);
247         /* release a reference to the sub-object and ... */
248         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
249         cl_object_put(env, sub);
250
251         /* ... wait until it is actually destroyed---sub-object clears its
252          * ->lo_sub[] slot in lovsub_object_fini() */
253         if (r0->lo_sub[idx] == los) {
254                 waiter = &lov_env_info(env)->lti_waiter;
255                 cfs_waitlink_init(waiter);
256                 cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
257                 cfs_set_current_state(CFS_TASK_UNINT);
258                 while (1) {
259                         /* this wait-queue is signaled at the end of
260                          * lu_object_free(). */
261                         cfs_set_current_state(CFS_TASK_UNINT);
262                         cfs_spin_lock(&r0->lo_sub_lock);
263                         if (r0->lo_sub[idx] == los) {
264                                 cfs_spin_unlock(&r0->lo_sub_lock);
265                                 cfs_waitq_wait(waiter, CFS_TASK_UNINT);
266                         } else {
267                                 cfs_spin_unlock(&r0->lo_sub_lock);
268                                 cfs_set_current_state(CFS_TASK_RUNNING);
269                                 break;
270                         }
271                 }
272                 cfs_waitq_del(&bkt->lsb_marche_funebre, waiter);
273         }
274         LASSERT(r0->lo_sub[idx] == NULL);
275 }
276
277 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
278                              union lov_layout_state *state)
279 {
280         struct lov_layout_raid0 *r0 = &state->raid0;
281         struct lov_stripe_md    *lsm = r0->lo_lsm;
282         struct l_wait_info       lwi = { 0 };
283         int                      i;
284
285         ENTRY;
286
287         /* wait until there is no extra users. */
288         dump_lsm(D_INODE, lsm);
289         l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
290
291         if (r0->lo_sub != NULL) {
292                 for (i = 0; i < r0->lo_nr; ++i) {
293                         struct lovsub_object *los = r0->lo_sub[i];
294
295                         if (los != NULL)
296                                 /*
297                                  * If top-level object is to be evicted from
298                                  * the cache, so are its sub-objects.
299                                  */
300                                 lov_subobject_kill(env, lov, los, i);
301                 }
302         }
303         EXIT;
304 }
305
306 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
307                            union lov_layout_state *state)
308 {
309         LASSERT(lov->lo_type == LLT_EMPTY);
310 }
311
312 static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
313                            union lov_layout_state *state)
314 {
315         struct lov_layout_raid0 *r0 = &state->raid0;
316
317         ENTRY;
318
319         if (r0->lo_sub != NULL) {
320                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
321                 r0->lo_sub = NULL;
322         }
323
324         LASSERT(cfs_atomic_read(&r0->lo_lsm->lsm_refc) == 1);
325         lov_free_memmd(&r0->lo_lsm);
326
327         EXIT;
328 }
329
330 static int lov_print_empty(const struct lu_env *env, void *cookie,
331                            lu_printer_t p, const struct lu_object *o)
332 {
333         (*p)(env, cookie, "empty\n");
334         return 0;
335 }
336
337 static int lov_print_raid0(const struct lu_env *env, void *cookie,
338                            lu_printer_t p, const struct lu_object *o)
339 {
340         struct lov_object       *lov = lu2lov(o);
341         struct lov_layout_raid0 *r0  = lov_r0(lov);
342         int i;
343
344         (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr);
345         for (i = 0; i < r0->lo_nr; ++i) {
346                 struct lu_object *sub;
347
348                 if (r0->lo_sub[i] != NULL) {
349                         sub = lovsub2lu(r0->lo_sub[i]);
350                         lu_object_print(env, cookie, p, sub);
351                 } else
352                         (*p)(env, cookie, "sub %d absent\n", i);
353         }
354         return 0;
355 }
356
357 /**
358  * Implements cl_object_operations::coo_attr_get() method for an object
359  * without stripes (LLT_EMPTY layout type).
360  *
361  * The only attributes this layer is authoritative in this case is
362  * cl_attr::cat_blocks---it's 0.
363  */
364 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
365                               struct cl_attr *attr)
366 {
367         attr->cat_blocks = 0;
368         return 0;
369 }
370
371 static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
372                               struct cl_attr *attr)
373 {
374         struct lov_object       *lov = cl2lov(obj);
375         struct lov_layout_raid0 *r0 = lov_r0(lov);
376         struct lov_stripe_md    *lsm = lov->u.raid0.lo_lsm;
377         struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
378         __u64                    kms;
379         int                      result = 0;
380
381         ENTRY;
382         if (!r0->lo_attr_valid) {
383                 /*
384                  * Fill LVB with attributes already initialized by the upper
385                  * layer.
386                  */
387                 cl_attr2lvb(lvb, attr);
388                 kms = attr->cat_kms;
389
390                 /*
391                  * XXX that should be replaced with a loop over sub-objects,
392                  * doing cl_object_attr_get() on them. But for now, let's
393                  * reuse old lov code.
394                  */
395
396                 /*
397                  * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
398                  * happy. It's not needed, because new code uses
399                  * ->coh_attr_guard spin-lock to protect consistency of
400                  * sub-object attributes.
401                  */
402                 lov_stripe_lock(lsm);
403                 result = lov_merge_lvb_kms(lsm, lvb, &kms);
404                 lov_stripe_unlock(lsm);
405                 if (result == 0) {
406                         cl_lvb2attr(attr, lvb);
407                         attr->cat_kms = kms;
408                         r0->lo_attr_valid = 1;
409                         r0->lo_attr = *attr;
410                 }
411         } else
412                 *attr = r0->lo_attr;
413         RETURN(result);
414 }
415
416 const static struct lov_layout_operations lov_dispatch[] = {
417         [LLT_EMPTY] = {
418                 .llo_init      = lov_init_empty,
419                 .llo_delete    = lov_delete_empty,
420                 .llo_fini      = lov_fini_empty,
421                 .llo_install   = lov_install_empty,
422                 .llo_print     = lov_print_empty,
423                 .llo_page_init = lov_page_init_empty,
424                 .llo_lock_init = NULL,
425                 .llo_io_init   = lov_io_init_empty,
426                 .llo_getattr   = lov_attr_get_empty
427         },
428         [LLT_RAID0] = {
429                 .llo_init      = lov_init_raid0,
430                 .llo_delete    = lov_delete_raid0,
431                 .llo_fini      = lov_fini_raid0,
432                 .llo_install   = lov_install_raid0,
433                 .llo_print     = lov_print_raid0,
434                 .llo_page_init = lov_page_init_raid0,
435                 .llo_lock_init = lov_lock_init_raid0,
436                 .llo_io_init   = lov_io_init_raid0,
437                 .llo_getattr   = lov_attr_get_raid0
438         }
439 };
440
441
442 /**
443  * Performs a double-dispatch based on the layout type of an object.
444  */
445 #define LOV_2DISPATCH_NOLOCK(obj, op, ...)                              \
446 ({                                                                      \
447         struct lov_object                      *__obj = (obj);          \
448         enum lov_layout_type                    __llt;                  \
449                                                                         \
450         __llt = __obj->lo_type;                                         \
451         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
452         lov_dispatch[__llt].op(__VA_ARGS__);                            \
453 })
454
455 #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
456 ({                                                                      \
457         struct lov_object                      *__obj = (obj);          \
458         int                                     __lock = !!(lock);      \
459         typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
460                                                                         \
461         __lock &= __obj->lo_owner != cfs_current();                     \
462         if (__lock)                                                     \
463                 cfs_down_read(&__obj->lo_type_guard);                   \
464         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
465         if (__lock)                                                     \
466                 cfs_up_read(&__obj->lo_type_guard);                     \
467         __result;                                                       \
468 })
469
470 /**
471  * Performs a locked double-dispatch based on the layout type of an object.
472  */
473 #define LOV_2DISPATCH(obj, op, ...)                     \
474         LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
475
476 #define LOV_2DISPATCH_VOID(obj, op, ...)                                \
477 do {                                                                    \
478         struct lov_object                      *__obj = (obj);          \
479         enum lov_layout_type                    __llt;                  \
480                                                                         \
481         if (__obj->lo_owner != cfs_current())                           \
482                 cfs_down_read(&__obj->lo_type_guard);                   \
483         __llt = __obj->lo_type;                                         \
484         LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch));        \
485         lov_dispatch[__llt].op(__VA_ARGS__);                            \
486         if (__obj->lo_owner != cfs_current())                           \
487                 cfs_up_read(&__obj->lo_type_guard);                     \
488 } while (0)
489
490 static int lov_layout_change(const struct lu_env *env,
491                              struct lov_object *obj, enum lov_layout_type llt,
492                              const struct cl_object_conf *conf)
493 {
494         int result;
495         union lov_layout_state       *state = &lov_env_info(env)->lti_state;
496         const struct lov_layout_operations *old_ops;
497         const struct lov_layout_operations *new_ops;
498
499         LASSERT(0 <= obj->lo_type && obj->lo_type < ARRAY_SIZE(lov_dispatch));
500         LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
501         ENTRY;
502
503         old_ops = &lov_dispatch[obj->lo_type];
504         new_ops = &lov_dispatch[llt];
505
506         result = new_ops->llo_init(env, lu2lov_dev(obj->lo_cl.co_lu.lo_dev),
507                                    obj, conf, state);
508         if (result == 0) {
509                 struct cl_object_header *hdr = cl_object_header(&obj->lo_cl);
510                 void                    *cookie;
511                 struct lu_env           *nested;
512                 int                      refcheck;
513
514                 cookie = cl_env_reenter();
515                 nested = cl_env_get(&refcheck);
516                 if (!IS_ERR(nested))
517                         cl_object_prune(nested, &obj->lo_cl);
518                 else
519                         result = PTR_ERR(nested);
520                 cl_env_put(nested, &refcheck);
521                 cl_env_reexit(cookie);
522
523                 old_ops->llo_delete(env, obj, &obj->u);
524                 old_ops->llo_fini(env, obj, &obj->u);
525                 LASSERT(cfs_list_empty(&hdr->coh_locks));
526                 LASSERT(hdr->coh_tree.rnode == NULL);
527                 LASSERT(hdr->coh_pages == 0);
528
529                 new_ops->llo_install(env, obj, state);
530                 obj->lo_type = llt;
531         } else
532                 new_ops->llo_fini(env, obj, state);
533         RETURN(result);
534 }
535
536 /*****************************************************************************
537  *
538  * Lov object operations.
539  *
540  */
541
542 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
543                     const struct lu_object_conf *conf)
544 {
545         struct lov_device            *dev   = lu2lov_dev(obj->lo_dev);
546         struct lov_object            *lov   = lu2lov(obj);
547         const struct cl_object_conf  *cconf = lu2cl_conf(conf);
548         union  lov_layout_state      *set   = &lov_env_info(env)->lti_state;
549         const struct lov_layout_operations *ops;
550         int result;
551
552         ENTRY;
553         cfs_init_rwsem(&lov->lo_type_guard);
554         cfs_waitq_init(&lov->lo_waitq);
555
556         /* no locking is necessary, as object is being created */
557         lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
558         ops = &lov_dispatch[lov->lo_type];
559         result = ops->llo_init(env, dev, lov, cconf, set);
560         if (result == 0)
561                 ops->llo_install(env, lov, set);
562         else
563                 ops->llo_fini(env, lov, set);
564         RETURN(result);
565 }
566
567 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
568                         const struct cl_object_conf *conf)
569 {
570         struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
571         struct lov_object *lov = cl2lov(obj);
572         int result = 0;
573
574         ENTRY;
575         /*
576          * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
577          */
578         LASSERT(lov->lo_owner != cfs_current());
579         cfs_down_write(&lov->lo_type_guard);
580         LASSERT(lov->lo_owner == NULL);
581         lov->lo_owner = cfs_current();
582         switch (lov->lo_type) {
583         case LLT_EMPTY:
584                 if (lsm != NULL)
585                         result = lov_layout_change(env, lov, LLT_RAID0, conf);
586                 break;
587         case LLT_RAID0:
588                 if (lsm == NULL || lov_stripe_md_cmp(lov->u.raid0.lo_lsm, lsm))
589                         result = -EOPNOTSUPP;
590                 break;
591         default:
592                 LBUG();
593         }
594         lov->lo_owner = NULL;
595         cfs_up_write(&lov->lo_type_guard);
596         RETURN(result);
597 }
598
599 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
600 {
601         struct lov_object *lov = lu2lov(obj);
602
603         ENTRY;
604         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
605         EXIT;
606 }
607
608 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
609 {
610         struct lov_object *lov = lu2lov(obj);
611
612         ENTRY;
613         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
614         lu_object_fini(obj);
615         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
616         EXIT;
617 }
618
619 static int lov_object_print(const struct lu_env *env, void *cookie,
620                             lu_printer_t p, const struct lu_object *o)
621 {
622         return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
623 }
624
625 struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
626                               struct cl_page *page, cfs_page_t *vmpage)
627 {
628         return LOV_2DISPATCH(cl2lov(obj),
629                              llo_page_init, env, obj, page, vmpage);
630 }
631
632 /**
633  * Implements cl_object_operations::clo_io_init() method for lov
634  * layer. Dispatches to the appropriate layout io initialization method.
635  */
636 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
637                 struct cl_io *io)
638 {
639         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
640         /*
641          * Do not take lock in case of CIT_MISC io, because
642          *
643          *     - if this is an io for a glimpse, then we don't care;
644          *
645          *     - if this not a glimpse (writepage or lock cancellation), then
646          *       layout change cannot happen because a page or a lock
647          *       already exist; and
648          *
649          *     - lock ordering (lock mutex nests within layout rw-semaphore)
650          *       is obeyed in case of lock cancellation.
651          */
652         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
653                                      io->ci_type != CIT_MISC, env, obj, io);
654 }
655
656 /**
657  * An implementation of cl_object_operations::clo_attr_get() method for lov
658  * layer. For raid0 layout this collects and merges attributes of all
659  * sub-objects.
660  */
661 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
662                         struct cl_attr *attr)
663 {
664         /* do not take lock, as this function is called under a
665          * spin-lock. Layout is protected from changing by ongoing IO. */
666         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
667 }
668
669 static int lov_attr_set(const struct lu_env *env, struct cl_object *obj,
670                         const struct cl_attr *attr, unsigned valid)
671 {
672         /*
673          * No dispatch is required here, as no layout implements this.
674          */
675         return 0;
676 }
677
678 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
679                   struct cl_lock *lock, const struct cl_io *io)
680 {
681         return LOV_2DISPATCH(cl2lov(obj), llo_lock_init, env, obj, lock, io);
682 }
683
684 static const struct cl_object_operations lov_ops = {
685         .coo_page_init = lov_page_init,
686         .coo_lock_init = lov_lock_init,
687         .coo_io_init   = lov_io_init,
688         .coo_attr_get  = lov_attr_get,
689         .coo_attr_set  = lov_attr_set,
690         .coo_conf_set  = lov_conf_set
691 };
692
693 static const struct lu_object_operations lov_lu_obj_ops = {
694         .loo_object_init      = lov_object_init,
695         .loo_object_delete    = lov_object_delete,
696         .loo_object_release   = NULL,
697         .loo_object_free      = lov_object_free,
698         .loo_object_print     = lov_object_print,
699         .loo_object_invariant = NULL
700 };
701
702 struct lu_object *lov_object_alloc(const struct lu_env *env,
703                                    const struct lu_object_header *unused,
704                                    struct lu_device *dev)
705 {
706         struct lov_object *lov;
707         struct lu_object  *obj;
708
709         ENTRY;
710         OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO);
711         if (lov != NULL) {
712                 obj = lov2lu(lov);
713                 lu_object_init(obj, NULL, dev);
714                 lov->lo_cl.co_ops = &lov_ops;
715                 lov->lo_type = -1; /* invalid, to catch uninitialized type */
716                 /*
717                  * object io operation vector (cl_object::co_iop) is installed
718                  * later in lov_object_init(), as different vectors are used
719                  * for object with different layouts.
720                  */
721                 obj->lo_ops = &lov_lu_obj_ops;
722         } else
723                 obj = NULL;
724         RETURN(obj);
725 }
726
727 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
728 {
729         struct lov_stripe_md *lsm = NULL;
730
731         cfs_down_read(&lov->lo_type_guard);
732         switch (lov->lo_type) {
733         case LLT_RAID0:
734                 lsm = lsm_addref(lov->u.raid0.lo_lsm);
735         case LLT_EMPTY:
736                 break;
737         default:
738                 LBUG();
739         }
740         cfs_up_read(&lov->lo_type_guard);
741         return lsm;
742 }
743
744 void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
745 {
746         if (lsm == NULL)
747                 return;
748
749         lov_free_memmd(&lsm);
750         if (lov->lo_owner != NULL)
751                 cfs_waitq_signal(&lov->lo_waitq);
752 }
753
754 struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
755 {
756         struct lu_object *luobj;
757         struct lov_stripe_md *lsm = NULL;
758
759         if (clobj == NULL)
760                 return NULL;
761
762         luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
763                                  &lov_device_type);
764         if (luobj != NULL)
765                 lsm = lov_lsm_addref(lu2lov(luobj));
766         return lsm;
767 }
768 EXPORT_SYMBOL(lov_lsm_get);
769
770 void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
771 {
772         struct lu_object *luobj;
773
774         if (clobj == NULL || lsm == NULL)
775                 return;
776
777         luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
778                                  &lov_device_type);
779         LASSERT(luobj != NULL);
780
781         lov_lsm_decref(lu2lov(luobj), lsm);
782 }
783 EXPORT_SYMBOL(lov_lsm_put);
784
785 int lov_read_and_clear_async_rc(struct cl_object *clob)
786 {
787         struct lu_object *luobj;
788         int rc = 0;
789         ENTRY;
790
791         luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
792                                  &lov_device_type);
793         if (luobj != NULL) {
794                 struct lov_object *lov = lu2lov(luobj);
795
796                 cfs_down_read(&lov->lo_type_guard);
797                 switch (lov->lo_type) {
798                 case LLT_RAID0: {
799                         struct lov_stripe_md *lsm;
800                         int i;
801
802                         lsm = lov->u.raid0.lo_lsm;
803                         LASSERT(lsm != NULL);
804                         for (i = 0; i < lsm->lsm_stripe_count; i++) {
805                                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
806                                 if (loi->loi_ar.ar_rc && !rc)
807                                         rc = loi->loi_ar.ar_rc;
808                                 loi->loi_ar.ar_rc = 0;
809                         }
810                 }
811                 case LLT_EMPTY:
812                         break;
813                 default:
814                         LBUG();
815                 }
816                 cfs_up_read(&lov->lo_type_guard);
817         }
818         RETURN(rc);
819 }
820 EXPORT_SYMBOL(lov_read_and_clear_async_rc);
821
822 /** @} lov */