4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * Implementation of cl_object for LOV layer.
34 * Author: Nikita Danilov <nikita.danilov@sun.com>
35 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
38 #define DEBUG_SUBSYSTEM S_LOV
40 #include "lov_cl_internal.h"
/* Return the lov_device owning @obj, via its embedded cl/lu object's lo_dev. */
42 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
44 	return lu2lov_dev(obj->lo_cl.co_lu.lo_dev);
51 /*****************************************************************************
/*
 * Per-layout-type virtual operations table. One instance exists per layout
 * type (empty / released / composite, see lov_dispatch[]); lov_object methods
 * double-dispatch through the table entry selected by lov_object::lo_type.
 */
57 struct lov_layout_operations {
	/* build layout state from an unpacked lsm (called under conf lock) */
58         int (*llo_init)(const struct lu_env *env, struct lov_device *dev,
59                         struct lov_object *lov, struct lov_stripe_md *lsm,
60                         const struct cl_object_conf *conf,
61                         union lov_layout_state *state);
	/* tear down sub-objects; must wait for them to actually die */
62         int (*llo_delete)(const struct lu_env *env, struct lov_object *lov,
63                         union lov_layout_state *state);
	/* free layout state memory (after llo_delete) */
64         void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
65                         union lov_layout_state *state);
	/* install layout-specific cl_object operations on @lov */
66         void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
67                         union lov_layout_state *state);
	/* debugging printer for lu_object_print() */
68         int (*llo_print)(const struct lu_env *env, void *cookie,
69                         lu_printer_t p, const struct lu_object *o);
	/* per-layout cl_page / cl_lock / cl_io initialization */
70         int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
71                                 struct cl_page *page, pgoff_t index);
72         int (*llo_lock_init)(const struct lu_env *env,
73                              struct cl_object *obj, struct cl_lock *lock,
74                              const struct cl_io *io);
75         int (*llo_io_init)(const struct lu_env *env,
76                            struct cl_object *obj, struct cl_io *io);
	/* merge sub-object attributes into @attr */
77         int (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
78                            struct cl_attr *attr);
/* Forward declaration: blocks until all active IOs against @lov drain. */
81 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
/* Drop one reference on @lsm (body elided here; presumably frees on last put). */
83 static void lov_lsm_put(struct lov_stripe_md *lsm)
89 /*****************************************************************************
91 * Lov object layout operations.
/* llo_install for LLT_EMPTY/LLT_RELEASED: nothing layout-specific to hook up. */
95 static void lov_install_empty(const struct lu_env *env,
96                               struct lov_object *lov,
97                               union lov_layout_state *state)
100 * File without objects.
/* llo_init for a file without objects (no striping metadata to build). */
104 static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
105                           struct lov_object *lov, struct lov_stripe_md *lsm,
106                           const struct cl_object_conf *conf,
107                           union lov_layout_state *state)
/* llo_install for LLT_COMP: install composite-layout object operations. */
112 static void lov_install_composite(const struct lu_env *env,
113                                   struct lov_object *lov,
114                                   union lov_layout_state *state)
/*
 * Find-or-create the sub (stripe) cl_object identified by @fid on device
 * @dev. Returns the object or ERR_PTR(); on success the object must live on
 * a lovsub device (asserted below).
 */
118 static struct cl_object *lov_sub_find(const struct lu_env *env,
119                                       struct cl_device *dev,
120                                       const struct lu_fid *fid,
121                                       const struct cl_object_conf *conf)
126         o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu);
127         LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type));
/*
 * Link one freshly found stripe sub-object @subobj into raid0 state @r0 at
 * composite index @idx (encodes both component entry and stripe number).
 * On success takes parentage of the sub-object; on conflict with a stale
 * owner the sub-object is dropped and (per the -EAGAIN handling in the
 * caller) the lookup is retried.
 */
131 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
132                         struct cl_object *subobj, struct lov_layout_raid0 *r0,
133                         struct lov_oinfo *oinfo, int idx)
135         struct cl_object_header *hdr;
136         struct cl_object_header *subhdr;
137         struct cl_object_header *parent;
138         int entry = lov_comp_entry(idx);
139         int stripe = lov_comp_stripe(idx);
142         if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
143                 /* For sanity:test_206.
144                  * Do not leave the object in cache to avoid accessing
145                  * freed memory. This is because osc_object is referring to
146                  * lov_oinfo of lsm_stripe_data which will be freed due to
148                 cl_object_kill(env, subobj);
149                 cl_object_put(env, subobj);
153         hdr = cl_object_header(lov2cl(lov));
154         subhdr = cl_object_header(subobj);
156         CDEBUG(D_INODE, DFID"@%p[%d:%d] -> "DFID"@%p: ostid: "DOSTID
157                " ost idx: %d gen: %d\n",
158                PFID(lu_object_fid(&subobj->co_lu)), subhdr, entry, stripe,
159                PFID(lu_object_fid(lov2lu(lov))), hdr, POSTID(&oinfo->loi_oi),
160                oinfo->loi_ost_idx, oinfo->loi_ost_gen);
162         /* reuse ->coh_attr_guard to protect coh_parent change */
163         spin_lock(&subhdr->coh_attr_guard);
164         parent = subhdr->coh_parent;
	/* Unowned stripe: claim it for this lov object. */
165         if (parent == NULL) {
166                 subhdr->coh_parent = hdr;
167                 spin_unlock(&subhdr->coh_attr_guard);
168                 subhdr->coh_nesting = hdr->coh_nesting + 1;
169                 lu_object_ref_add(&subobj->co_lu, "lov-parent", lov);
170                 r0->lo_sub[stripe] = cl2lovsub(subobj);
171                 r0->lo_sub[stripe]->lso_super = lov;
172                 r0->lo_sub[stripe]->lso_index = idx;
	/* Stripe already owned by another (possibly stale) lov object. */
175                 struct lu_object *old_obj;
176                 struct lov_object *old_lov;
177                 unsigned int mask = D_INODE;
179                 spin_unlock(&subhdr->coh_attr_guard);
180                 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
181                 LASSERT(old_obj != NULL);
182                 old_lov = cl2lov(lu2cl(old_obj));
183                 if (old_lov->lo_layout_invalid) {
184                         /* the object's layout has already changed but isn't
	 * NOTE(review): unhashing forces a fresh lookup on retry —
	 * elided lines presumably return -EAGAIN to the caller. */
186                         lu_object_unhash(env, &subobj->co_lu);
193                 LU_OBJECT_DEBUG(mask, env, &subobj->co_lu,
194                                 "stripe %d is already owned.", idx);
195                 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
196                 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
197                 cl_object_put(env, subobj);
/*
 * Adjust per-stripe cl_object slice offsets so that stripe page slices are
 * laid out after the lov page slice, and return the stripe's page buffer
 * size contribution. With @stripe == NULL, returns the residual buffer
 * space below the lov slice instead.
 */
202 static int lov_page_slice_fixup(struct lov_object *lov,
203                                 struct cl_object *stripe)
205         struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
	/* no stripe: report leftover buffer size under the lov slice */
209                 return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
210                        cfs_size_round(sizeof(struct lov_page));
212         cl_object_for_each(o, stripe)
213                 o->co_slice_off += hdr->coh_page_bufsize;
215         return cl_object_header(stripe)->coh_page_bufsize;
/*
 * Initialize raid0 state @r0 for component @index of @lov: allocate the
 * lo_sub[] array and find/create one sub-object per stripe, linking each
 * via lov_init_sub(). Returns the page slice size (psz) on success or a
 * negative errno.
 */
218 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
219                           struct lov_object *lov, int index,
220                           struct lov_layout_raid0 *r0)
222         struct lov_thread_info *lti = lov_env_info(env);
223         struct cl_object_conf *subconf = &lti->lti_stripe_conf;
224         struct lu_fid *ofid = &lti->lti_fid;
225         struct cl_object *stripe;
226         struct lov_stripe_md_entry *lse = lov_lse(lov, index);
233         spin_lock_init(&r0->lo_sub_lock);
234         r0->lo_nr = lse->lsme_stripe_count;
235         LASSERT(r0->lo_nr <= lov_targets_nr(dev));
237         OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
238         if (r0->lo_sub == NULL)
239                 GOTO(out, result = -ENOMEM);
243         memset(subconf, 0, sizeof(*subconf));
246          * Create stripe cl_objects.
248         for (i = 0; i < r0->lo_nr; ++i) {
249                 struct cl_device *subdev;
250                 struct lov_oinfo *oinfo = lse->lsme_oinfo[i];
251                 int ost_idx = oinfo->loi_ost_idx;
	/* dummy (hole/lost) stripes are skipped, not instantiated */
253                 if (lov_oinfo_is_dummy(oinfo))
256                 result = ostid_to_fid(ofid, &oinfo->loi_oi, oinfo->loi_ost_idx);
260                 if (dev->ld_target[ost_idx] == NULL) {
261                         CERROR("%s: OST %04x is not initialized\n",
262                                lov2obd(dev->ld_lov)->obd_name, ost_idx);
263                         GOTO(out, result = -EIO);
266                 subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
267                 subconf->u.coc_oinfo = oinfo;
268                 LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
269                 /* In the function below, .hs_keycmp resolves to
270                  * lu_obj_hop_keycmp() */
271                 /* coverity[overrun-buffer-val] */
272                 stripe = lov_sub_find(env, subdev, ofid, subconf);
274                         GOTO(out, result = PTR_ERR(stripe));
276                 result = lov_init_sub(env, lov, stripe, r0, oinfo,
277                                       lov_comp_index(index, i));
278                 if (result == -EAGAIN) { /* try again */
	/* all stripes must report the same page slice size */
285                         int sz = lov_page_slice_fixup(lov, stripe);
286                         LASSERT(ergo(psz > 0, psz == sz));
/*
 * llo_init for composite (LLT_COMP) layouts: take a reference on @lsm,
 * allocate one lov_layout_entry per component and init each as raid0.
 * Positive intermediate results carry the page slice size; the function
 * folds it into coh_page_bufsize and returns 0 on success.
 */
296 static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
297                               struct lov_object *lov, struct lov_stripe_md *lsm,
298                               const struct cl_object_conf *conf,
299                               union lov_layout_state *state)
301         struct lov_layout_composite *comp = &state->composite;
302         unsigned int entry_count;
303         unsigned int psz = 0;
309         LASSERT(lsm->lsm_entry_count > 0);
310         LASSERT(lov->lo_lsm == NULL);
311         lov->lo_lsm = lsm_addref(lsm);
	/* invalid until the whole layout is successfully instantiated */
312         lov->lo_layout_invalid = true;
314         entry_count = lsm->lsm_entry_count;
315         comp->lo_entry_count = entry_count;
317         OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
318         if (comp->lo_entries == NULL)
321         for (i = 0; i < entry_count; i++) {
322                 struct lov_layout_entry *le = &comp->lo_entries[i];
324                 le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
325                 result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
	/* every component must agree on the page slice size */
329                 LASSERT(ergo(psz > 0, psz == result));
333                 cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
335         return result > 0 ? 0 : result;
/*
 * llo_init for an HSM-released file: no stripes to instantiate, just pin
 * the (released) lsm on the object.
 */
338 static int lov_init_released(const struct lu_env *env,
339                              struct lov_device *dev, struct lov_object *lov,
340                              struct lov_stripe_md *lsm,
341                              const struct cl_object_conf *conf,
342                              union lov_layout_state *state)
344         LASSERT(lsm != NULL);
345         LASSERT(lsm->lsm_is_released);
346         LASSERT(lov->lo_lsm == NULL);
348         lov->lo_lsm = lsm_addref(lsm);
/*
 * Look up the stripe sub-object for composite @index (entry + stripe) of
 * @lsm. Returns the cl_object, NULL-ish on invalid type/out-of-range index
 * (via the GOTO(out, result = NULL) paths), or ERR_PTR(-EINVAL) on the
 * fall-through error path.
 */
352 static struct cl_object *lov_find_subobj(const struct lu_env *env,
353                                          struct lov_object *lov,
354                                          struct lov_stripe_md *lsm,
357         struct lov_device *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
358         struct lov_thread_info *lti = lov_env_info(env);
359         struct lu_fid *ofid = &lti->lti_fid;
360         struct lov_oinfo *oinfo;
361         struct cl_device *subdev;
362         int entry = lov_comp_entry(index);
363         int stripe = lov_comp_stripe(index);
366         struct cl_object *result;
	/* only composite layouts have stripe sub-objects */
368         if (lov->lo_type != LLT_COMP)
369                 GOTO(out, result = NULL);
371         if (entry >= lsm->lsm_entry_count ||
372             stripe >= lsm->lsm_entries[entry]->lsme_stripe_count)
373                 GOTO(out, result = NULL);
375         oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe];
376         ost_idx = oinfo->loi_ost_idx;
377         rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
379                 GOTO(out, result = NULL);
381         subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
382         result = lov_sub_find(env, subdev, ofid, NULL);
385                 result = ERR_PTR(-EINVAL);
/* llo_delete for EMPTY/RELEASED layouts: just drain outstanding IO. */
389 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
390                             union lov_layout_state *state)
392         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
394         lov_layout_wait(env, lov);
/*
 * Kill stripe sub-object @los at slot @idx of @r0 and synchronously wait
 * for its destruction: lovsub_object_fini() clears r0->lo_sub[idx], and
 * lu_object_free() wakes the site bucket's marche-funebre wait queue,
 * which this function sleeps on (TASK_UNINTERRUPTIBLE).
 */
398 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
399                                struct lov_layout_raid0 *r0,
400                                struct lovsub_object *los, int idx)
402         struct cl_object *sub;
403         struct lu_site *site;
404         struct lu_site_bkt_data *bkt;
405         wait_queue_t *waiter;
407         LASSERT(r0->lo_sub[idx] == los);
409         sub = lovsub2cl(los);
410         site = sub->co_lu.lo_dev->ld_site;
411         bkt = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
413         cl_object_kill(env, sub);
414         /* release a reference to the sub-object and ... */
415         lu_object_ref_del(&sub->co_lu, "lov-parent", lov);
416         cl_object_put(env, sub);
418         /* ... wait until it is actually destroyed---sub-object clears its
419          * ->lo_sub[] slot in lovsub_object_fini() */
420         if (r0->lo_sub[idx] == los) {
421                 waiter = &lov_env_info(env)->lti_waiter;
422                 init_waitqueue_entry(waiter, current);
423                 add_wait_queue(&bkt->lsb_marche_funebre, waiter);
424                 set_current_state(TASK_UNINTERRUPTIBLE);
426                         /* this wait-queue is signaled at the end of
427                          * lu_object_free(). */
428                         set_current_state(TASK_UNINTERRUPTIBLE);
	/* recheck slot under lo_sub_lock: cleared means object is gone */
429                         spin_lock(&r0->lo_sub_lock);
430                         if (r0->lo_sub[idx] == los) {
431                                 spin_unlock(&r0->lo_sub_lock);
434                                 spin_unlock(&r0->lo_sub_lock);
435                                 set_current_state(TASK_RUNNING);
439                 remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
441         LASSERT(r0->lo_sub[idx] == NULL);
/*
 * Delete all stripe sub-objects of raid0 state @r0: prune cached pages and
 * synchronously kill each sub-object (top-level eviction implies eviction
 * of all sub-objects).
 */
444 static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
445                              struct lov_layout_raid0 *r0)
449         if (r0->lo_sub != NULL) {
452                 for (i = 0; i < r0->lo_nr; ++i) {
453                         struct lovsub_object *los = r0->lo_sub[i];
456                                 cl_object_prune(env, &los->lso_cl);
458                                  * If top-level object is to be evicted from
459                                  * the cache, so are its sub-objects.
461                                 lov_subobject_kill(env, lov, r0, los, i);
/* llo_delete for LLT_COMP: drain IO, then delete every component's raid0. */
469 static int lov_delete_composite(const struct lu_env *env,
470                                 struct lov_object *lov,
471                                 union lov_layout_state *state)
473         struct lov_layout_entry *entry;
477         dump_lsm(D_INODE, lov->lo_lsm);
479         lov_layout_wait(env, lov);
480         lov_foreach_layout_entry(lov, entry)
481                 lov_delete_raid0(env, lov, &entry->lle_raid0);
/* llo_fini for EMPTY/RELEASED: no per-layout state to free. */
486 static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
487                            union lov_layout_state *state)
489         LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
/* Free the lo_sub[] array of raid0 state @r0 (sub-objects already killed). */
492 static void lov_fini_raid0(const struct lu_env *env,
493                            struct lov_layout_raid0 *r0)
495         if (r0->lo_sub != NULL) {
496                 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
/*
 * llo_fini for LLT_COMP: free every component's raid0 state, the entry
 * array, and drop the object's lsm reference.
 */
501 static void lov_fini_composite(const struct lu_env *env,
502                                struct lov_object *lov,
503                                union lov_layout_state *state)
505         struct lov_layout_composite *comp = &state->composite;
508         if (comp->lo_entries != NULL) {
509                 struct lov_layout_entry *entry;
511                 lov_foreach_layout_entry(lov, entry)
512                         lov_fini_raid0(env, &entry->lle_raid0);
514                 OBD_FREE(comp->lo_entries,
515                          comp->lo_entry_count * sizeof(*comp->lo_entries));
516                 comp->lo_entries = NULL;
519         dump_lsm(D_INODE, lov->lo_lsm);
520         lov_free_memmd(&lov->lo_lsm);
/* llo_fini for LLT_RELEASED: drop the pinned lsm. */
525 static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
526                               union lov_layout_state *state)
529         dump_lsm(D_INODE, lov->lo_lsm);
530         lov_free_memmd(&lov->lo_lsm);
/* llo_print for LLT_EMPTY: print only the layout-invalid flag. */
534 static int lov_print_empty(const struct lu_env *env, void *cookie,
535                            lu_printer_t p, const struct lu_object *o)
537         (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
/* Print each stripe sub-object of @r0, or note the ones that are absent. */
541 static int lov_print_raid0(const struct lu_env *env, void *cookie,
542                            lu_printer_t p, struct lov_layout_raid0 *r0)
546         for (i = 0; i < r0->lo_nr; ++i) {
547                 struct lu_object *sub;
549                 if (r0->lo_sub[i] != NULL) {
550                         sub = lovsub2lu(r0->lo_sub[i]);
551                         lu_object_print(env, cookie, p, sub);
553                         (*p)(env, cookie, "sub %d absent\n", i);
/* llo_print for LLT_COMP: dump lsm header, then each entry and its raid0. */
559 static int lov_print_composite(const struct lu_env *env, void *cookie,
560                                lu_printer_t p, const struct lu_object *o)
562         struct lov_object *lov = lu2lov(o);
563         struct lov_stripe_md *lsm = lov->lo_lsm;
566         (*p)(env, cookie, "entries: %d, %s, lsm{%p 0x%08X %d %u}:\n",
567              lsm->lsm_entry_count,
568              lov->lo_layout_invalid ? "invalid" : "valid", lsm,
569              lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
570              lsm->lsm_layout_gen);
572         for (i = 0; i < lsm->lsm_entry_count; i++) {
573                 struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
575                 (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n",
576                      PEXT(&lse->lsme_extent), lse->lsme_magic,
577                      lse->lsme_id, lse->lsme_layout_gen,
578                      lse->lsme_stripe_count, lse->lsme_stripe_size);
579                 lov_print_raid0(env, cookie, p, lov_r0(lov, i));
/* llo_print for LLT_RELEASED: dump validity flag and lsm header only. */
585 static int lov_print_released(const struct lu_env *env, void *cookie,
586                               lu_printer_t p, const struct lu_object *o)
588         struct lov_object *lov = lu2lov(o);
589         struct lov_stripe_md *lsm = lov->lo_lsm;
592              "released: %s, lsm{%p 0x%08X %d %u}:\n",
593              lov->lo_layout_invalid ? "invalid" : "valid", lsm,
594              lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
595              lsm->lsm_layout_gen);
600 * Implements cl_object_operations::coo_attr_get() method for an object
601 * without stripes (LLT_EMPTY layout type).
603 * The only attributes this layer is authoritative in this case is
604 * cl_attr::cat_blocks---it's 0.
/* llo_getattr for stripeless objects: only cat_blocks is authoritative (0). */
606 static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
607                               struct cl_attr *attr)
609         attr->cat_blocks = 0;
/*
 * Merge per-stripe attributes of component @index into r0->lo_attr, using
 * the legacy lov_merge_lvb_kms() path under the lsm stripe lock. Results
 * are cached via r0->lo_attr_valid.
 */
613 static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov,
614                               unsigned int index, struct lov_layout_raid0 *r0)
617         struct lov_stripe_md *lsm = lov->lo_lsm;
618         struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
619         struct cl_attr *attr = &r0->lo_attr;
	/* cached merge still valid — nothing to recompute */
623         if (r0->lo_attr_valid)
626         memset(lvb, 0, sizeof(*lvb));
628         /* XXX: timestamps can be negative by sanity:test_39m,
	/* seed with LLONG_MIN so any real timestamp wins the max-merge */
630         lvb->lvb_atime = LLONG_MIN;
631         lvb->lvb_ctime = LLONG_MIN;
632         lvb->lvb_mtime = LLONG_MIN;
635          * XXX that should be replaced with a loop over sub-objects,
636          * doing cl_object_attr_get() on them. But for now, let's
637          * reuse old lov code.
641          * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
642          * happy. It's not needed, because new code uses
643          * ->coh_attr_guard spin-lock to protect consistency of
644          * sub-object attributes.
646         lov_stripe_lock(lsm);
647         result = lov_merge_lvb_kms(lsm, index, lvb, &kms);
648         lov_stripe_unlock(lsm);
650         cl_lvb2attr(attr, lvb);
652         r0->lo_attr_valid = 1;
/*
 * llo_getattr for LLT_COMP: merge every component's raid0 attributes —
 * blocks are summed, size/kms/times take the per-component maximum.
 */
658 static int lov_attr_get_composite(const struct lu_env *env,
659                                   struct cl_object *obj,
660                                   struct cl_attr *attr)
662         struct lov_object *lov = cl2lov(obj);
663         struct lov_layout_entry *entry;
670         attr->cat_blocks = 0;
671         lov_foreach_layout_entry(lov, entry) {
672                 struct lov_layout_raid0 *r0 = &entry->lle_raid0;
673                 struct cl_attr *lov_attr = &r0->lo_attr;
675                 result = lov_attr_get_raid0(env, lov, index, r0);
682                 attr->cat_blocks += lov_attr->cat_blocks;
683                 if (attr->cat_size < lov_attr->cat_size)
684                         attr->cat_size = lov_attr->cat_size;
685                 if (attr->cat_kms < lov_attr->cat_kms)
686                         attr->cat_kms = lov_attr->cat_kms;
687                 if (attr->cat_atime < lov_attr->cat_atime)
688                         attr->cat_atime = lov_attr->cat_atime;
689                 if (attr->cat_ctime < lov_attr->cat_ctime)
690                         attr->cat_ctime = lov_attr->cat_ctime;
691                 if (attr->cat_mtime < lov_attr->cat_mtime)
692                         attr->cat_mtime = lov_attr->cat_mtime;
/*
 * Dispatch table indexed by enum lov_layout_type: EMPTY, RELEASED and
 * COMP entries (index labels elided in this view).
 * NOTE(review): "const static" is obsolescent specifier order per C11
 * 6.11.5 — prefer "static const"; left unchanged here.
 */
697 const static struct lov_layout_operations lov_dispatch[] = {
699                 .llo_init = lov_init_empty,
700                 .llo_delete = lov_delete_empty,
701                 .llo_fini = lov_fini_empty,
702                 .llo_install = lov_install_empty,
703                 .llo_print = lov_print_empty,
704                 .llo_page_init = lov_page_init_empty,
705                 .llo_lock_init = lov_lock_init_empty,
706                 .llo_io_init = lov_io_init_empty,
707                 .llo_getattr = lov_attr_get_empty,
710                 .llo_init = lov_init_released,
711                 .llo_delete = lov_delete_empty,
712                 .llo_fini = lov_fini_released,
713                 .llo_install = lov_install_empty,
714                 .llo_print = lov_print_released,
715                 .llo_page_init = lov_page_init_empty,
716                 .llo_lock_init = lov_lock_init_empty,
717                 .llo_io_init = lov_io_init_released,
718                 .llo_getattr = lov_attr_get_empty,
721                 .llo_init = lov_init_composite,
722                 .llo_delete = lov_delete_composite,
723                 .llo_fini = lov_fini_composite,
724                 .llo_install = lov_install_composite,
725                 .llo_print = lov_print_composite,
726                 .llo_page_init = lov_page_init_composite,
727                 .llo_lock_init = lov_lock_init_composite,
728                 .llo_io_init = lov_io_init_composite,
729                 .llo_getattr = lov_attr_get_composite,
734 * Performs a double-dispatch based on the layout type of an object.
/*
 * Dispatch @op through lov_dispatch[] by the object's current layout type,
 * WITHOUT taking lo_type_guard — caller must hold it or otherwise pin the
 * layout.
 */
736 #define LOV_2DISPATCH_NOLOCK(obj, op, ...)              \
738         struct lov_object *__obj = (obj);               \
739         enum lov_layout_type __llt;                     \
741         __llt = __obj->lo_type;                         \
742         LASSERT(__llt < ARRAY_SIZE(lov_dispatch));      \
743         lov_dispatch[__llt].op(__VA_ARGS__);            \
747 * Return lov_layout_type associated with a given lsm
/*
 * Map an lsm to a layout type: released files first, then known magics
 * map to the composite type (return values elided in this view).
 */
749 static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
754         if (lsm->lsm_is_released)
757         if (lsm->lsm_magic == LOV_MAGIC_V1 ||
758             lsm->lsm_magic == LOV_MAGIC_V3 ||
759             lsm->lsm_magic == LOV_MAGIC_COMP_V1)
/*
 * Take a shared hold on the layout (read side of lo_type_guard), unless the
 * current task already owns it exclusively — avoids self-deadlock when a
 * layout-change path re-enters dispatch.
 */
765 static inline void lov_conf_freeze(struct lov_object *lov)
767         CDEBUG(D_INODE, "To take share lov(%p) owner %p/%p\n",
768                lov, lov->lo_owner, current);
769         if (lov->lo_owner != current)
770                 down_read(&lov->lo_type_guard);
/* Release the shared hold taken by lov_conf_freeze() (same owner check). */
773 static inline void lov_conf_thaw(struct lov_object *lov)
775         CDEBUG(D_INODE, "To release share lov(%p) owner %p/%p\n",
776                lov, lov->lo_owner, current);
777         if (lov->lo_owner != current)
778                 up_read(&lov->lo_type_guard);
/*
 * Value-returning dispatch; optionally (when @lock is true) holds the
 * layout frozen across the call.
 */
781 #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...)                       \
783         struct lov_object *__obj = (obj);                               \
784         int __lock = !!(lock);                                          \
785         typeof(lov_dispatch[0].op(__VA_ARGS__)) __result;               \
788                 lov_conf_freeze(__obj);                                 \
789         __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__);          \
791                 lov_conf_thaw(__obj);                                   \
796 * Performs a locked double-dispatch based on the layout type of an object.
/* Locked double-dispatch: always freezes the layout around the call. */
798 #define LOV_2DISPATCH(obj, op, ...)                     \
799         LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__)
/* Locked double-dispatch for void operations (no __result capture). */
801 #define LOV_2DISPATCH_VOID(obj, op, ...)                \
803         struct lov_object *__obj = (obj);               \
804         enum lov_layout_type __llt;                     \
806         lov_conf_freeze(__obj);                         \
807         __llt = __obj->lo_type;                         \
808         LASSERT(__llt < ARRAY_SIZE(lov_dispatch));      \
809         lov_dispatch[__llt].op(__VA_ARGS__);            \
810         lov_conf_thaw(__obj);                           \
/*
 * Take the layout guard exclusively and record the current task as owner,
 * so nested dispatches by the same task skip the read lock (see
 * lov_conf_freeze()). Must not already be the owner (asserted).
 */
813 static void lov_conf_lock(struct lov_object *lov)
815         LASSERT(lov->lo_owner != current);
816         down_write(&lov->lo_type_guard);
817         LASSERT(lov->lo_owner == NULL);
818         lov->lo_owner = current;
819         CDEBUG(D_INODE, "Took exclusive lov(%p) owner %p\n",
/* Clear ownership then drop the exclusive layout guard. */
823 static void lov_conf_unlock(struct lov_object *lov)
825         CDEBUG(D_INODE, "To release exclusive lov(%p) owner %p\n",
827         lov->lo_owner = NULL;
828         up_write(&lov->lo_type_guard);
/* Block until all active IOs against @lov have drained (lo_active_ios == 0). */
831 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
833         struct l_wait_info lwi = { 0 };
836         while (atomic_read(&lov->lo_active_ios) > 0) {
837                 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
838                        PFID(lu_object_fid(lov2lu(lov))),
839                        atomic_read(&lov->lo_active_ios));
841                 l_wait_event(lov->lo_waitq,
842                              atomic_read(&lov->lo_active_ios) == 0, &lwi);
/*
 * Switch @lov from its current layout to the one described by @lsm:
 * prune caches, delete/fini the old layout, then init/install the new one.
 * Called with the conf lock held exclusively (lo_active_ios asserted 0).
 * On init failure the object degrades to LLT_EMPTY.
 */
847 static int lov_layout_change(const struct lu_env *unused,
848                              struct lov_object *lov, struct lov_stripe_md *lsm,
849                              const struct cl_object_conf *conf)
851         enum lov_layout_type llt = lov_type(lsm);
852         union lov_layout_state *state = &lov->u;
853         const struct lov_layout_operations *old_ops;
854         const struct lov_layout_operations *new_ops;
855         struct lov_device *lov_dev = lov_object_dev(lov);
861         LASSERT(lov->lo_type < ARRAY_SIZE(lov_dispatch));
	/* the @unused env is not usable here; take a fresh one */
863         env = cl_env_get(&refcheck);
865                 RETURN(PTR_ERR(env));
867         LASSERT(llt < ARRAY_SIZE(lov_dispatch));
869         CDEBUG(D_INODE, DFID" from %s to %s\n",
870                PFID(lu_object_fid(lov2lu(lov))),
871                llt2str(lov->lo_type), llt2str(llt));
873         old_ops = &lov_dispatch[lov->lo_type];
874         new_ops = &lov_dispatch[llt];
876         rc = cl_object_prune(env, &lov->lo_cl);
880         rc = old_ops->llo_delete(env, lov, &lov->u);
884         old_ops->llo_fini(env, lov, &lov->u);
886         LASSERT(atomic_read(&lov->lo_active_ios) == 0);
888         CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
889                PFID(lu_object_fid(lov2lu(lov))), lov, llt);
	/* transient EMPTY state between old fini and new init */
891         lov->lo_type = LLT_EMPTY;
893         /* page bufsize fixup */
894         cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
895                 lov_page_slice_fixup(lov, NULL);
897         rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
899                 struct obd_device *obd = lov2obd(lov_dev->ld_lov);
901                 CERROR("%s: cannot apply new layout on "DFID" : rc = %d\n",
902                        obd->obd_name, PFID(lu_object_fid(lov2lu(lov))), rc);
903                 new_ops->llo_delete(env, lov, state);
904                 new_ops->llo_fini(env, lov, state);
905                 /* this file becomes an EMPTY file. */
909         new_ops->llo_install(env, lov, state);
913         cl_env_put(env, &refcheck);
917 /*****************************************************************************
919 * Lov object operations.
/*
 * lu_object_operations::loo_object_init for lov: set up guards/counters,
 * unpack the layout buffer (if supplied) into an lsm, pick the layout type
 * and run its llo_init/llo_install. No locking — object is being created.
 */
922 int lov_object_init(const struct lu_env *env, struct lu_object *obj,
923                     const struct lu_object_conf *conf)
925         struct lov_object *lov = lu2lov(obj);
926         struct lov_device *dev = lov_object_dev(lov);
927         const struct cl_object_conf *cconf = lu2cl_conf(conf);
928         union lov_layout_state *set = &lov->u;
929         const struct lov_layout_operations *ops;
930         struct lov_stripe_md *lsm = NULL;
934         init_rwsem(&lov->lo_type_guard);
935         atomic_set(&lov->lo_active_ios, 0);
936         init_waitqueue_head(&lov->lo_waitq);
937         cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
939         lov->lo_type = LLT_EMPTY;
940         if (cconf->u.coc_layout.lb_buf != NULL) {
941                 lsm = lov_unpackmd(dev->ld_lov,
942                                    cconf->u.coc_layout.lb_buf,
943                                    cconf->u.coc_layout.lb_len);
945                         RETURN(PTR_ERR(lsm));
947                 dump_lsm(D_INODE, lsm);
950         /* no locking is necessary, as object is being created */
951         lov->lo_type = lov_type(lsm);
952         ops = &lov_dispatch[lov->lo_type];
953         rc = ops->llo_init(env, dev, lov, lsm, cconf, set);
957         ops->llo_install(env, lov, set);
/*
 * cl_object_operations::coo_conf_set: handle layout SET / INVALIDATE /
 * WAIT requests under the exclusive conf lock. A SET matching the current
 * layout generation (and pattern of entry 0) just revalidates; otherwise
 * the layout is changed unless active IO forces -EBUSY.
 */
965 static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
966                         const struct cl_object_conf *conf)
968         struct lov_stripe_md *lsm = NULL;
969         struct lov_object *lov = cl2lov(obj);
	/* unpack the new layout before taking any lock */
973         if (conf->coc_opc == OBJECT_CONF_SET &&
974             conf->u.coc_layout.lb_buf != NULL) {
975                 lsm = lov_unpackmd(lov_object_dev(lov)->ld_lov,
976                                    conf->u.coc_layout.lb_buf,
977                                    conf->u.coc_layout.lb_len);
979                         RETURN(PTR_ERR(lsm));
983         if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
984                 lov->lo_layout_invalid = true;
985                 GOTO(out, result = 0);
988         if (conf->coc_opc == OBJECT_CONF_WAIT) {
989                 if (lov->lo_layout_invalid &&
990                     atomic_read(&lov->lo_active_ios) > 0) {
	/* drop conf lock before sleeping on IO drain */
991                         lov_conf_unlock(lov);
992                         result = lov_layout_wait(env, lov);
998         LASSERT(conf->coc_opc == OBJECT_CONF_SET);
1000         if ((lsm == NULL && lov->lo_lsm == NULL) ||
1001             ((lsm != NULL && lov->lo_lsm != NULL) &&
1002              (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
1003              (lov->lo_lsm->lsm_entries[0]->lsme_pattern ==
1004               lsm->lsm_entries[0]->lsme_pattern))) {
1005                 /* same version of layout */
1006                 lov->lo_layout_invalid = false;
1007                 GOTO(out, result = 0);
1010         /* will change layout - check if there still exists active IO. */
1011         if (atomic_read(&lov->lo_active_ios) > 0) {
1012                 lov->lo_layout_invalid = true;
1013                 GOTO(out, result = -EBUSY);
1016         result = lov_layout_change(env, lov, lsm, conf);
1017         lov->lo_layout_invalid = result != 0;
1021         lov_conf_unlock(lov);
1023         CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
1024                PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
/* loo_object_delete: dispatch to the layout's llo_delete under the guard. */
1028 static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
1030         struct lov_object *lov = lu2lov(obj);
1033         LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u);
/* loo_object_free: fini the layout state, then release the object itself. */
1037 static void lov_object_free(const struct lu_env *env, struct lu_object *obj)
1039         struct lov_object *lov = lu2lov(obj);
1042         LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u);
1043         lu_object_fini(obj);
1044         OBD_SLAB_FREE_PTR(lov, lov_object_kmem);
/* loo_object_print: unlocked dispatch to the layout's printer. */
1048 static int lov_object_print(const struct lu_env *env, void *cookie,
1049                             lu_printer_t p, const struct lu_object *o)
1051         return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
/* coo_page_init: unlocked dispatch (layout pinned by the enclosing IO). */
1054 int lov_page_init(const struct lu_env *env, struct cl_object *obj,
1055                   struct cl_page *page, pgoff_t index)
1057         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
1062 * Implements cl_object_operations::clo_io_init() method for lov
1063 * layer. Dispatches to the appropriate layout io initialization method.
/*
 * coo_io_init: dispatch to the layout's IO initializer; freezes the layout
 * unless the IO explicitly ignores layout (ci_ignore_layout).
 */
1065 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
1068         CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
1070         CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
1071                PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
1072                io->ci_ignore_layout, io->ci_verify_layout);
1074         return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
1075                                      !io->ci_ignore_layout, env, obj, io);
1079 * An implementation of cl_object_operations::clo_attr_get() method for lov
1080 * layer. For raid0 layout this collects and merges attributes of all
/* coo_attr_get: unlocked — caller holds a spin-lock and ongoing IO pins
 * the layout, so lo_type_guard must not be taken here. */
1083 static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
1084                         struct cl_attr *attr)
1086         /* do not take lock, as this function is called under a
1087          * spin-lock. Layout is protected from changing by ongoing IO. */
1088         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
/* coo_attr_update: intentionally a no-op — no layout implements it. */
1091 static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
1092                            const struct cl_attr *attr, unsigned valid)
1095          * No dispatch is required here, as no layout implements this.
/* coo_lock_init: unlocked — the caller's layout refcount pins the type. */
1100 int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
1101                   struct cl_lock *lock, const struct cl_io *io)
1103         /* No need to lock because we've taken one refcount of layout.  */
1104         return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
1109 * We calculate on which OST the mapping will end. If the length of mapping
1110 * is greater than (stripe_size * stripe_count) then the last_stripe will
1111 * will be one just before start_stripe. Else we check if the mapping
1112 * intersects each OST and find last_stripe.
1113 * This function returns the last_stripe and also sets the stripe_count
1114 * over which the mapping is spread
1116 * \param lsm [in] striping information for the file
1117 * \param index [in] stripe component index
1118 * \param ext [in] logical extent of mapping
1119 * \param start_stripe [in] starting stripe of the mapping
1120 * \param stripe_count [out] the number of stripes across which to map is
1123 * \retval last_stripe return the last stripe of the mapping
/*
 * Compute the last stripe touched by @ext within component @index, and the
 * number of stripes spanned (*stripe_count). If the extent covers a full
 * stripe round it wraps to just before @start_stripe; otherwise each
 * stripe is tested for intersection with the extent.
 */
1125 static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, int index,
1126                                    struct lu_extent *ext,
1127                                    int start_stripe, int *stripe_count)
1129         struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
1135         if (ext->e_end - ext->e_start >
1136             lsme->lsme_stripe_size * lsme->lsme_stripe_count) {
1137                 last_stripe = (start_stripe < 1 ? lsme->lsme_stripe_count - 1 :
1139                 *stripe_count = lsme->lsme_stripe_count;
	/* walk stripes round-robin from start_stripe until no intersection */
1141                 for (j = 0, i = start_stripe; j < lsme->lsme_stripe_count;
1142                      i = (i + 1) % lsme->lsme_stripe_count, j++) {
1143                         if ((lov_stripe_intersects(lsm, index, i, ext,
1144                                                    &obd_start, &obd_end)) == 0)
1148                 last_stripe = (start_stripe + j - 1) % lsme->lsme_stripe_count;
1155 * Set fe_device and copy extents from local buffer into main return buffer.
1157 * \param fiemap [out] fiemap to hold all extents
1158 * \param lcl_fm_ext [in] array of fiemap extents get from OSC layer
1159 * \param ost_index [in] OST index to be written into the fm_device
1160 * field for each extent
1161 * \param ext_count [in] number of extents to be copied
1162 * \param current_extent [in] where to start copying in the extent array
/*
 * Tag each local extent with its OST index (fe_device) and the
 * FIEMAP_EXTENT_NET flag, then append them into the user-visible @fiemap
 * buffer starting at @current_extent.
 */
1164 static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
1165                                          struct fiemap_extent *lcl_fm_ext,
1166                                          int ost_index, unsigned int ext_count,
1172         for (ext = 0; ext < ext_count; ext++) {
1173                 lcl_fm_ext[ext].fe_device = ost_index;
1174                 lcl_fm_ext[ext].fe_flags |= FIEMAP_EXTENT_NET;
1177         /* Copy fm_extent's from fm_local to return buffer */
1178         to = (char *)fiemap + fiemap_count_to_size(current_extent);
1179         memcpy(to, lcl_fm_ext, ext_count * sizeof(struct fiemap_extent));
/* Size of the local scratch buffer used for per-OST fiemap calls. */
1182 #define FIEMAP_BUFFER_SIZE      4096
1185 * Non-zero fe_logical indicates that this is a continuation FIEMAP
1186 * call. The local end offset and the device are sent in the first
1187 * fm_extent. This function calculates the stripe number from the index.
1188 * This function returns a stripe_no on which mapping is to be restarted.
1190 * This function returns fm_end_offset which is the in-OST offset at which
1191 * mapping should be restarted. If fm_end_offset=0 is returned then caller
1192 * will re-calculate proper offset in next stripe.
1193 * Note that the first extent is passed to lov_get_info via the value field.
1195 * \param fiemap [in] fiemap request header
1196 * \param lsm [in] striping information for the file
1197 * \param index [in] stripe component index
1198 * \param ext [in] logical extent of mapping
1199 * \param start_stripe [out] starting stripe will be returned in this
/*
 * For a continuation FIEMAP call, recover the stripe to restart from
 * (*start_stripe) and the in-OST offset at which mapping resumes. The
 * previous call's last extent (fe_logical/fe_device in fm_extents[0])
 * identifies where mapping stopped. Returns 0 when the caller should
 * recompute the offset in the next stripe.
 */
1201 static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
1202                                      struct lov_stripe_md *lsm,
1203                                      int index, struct lu_extent *ext,
1206         struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
1207         u64 local_end = fiemap->fm_extents[0].fe_logical;
	/* fresh (non-continuation) call: nothing to recover */
1214         if (fiemap->fm_extent_count == 0 ||
1215             fiemap->fm_extents[0].fe_logical == 0)
1218         /* Find out stripe_no from ost_index saved in the fe_device */
1219         for (i = 0; i < lsme->lsme_stripe_count; i++) {
1220                 struct lov_oinfo *oinfo = lsme->lsme_oinfo[i];
1222                 if (lov_oinfo_is_dummy(oinfo))
1225                 if (oinfo->loi_ost_idx == fiemap->fm_extents[0].fe_device) {
1231         if (stripe_no == -1)
1234         /* If we have finished mapping on previous device, shift logical
1235          * offset to start of next device */
1236         if (lov_stripe_intersects(lsm, index, stripe_no, ext,
1237                                   &lun_start, &lun_end) != 0 &&
1238             local_end < lun_end) {
1239                 fm_end_offset = local_end;
1240                 *start_stripe = stripe_no;
1242                 /* This is a special value to indicate that caller should
1243                  * calculate offset in next stripe. */
1245                 *start_stripe = (stripe_no + 1) % lsme->lsme_stripe_count;
1248         return fm_end_offset;
/*
 * Bookkeeping carried across the per-stripe iterations of a single
 * lov_object_fiemap() request (see fiemap_for_stripe()).
 */
1251 struct fiemap_state {
/* scratch fiemap buffer used for the per-OST sub-requests */
1252 struct fiemap *fs_fm;
/* logical extent being mapped within the current component */
1253 struct lu_extent fs_ext;
/* first stripe to map for the current component entry */
1258 int fs_start_stripe;
/* mapping on the current OST device finished (or hit EOF) */
1260 bool fs_device_done;
/* the last stripe of the component has been processed */
1261 bool fs_finish_stripe;
/*
 * Map the FIEMAP extents of one stripe of one layout component.
 *
 * Repeatedly calls cl_object_fiemap() on the sub-object identified by
 * (index, stripeno) until either the required range has been mapped
 * (ost_done) or the stripe's backing object ends (ost_eof).  Retrieved
 * extents are copied into @fiemap via fiemap_prepare_and_copy_exts(),
 * and progress state is recorded in @fs for the caller's outer loop.
 */
1265 int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
1266 struct lov_stripe_md *lsm, struct fiemap *fiemap,
1267 size_t *buflen, struct ll_fiemap_info_key *fmkey,
1268 int index, int stripeno, struct fiemap_state *fs)
1270 struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
1271 struct cl_object *subobj;
1272 struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
1273 struct fiemap_extent *fm_ext = &fs->fs_fm->fm_extents[0];
1274 u64 req_fm_len; /* Stores length of required mapping */
1275 u64 len_mapped_single_call;
1279 unsigned int ext_count;
1280 /* EOF for object */
1281 bool ost_eof = false;
1282 /* done with required mapping for this OST? */
1283 bool ost_done = false;
1287 fs->fs_device_done = false;
1288 /* Find out range of mapping on this stripe */
1289 if ((lov_stripe_intersects(lsm, index, stripeno, &fs->fs_ext,
1290 &lun_start, &obd_object_end)) == 0)
/* Stripe has no object on its OST — nothing to map here. */
1293 if (lov_oinfo_is_dummy(lsme->lsme_oinfo[stripeno]))
1296 /* If this is a continuation FIEMAP call and we are on
1297 * starting stripe then lun_start needs to be set to
1299 if (fs->fs_end_offset != 0 && stripeno == fs->fs_start_stripe)
1300 lun_start = fs->fs_end_offset;
1301 lun_end = lov_size_to_stripe(lsm, index, fs->fs_ext.e_end, stripeno);
1302 if (lun_start == lun_end)
1305 req_fm_len = obd_object_end - lun_start;
1306 fs->fs_fm->fm_length = 0;
1307 len_mapped_single_call = 0;
1309 /* find lovsub object */
1310 subobj = lov_find_subobj(env, cl2lov(obj), lsm,
1311 lov_comp_index(index, stripeno));
1313 return PTR_ERR(subobj);
1314 /* If the output buffer is very large and the objects have many
1315 * extents we may need to loop on a single OST repeatedly */
1317 if (fiemap->fm_extent_count > 0) {
1318 /* Don't get too many extents. */
1319 if (fs->fs_cur_extent + fs->fs_cnt_need >
1320 fiemap->fm_extent_count)
1321 fs->fs_cnt_need = fiemap->fm_extent_count -
/* Advance past what earlier iterations already mapped. */
1325 lun_start += len_mapped_single_call;
1326 fs->fs_fm->fm_length = req_fm_len - len_mapped_single_call;
1327 req_fm_len = fs->fs_fm->fm_length;
1329 * If we've collected enough extent map, we'd request 1 more,
1330 * to see whether we coincidentally finished all available
1331 * extent map, so that FIEMAP_EXTENT_LAST would be set.
1333 fs->fs_fm->fm_extent_count = fs->fs_enough ?
1334 1 : fs->fs_cnt_need;
1335 fs->fs_fm->fm_mapped_extents = 0;
1336 fs->fs_fm->fm_flags = fiemap->fm_flags;
1338 ost_index = lsme->lsme_oinfo[stripeno]->loi_ost_idx;
/* Sanity-check OST index against the configured target table. */
1340 if (ost_index < 0 || ost_index >= lov->desc.ld_tgt_count)
1341 GOTO(obj_put, rc = -EINVAL);
1342 /* If OST is inactive, return extent with UNKNOWN flag. */
1343 if (!lov->lov_tgts[ost_index]->ltd_active) {
1344 fs->fs_fm->fm_flags |= FIEMAP_EXTENT_LAST;
1345 fs->fs_fm->fm_mapped_extents = 1;
1347 fm_ext[0].fe_logical = lun_start;
1348 fm_ext[0].fe_length = obd_object_end - lun_start;
1349 fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
/* Sub-request is in in-OST offsets; DEVICE_ORDER is a LOV-level flag. */
1354 fs->fs_fm->fm_start = lun_start;
1355 fs->fs_fm->fm_flags &= ~FIEMAP_FLAG_DEVICE_ORDER;
1356 memcpy(&fmkey->lfik_fiemap, fs->fs_fm, sizeof(*fs->fs_fm));
1357 *buflen = fiemap_count_to_size(fs->fs_fm->fm_extent_count);
/* Ask the sub-object (single OST stripe) for its extent map. */
1359 rc = cl_object_fiemap(env, subobj, fmkey, fs->fs_fm, buflen);
1363 ext_count = fs->fs_fm->fm_mapped_extents;
1364 if (ext_count == 0) {
1366 fs->fs_device_done = true;
1367 /* If last stripe has hole at the end,
1368 * we need to return */
1369 if (stripeno == fs->fs_last_stripe) {
1370 fiemap->fm_mapped_extents = 0;
1371 fs->fs_finish_stripe = true;
1375 } else if (fs->fs_enough) {
1377 * We've collected enough extents and there are
1378 * more extents after it.
1383 /* If we just need num of extents, go to next device */
1384 if (fiemap->fm_extent_count == 0) {
1385 fs->fs_cur_extent += ext_count;
1389 /* prepare to copy retrieved map extents */
1390 len_mapped_single_call = fm_ext[ext_count - 1].fe_logical +
1391 fm_ext[ext_count - 1].fe_length -
1394 /* Have we finished mapping on this device? */
1395 if (req_fm_len <= len_mapped_single_call) {
1397 fs->fs_device_done = true;
1400 /* Clear the EXTENT_LAST flag which can be present on
1401 * the last extent */
1402 if (fm_ext[ext_count - 1].fe_flags & FIEMAP_EXTENT_LAST)
1403 fm_ext[ext_count - 1].fe_flags &= ~FIEMAP_EXTENT_LAST;
/* Mapping reached (or passed) file size — this device is finished. */
1404 if (lov_stripe_size(lsm, index,
1405 fm_ext[ext_count - 1].fe_logical +
1406 fm_ext[ext_count - 1].fe_length,
1407 stripeno) >= fmkey->lfik_oa.o_size) {
1409 fs->fs_device_done = true;
1412 fiemap_prepare_and_copy_exts(fiemap, fm_ext, ost_index,
1413 ext_count, fs->fs_cur_extent);
1414 fs->fs_cur_extent += ext_count;
1416 /* Ran out of available extents? */
1417 if (fs->fs_cur_extent >= fiemap->fm_extent_count)
1418 fs->fs_enough = true;
1419 } while (!ost_done && !ost_eof);
1421 if (stripeno == fs->fs_last_stripe)
1422 fs->fs_finish_stripe = true;
/* Drop the reference taken by lov_find_subobj() above. */
1424 cl_object_put(env, subobj);
1430 * Break down the FIEMAP request and send appropriate calls to individual OSTs.
1431 * This also handles the restarting of FIEMAP calls in case mapping overflows
1432 * the available number of extents in single call.
1434 * \param env [in] lustre environment
1435 * \param obj [in] file object
1436 * \param fmkey [in] fiemap request header and other info
1437 * \param fiemap [out] fiemap buffer holding retrieved map extents
1438 * \param buflen [in/out] max buffer length of @fiemap, when iterate
1439 * each OST, it is used to limit max map needed
1443 static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
1444 struct ll_fiemap_info_key *fmkey,
1445 struct fiemap *fiemap, size_t *buflen)
1447 struct lov_stripe_md_entry *lsme;
1448 struct lov_stripe_md *lsm;
1449 struct fiemap *fm_local = NULL;
1457 unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
1459 struct fiemap_state fs = { 0 };
/* Pin the current layout for the duration of the mapping. */
1462 lsm = lov_lsm_addref(cl2lov(obj));
1466 if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
1468 * If the entry count > 1 or stripe_count > 1 and the
1469 * application does not understand DEVICE_ORDER flag,
1470 * it cannot interpret the extents correctly.
1472 if (lsm->lsm_entry_count > 1 ||
1473 (lsm->lsm_entry_count == 1 &&
1474 lsm->lsm_entries[0]->lsme_stripe_count > 1))
1475 GOTO(out_lsm, rc = -ENOTSUPP);
1478 if (lsm->lsm_is_released) {
1479 if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
1481 * released file, return a minimal FIEMAP if
1482 * request fits in file-size.
1484 fiemap->fm_mapped_extents = 1;
1485 fiemap->fm_extents[0].fe_logical = fiemap->fm_start;
1486 if (fiemap->fm_start + fiemap->fm_length <
1487 fmkey->lfik_oa.o_size)
1488 fiemap->fm_extents[0].fe_length =
1491 fiemap->fm_extents[0].fe_length =
1492 fmkey->lfik_oa.o_size -
1494 fiemap->fm_extents[0].fe_flags |=
1495 FIEMAP_EXTENT_UNKNOWN | FIEMAP_EXTENT_LAST;
1497 GOTO(out_lsm, rc = 0);
1500 /* buffer_size is small to hold fm_extent_count of extents. */
1501 if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
1502 buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
1504 OBD_ALLOC_LARGE(fm_local, buffer_size);
1505 if (fm_local == NULL)
1506 GOTO(out_lsm, rc = -ENOMEM);
1509 * Requested extent count exceeds the fiemap buffer size, shrink our
1512 if (fiemap_count_to_size(fiemap->fm_extent_count) > *buflen)
1513 fiemap->fm_extent_count = fiemap_size_to_count(*buflen);
1514 if (fiemap->fm_extent_count == 0)
/* Initialize the shared per-request state used by fiemap_for_stripe(). */
1517 fs.fs_enough = false;
1518 fs.fs_cur_extent = 0;
1519 fs.fs_fm = fm_local;
1520 fs.fs_cnt_need = fiemap_size_to_count(buffer_size);
1522 whole_start = fiemap->fm_start;
1523 /* whole_start is beyond the end of the file */
1524 if (whole_start > fmkey->lfik_oa.o_size)
1525 GOTO(out_fm_local, rc = -EINVAL);
1526 whole_end = (fiemap->fm_length == OBD_OBJECT_EOF) ?
1527 fmkey->lfik_oa.o_size :
1528 whole_start + fiemap->fm_length - 1;
1530 * If fiemap->fm_length != OBD_OBJECT_EOF but whole_end exceeds file
1533 if (whole_end > fmkey->lfik_oa.o_size)
1534 whole_end = fmkey->lfik_oa.o_size;
/* Find the layout components covering [whole_start, whole_end]. */
1536 start_entry = lov_lsm_entry(lsm, whole_start);
1537 end_entry = lov_lsm_entry(lsm, whole_end);
1538 if (end_entry == -1)
1539 end_entry = lsm->lsm_entry_count - 1;
1541 if (start_entry == -1 || end_entry == -1)
1542 GOTO(out_fm_local, rc = -EINVAL);
1544 for (entry = start_entry; entry <= end_entry; entry++) {
1545 lsme = lsm->lsm_entries[entry];
/* Clamp the mapped extent to this component's boundaries. */
1547 if (entry == start_entry)
1548 fs.fs_ext.e_start = whole_start;
1550 fs.fs_ext.e_start = lsme->lsme_extent.e_start;
1551 if (entry == end_entry)
1552 fs.fs_ext.e_end = whole_end;
1554 fs.fs_ext.e_end = lsme->lsme_extent.e_end - 1;
1555 fs.fs_length = fs.fs_ext.e_end - fs.fs_ext.e_start + 1;
1557 /* Calculate start stripe, last stripe and length of mapping */
1558 fs.fs_start_stripe = lov_stripe_number(lsm, entry,
1560 fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, entry,
1561 &fs.fs_ext, fs.fs_start_stripe,
1563 fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, entry,
1564 &fs.fs_ext, &fs.fs_start_stripe);
1565 /* Check each stripe */
1566 for (cur_stripe = fs.fs_start_stripe; stripe_count > 0;
1568 cur_stripe = (cur_stripe + 1) % lsme->lsme_stripe_count) {
1569 rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen,
1570 fmkey, entry, cur_stripe, &fs);
1572 GOTO(out_fm_local, rc);
1575 if (fs.fs_finish_stripe)
1577 } /* for each stripe */
1578 } /* for covering layout component */
1580 * We've traversed all components, set @entry to the last component
1581 * entry, it's for the last stripe check.
1585 /* Indicate that we are returning device offsets unless file just has
1587 if (lsm->lsm_entry_count > 1 ||
1588 (lsm->lsm_entry_count == 1 &&
1589 lsm->lsm_entries[0]->lsme_stripe_count > 1))
1590 fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
/* Count-only request: extents were never copied, just counted. */
1592 if (fiemap->fm_extent_count == 0)
1593 goto skip_last_device_calc;
1595 /* Check if we have reached the last stripe and whether mapping for that
1596 * stripe is done. */
1597 if ((cur_stripe == fs.fs_last_stripe) && fs.fs_device_done)
1598 fiemap->fm_extents[fs.fs_cur_extent - 1].fe_flags |=
1600 skip_last_device_calc:
1601 fiemap->fm_mapped_extents = fs.fs_cur_extent;
1603 OBD_FREE_LARGE(fm_local, buffer_size);
/*
 * Copy the file's striping information to the user-space buffer @lum.
 * Takes a reference on the current layout and delegates the actual
 * marshalling to lov_getstripe().
 */
1610 static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
1611 struct lov_user_md __user *lum)
1613 struct lov_object *lov = cl2lov(obj);
1614 struct lov_stripe_md *lsm;
1618 lsm = lov_lsm_addref(lov);
1622 rc = lov_getstripe(cl2lov(obj), lsm, lum);
/*
 * cl_object_operations::coo_layout_get() for LOV: report the layout
 * generation and size, and pack the layout into cl->cl_buf when a
 * buffer is supplied (via lov_lsm_pack()).
 */
1627 static int lov_object_layout_get(const struct lu_env *env,
1628 struct cl_object *obj,
1629 struct cl_layout *cl)
1631 struct lov_object *lov = cl2lov(obj);
1632 struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1633 struct lu_buf *buf = &cl->cl_buf;
/* No layout attached: report the "empty" generation. */
1639 cl->cl_layout_gen = CL_LAYOUT_GEN_EMPTY;
1644 cl->cl_size = lov_mds_md_size(lsm->lsm_entries[0]->lsme_stripe_count,
1646 cl->cl_layout_gen = lsm->lsm_layout_gen;
1648 rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
/* lov_lsm_pack() returns bytes packed on success; map that to 0. */
1651 RETURN(rc < 0 ? rc : 0);
/*
 * cl_object_operations::coo_maxbytes() for LOV: return the maximum
 * file size supported by the current layout (lsm_maxbytes).
 */
1654 static loff_t lov_object_maxbytes(struct cl_object *obj)
1656 struct lov_object *lov = cl2lov(obj);
1657 struct lov_stripe_md *lsm = lov_lsm_addref(lov);
1663 maxbytes = lsm->lsm_maxbytes;
/* cl_object operation vector for LOV objects. */
1670 static const struct cl_object_operations lov_ops = {
1671 .coo_page_init = lov_page_init,
1672 .coo_lock_init = lov_lock_init,
1673 .coo_io_init = lov_io_init,
1674 .coo_attr_get = lov_attr_get,
1675 .coo_attr_update = lov_attr_update,
1676 .coo_conf_set = lov_conf_set,
1677 .coo_getstripe = lov_object_getstripe,
1678 .coo_layout_get = lov_object_layout_get,
1679 .coo_maxbytes = lov_object_maxbytes,
1680 .coo_fiemap = lov_object_fiemap,
/* lu_object operation vector for LOV objects (lifecycle/debugging). */
1683 static const struct lu_object_operations lov_lu_obj_ops = {
1684 .loo_object_init = lov_object_init,
1685 .loo_object_delete = lov_object_delete,
1686 .loo_object_release = NULL,
1687 .loo_object_free = lov_object_free,
1688 .loo_object_print = lov_object_print,
1689 .loo_object_invariant = NULL
/*
 * Allocate a new LOV object from the slab cache and install its
 * cl/lu operation vectors.  lo_type is left invalid (-1) until
 * lov_object_init() determines the layout type.
 */
1692 struct lu_object *lov_object_alloc(const struct lu_env *env,
1693 const struct lu_object_header *unused,
1694 struct lu_device *dev)
1696 struct lov_object *lov;
1697 struct lu_object *obj;
1700 OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
1703 lu_object_init(obj, NULL, dev);
1704 lov->lo_cl.co_ops = &lov_ops;
1705 lov->lo_type = -1; /* invalid, to catch uninitialized type */
1707 * object io operation vector (cl_object::co_iop) is installed
1708 * later in lov_object_init(), as different vectors are used
1709 * for object with different layouts.
1711 obj->lo_ops = &lov_lu_obj_ops;
/*
 * Take a reference on the object's current stripe metadata under the
 * layout-configuration freeze, so the layout cannot change while the
 * caller uses it.  Returns NULL when no layout is attached; otherwise
 * the caller owns a reference and must release it (lov_lsm_put).
 */
1717 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
1719 struct lov_stripe_md *lsm = NULL;
1721 lov_conf_freeze(lov);
1722 if (lov->lo_lsm != NULL) {
1723 lsm = lsm_addref(lov->lo_lsm);
1724 CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
1725 lsm, atomic_read(&lsm->lsm_refc),
1726 lov->lo_layout_invalid, current);
/*
 * Scan every stripe of every layout component for a recorded async
 * write error (loi_ar.ar_rc), return the first one found, and clear
 * all recorded errors.  Returns 0 when no async error was recorded.
 */
1732 int lov_read_and_clear_async_rc(struct cl_object *clob)
1734 struct lu_object *luobj;
1738 luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
1740 if (luobj != NULL) {
1741 struct lov_object *lov = lu2lov(luobj);
/* Freeze layout changes while walking the stripe tables. */
1743 lov_conf_freeze(lov);
1744 switch (lov->lo_type) {
1746 struct lov_stripe_md *lsm;
1750 LASSERT(lsm != NULL);
1751 for (i = 0; i < lsm->lsm_entry_count; i++) {
1752 struct lov_stripe_md_entry *lse =
1753 lsm->lsm_entries[i];
1756 for (j = 0; j < lse->lsme_stripe_count; j++) {
1757 struct lov_oinfo *loi =
/* Skip stripes without a backing OST object. */
1760 if (lov_oinfo_is_dummy(loi))
/* Keep the first error seen; later ones are only cleared. */
1763 if (loi->loi_ar.ar_rc && !rc)
1764 rc = loi->loi_ar.ar_rc;
1765 loi->loi_ar.ar_rc = 0;
1779 EXPORT_SYMBOL(lov_read_and_clear_async_rc);