X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=ea57141597a1ece3d1d6b5f0c6f375e8647d472d;hb=HEAD;hp=e2dd864e742f81f131f32c1102776bbf405aa7a4;hpb=4c2514f4832801374092f3a48c755248af345566;p=fs%2Flustre-release.git diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index e2dd864..2d28fb1 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -1,33 +1,13 @@ -/* - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License version 2 for more details. A copy is - * included in the COPYING file that accompanied this code. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - * - * GPL HEADER END - */ +// SPDX-License-Identifier: GPL-2.0 + /* * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * * Copyright (c) 2012, 2017, Intel Corporation. */ + /* - * lustre/lod/lod_object.c - * * This file contains implementations of methods for the OSD API * for the Logical Object Device (LOD) layer, which provides a virtual * local OSD object interface to the MDD layer, and abstracts the @@ -60,12 +40,12 @@ static const char dot[] = "."; static const char dotdot[] = ".."; -/** +/* * Implementation of dt_index_operations::dio_lookup * * Used with regular (non-striped) objects. * - * \see dt_index_operations::dio_lookup() in the API description for details. + * see dt_index_operations::dio_lookup() in the API description for details. */ static int lod_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_rec *rec, const struct dt_key *key) @@ -74,12 +54,12 @@ static int lod_lookup(const struct lu_env *env, struct dt_object *dt, return next->do_index_ops->dio_lookup(env, next, rec, key); } -/** +/* * Implementation of dt_index_operations::dio_declare_insert. * * Used with regular (non-striped) objects. * - * \see dt_index_operations::dio_declare_insert() in the API description + * see dt_index_operations::dio_declare_insert() in the API description * for details. */ static int lod_declare_insert(const struct lu_env *env, struct dt_object *dt, @@ -89,12 +69,12 @@ static int lod_declare_insert(const struct lu_env *env, struct dt_object *dt, return lod_sub_declare_insert(env, dt_object_child(dt), rec, key, th); } -/** +/* * Implementation of dt_index_operations::dio_insert. * * Used with regular (non-striped) objects * - * \see dt_index_operations::dio_insert() in the API description for details. + * see dt_index_operations::dio_insert() in the API description for details. */ static int lod_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, @@ -103,12 +83,12 @@ static int lod_insert(const struct lu_env *env, struct dt_object *dt, return lod_sub_insert(env, dt_object_child(dt), rec, key, th); } -/** +/* * Implementation of dt_index_operations::dio_declare_delete. * * Used with regular (non-striped) objects. * - * \see dt_index_operations::dio_declare_delete() in the API description + * see dt_index_operations::dio_declare_delete() in the API description * for details. */ static int lod_declare_delete(const struct lu_env *env, struct dt_object *dt, @@ -117,12 +97,12 @@ static int lod_declare_delete(const struct lu_env *env, struct dt_object *dt, return lod_sub_declare_delete(env, dt_object_child(dt), key, th); } -/** +/* * Implementation of dt_index_operations::dio_delete. * * Used with regular (non-striped) objects. * - * \see dt_index_operations::dio_delete() in the API description for details. + * see dt_index_operations::dio_delete() in the API description for details. */ static int lod_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th) @@ -130,12 +110,12 @@ static int lod_delete(const struct lu_env *env, struct dt_object *dt, return lod_sub_delete(env, dt_object_child(dt), key, th); } -/** +/* * Implementation of dt_it_ops::init. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::init() in the API description for details. + * see dt_it_ops::init() in the API description for details. */ static struct dt_it *lod_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr) @@ -166,12 +146,12 @@ do { \ LASSERT((it)->lit_it != NULL); \ } while (0) -/** +/* * Implementation of dt_index_operations::dio_it.fini. * * Used with regular (non-striped) objects. * - * \see dt_index_operations::dio_it.fini() in the API description for details. + * see dt_index_operations::dio_it.fini() in the API description for details. */ static void lod_it_fini(const struct lu_env *env, struct dt_it *di) { @@ -185,12 +165,12 @@ static void lod_it_fini(const struct lu_env *env, struct dt_it *di) it->lit_it = NULL; } -/** +/* * Implementation of dt_it_ops::get. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::get() in the API description for details. + * see dt_it_ops::get() in the API description for details. */ static int lod_it_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key) @@ -201,12 +181,12 @@ static int lod_it_get(const struct lu_env *env, struct dt_it *di, return it->lit_obj->do_index_ops->dio_it.get(env, it->lit_it, key); } -/** +/* * Implementation of dt_it_ops::put. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::put() in the API description for details. + * see dt_it_ops::put() in the API description for details. */ static void lod_it_put(const struct lu_env *env, struct dt_it *di) { @@ -216,12 +196,12 @@ static void lod_it_put(const struct lu_env *env, struct dt_it *di) return it->lit_obj->do_index_ops->dio_it.put(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::next. * * Used with regular (non-striped) objects * - * \see dt_it_ops::next() in the API description for details. + * see dt_it_ops::next() in the API description for details. */ static int lod_it_next(const struct lu_env *env, struct dt_it *di) { @@ -231,12 +211,12 @@ static int lod_it_next(const struct lu_env *env, struct dt_it *di) return it->lit_obj->do_index_ops->dio_it.next(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::key. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::key() in the API description for details. + * see dt_it_ops::key() in the API description for details. */ static struct dt_key *lod_it_key(const struct lu_env *env, const struct dt_it *di) @@ -247,12 +227,12 @@ static struct dt_key *lod_it_key(const struct lu_env *env, return it->lit_obj->do_index_ops->dio_it.key(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::key_size. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::key_size() in the API description for details. + * see dt_it_ops::key_size() in the API description for details. */ static int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) { @@ -262,12 +242,12 @@ static int lod_it_key_size(const struct lu_env *env, const struct dt_it *di) return it->lit_obj->do_index_ops->dio_it.key_size(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::rec. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::rec() in the API description for details. + * see dt_it_ops::rec() in the API description for details. */ static int lod_it_rec(const struct lu_env *env, const struct dt_it *di, struct dt_rec *rec, __u32 attr) @@ -279,12 +259,12 @@ static int lod_it_rec(const struct lu_env *env, const struct dt_it *di, attr); } -/** +/* * Implementation of dt_it_ops::rec_size. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::rec_size() in the API description for details. + * see dt_it_ops::rec_size() in the API description for details. */ static int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di, __u32 attr) @@ -296,12 +276,12 @@ static int lod_it_rec_size(const struct lu_env *env, const struct dt_it *di, attr); } -/** +/* * Implementation of dt_it_ops::store. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::store() in the API description for details. + * see dt_it_ops::store() in the API description for details. */ static __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) { @@ -311,12 +291,12 @@ static __u64 lod_it_store(const struct lu_env *env, const struct dt_it *di) return it->lit_obj->do_index_ops->dio_it.store(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::load. * * Used with regular (non-striped) objects. * - * \see dt_it_ops::load() in the API description for details. + * see dt_it_ops::load() in the API description for details. */ static int lod_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) @@ -327,23 +307,6 @@ static int lod_it_load(const struct lu_env *env, const struct dt_it *di, return it->lit_obj->do_index_ops->dio_it.load(env, it->lit_it, hash); } -/** - * Implementation of dt_it_ops::key_rec. - * - * Used with regular (non-striped) objects. - * - * \see dt_it_ops::rec() in the API description for details. - */ -static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void *key_rec) -{ - const struct lod_it *it = (const struct lod_it *)di; - - LOD_CHECK_IT(env, it); - return it->lit_obj->do_index_ops->dio_it.key_rec(env, it->lit_it, - key_rec); -} - static const struct dt_index_operations lod_index_ops = { .dio_lookup = lod_lookup, .dio_declare_insert = lod_declare_insert, @@ -362,16 +325,15 @@ static const struct dt_index_operations lod_index_ops = { .rec_size = lod_it_rec_size, .store = lod_it_store, .load = lod_it_load, - .key_rec = lod_it_key_rec, } }; -/** +/* * Implementation of dt_index_operations::dio_lookup * * Used with striped directories. * - * \see dt_index_operations::dio_lookup() in the API description for details. + * see dt_index_operations::dio_lookup() in the API description for details. */ static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_rec *rec, const struct dt_key *key) @@ -410,13 +372,13 @@ static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt, return next->do_index_ops->dio_lookup(env, next, rec, key); } -/** +/* * Implementation of dt_it_ops::init. * * Used with striped objects. Internally just initializes the iterator * on the first stripe. * - * \see dt_it_ops::init() in the API description for details. + * see dt_it_ops::init() in the API description for details. */ static struct dt_it *lod_striped_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr) @@ -467,12 +429,12 @@ do { \ LASSERT((it)->lit_stripe_index < (lo)->ldo_dir_stripe_count); \ } while (0) -/** +/* * Implementation of dt_it_ops::fini. * * Used with striped objects. * - * \see dt_it_ops::fini() in the API description for details. + * see dt_it_ops::fini() in the API description for details. */ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) { @@ -498,14 +460,14 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) it->lit_stripe_index = 0; } -/** +/* * Implementation of dt_it_ops::get. * * Right now it's not used widely, only to reset the iterator to the * initial position. It should be possible to implement a full version * which chooses a correct stripe to be able to position with any key. * - * \see dt_it_ops::get() in the API description for details. + * see dt_it_ops::get() in the API description for details. */ static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key) @@ -524,12 +486,12 @@ static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, return next->do_index_ops->dio_it.get(env, it->lit_it, key); } -/** +/* * Implementation of dt_it_ops::put. * * Used with striped objects. * - * \see dt_it_ops::put() in the API description for details. + * see dt_it_ops::put() in the API description for details. */ static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) { @@ -553,13 +515,13 @@ static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) return next->do_index_ops->dio_it.put(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::next. * * Used with striped objects. When the end of the current stripe is * reached, the method takes the next stripe's iterator. * - * \see dt_it_ops::next() in the API description for details. + * see dt_it_ops::next() in the API description for details. */ static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di) { @@ -649,12 +611,12 @@ again: RETURN(1); } -/** +/* * Implementation of dt_it_ops::key. * * Used with striped objects. * - * \see dt_it_ops::key() in the API description for details. + * see dt_it_ops::key() in the API description for details. */ static struct dt_key *lod_striped_it_key(const struct lu_env *env, const struct dt_it *di) @@ -672,12 +634,12 @@ static struct dt_key *lod_striped_it_key(const struct lu_env *env, return next->do_index_ops->dio_it.key(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::key_size. * * Used with striped objects. * - * \see dt_it_ops::size() in the API description for details. + * see dt_it_ops::size() in the API description for details. */ static int lod_striped_it_key_size(const struct lu_env *env, const struct dt_it *di) @@ -695,12 +657,12 @@ static int lod_striped_it_key_size(const struct lu_env *env, return next->do_index_ops->dio_it.key_size(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::rec. * * Used with striped objects. * - * \see dt_it_ops::rec() in the API description for details. + * see dt_it_ops::rec() in the API description for details. */ static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di, struct dt_rec *rec, __u32 attr) @@ -718,12 +680,12 @@ static int lod_striped_it_rec(const struct lu_env *env, const struct dt_it *di, return next->do_index_ops->dio_it.rec(env, it->lit_it, rec, attr); } -/** +/* * Implementation of dt_it_ops::rec_size. * * Used with striped objects. * - * \see dt_it_ops::rec_size() in the API description for details. + * see dt_it_ops::rec_size() in the API description for details. */ static int lod_striped_it_rec_size(const struct lu_env *env, const struct dt_it *di, __u32 attr) @@ -741,12 +703,12 @@ static int lod_striped_it_rec_size(const struct lu_env *env, return next->do_index_ops->dio_it.rec_size(env, it->lit_it, attr); } -/** +/* * Implementation of dt_it_ops::store. * * Used with striped objects. * - * \see dt_it_ops::store() in the API description for details. + * see dt_it_ops::store() in the API description for details. */ static __u64 lod_striped_it_store(const struct lu_env *env, const struct dt_it *di) @@ -764,12 +726,12 @@ static __u64 lod_striped_it_store(const struct lu_env *env, return next->do_index_ops->dio_it.store(env, it->lit_it); } -/** +/* * Implementation of dt_it_ops::load. * * Used with striped objects. * - * \see dt_it_ops::load() in the API description for details. + * see dt_it_ops::load() in the API description for details. */ static int lod_striped_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) @@ -809,8 +771,12 @@ static const struct dt_index_operations lod_striped_index_ops = { }; /** - * Append the FID for each shard of the striped directory after the - * given LMV EA header. + * lod_load_lmv_shards() - Append the FID for each shard of the striped + * directory after the given LMV EA header. + * @env: pointer to the thread context + * @lo: pointer to the master object of the striped directory + * @buf: pointer to the lu_buf which will hold the LMV EA + * @resize: whether re-allocate the buffer if it is not big enough * * To simplify striped directory and the consistency verification, * we only store the LMV EA header on disk, for both master object @@ -825,14 +791,10 @@ static const struct dt_index_operations lod_striped_index_ops = { * There may be holes in the LMV EA if some shards' name entries * are corrupted or lost. * - * \param[in] env pointer to the thread context - * \param[in] lo pointer to the master object of the striped directory - * \param[in] buf pointer to the lu_buf which will hold the LMV EA - * \param[in] resize whether re-allocate the buffer if it is not big enough - * - * \retval positive size of the LMV EA - * \retval 0 for nothing to be loaded - * \retval negative error number on failure + * Return: + * * %positive size of the LMV EA + * * %0 for nothing to be loaded + * * %negative error number on failure */ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, struct lu_buf *buf, bool resize) @@ -878,7 +840,7 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len); } - if (unlikely(!dt_try_as_dir(env, obj))) + if (unlikely(!dt_try_as_dir(env, obj, true))) RETURN(-ENOTDIR); memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid)); @@ -971,7 +933,8 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, } /* The slot has been occupied. */ - if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) { + if (!fid_is_zero(&lmv1->lmv_stripe_fids[index]) && + !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME)) { struct lu_fid fid0; fid_le_to_cpu(&fid0, @@ -1001,10 +964,10 @@ next: RETURN(rc > 0 ? lmv_mds_md_size(stripes, magic) : rc); } -/** +/* * Implementation of dt_object_operations::do_index_try. * - * \see dt_object_operations::do_index_try() in the API description for details. + * see dt_object_operations::do_index_try() in the API description for details. */ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat) @@ -1046,10 +1009,10 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_read_lock. * - * \see dt_object_operations::do_read_lock() in the API description for details. + * see dt_object_operations::do_read_lock() in the API description for details. */ static void lod_read_lock(const struct lu_env *env, struct dt_object *dt, unsigned role) @@ -1057,10 +1020,10 @@ static void lod_read_lock(const struct lu_env *env, struct dt_object *dt, dt_read_lock(env, dt_object_child(dt), role); } -/** +/* * Implementation of dt_object_operations::do_write_lock. * - * \see dt_object_operations::do_write_lock() in the API description for + * see dt_object_operations::do_write_lock() in the API description for * details. */ static void lod_write_lock(const struct lu_env *env, struct dt_object *dt, @@ -1069,10 +1032,10 @@ static void lod_write_lock(const struct lu_env *env, struct dt_object *dt, dt_write_lock(env, dt_object_child(dt), role); } -/** +/* * Implementation of dt_object_operations::do_read_unlock. * - * \see dt_object_operations::do_read_unlock() in the API description for + * see dt_object_operations::do_read_unlock() in the API description for * details. */ static void lod_read_unlock(const struct lu_env *env, struct dt_object *dt) @@ -1080,10 +1043,10 @@ static void lod_read_unlock(const struct lu_env *env, struct dt_object *dt) dt_read_unlock(env, dt_object_child(dt)); } -/** +/* * Implementation of dt_object_operations::do_write_unlock. * - * \see dt_object_operations::do_write_unlock() in the API description for + * see dt_object_operations::do_write_unlock() in the API description for * details. */ static void lod_write_unlock(const struct lu_env *env, struct dt_object *dt) @@ -1091,10 +1054,10 @@ static void lod_write_unlock(const struct lu_env *env, struct dt_object *dt) dt_write_unlock(env, dt_object_child(dt)); } -/** +/* * Implementation of dt_object_operations::do_write_locked. * - * \see dt_object_operations::do_write_locked() in the API description for + * see dt_object_operations::do_write_locked() in the API description for * details. */ static int lod_write_locked(const struct lu_env *env, struct dt_object *dt) @@ -1102,10 +1065,10 @@ static int lod_write_locked(const struct lu_env *env, struct dt_object *dt) return dt_write_locked(env, dt_object_child(dt)); } -/** +/* * Implementation of dt_object_operations::do_attr_get. * - * \see dt_object_operations::do_attr_get() in the API description for details. + * see dt_object_operations::do_attr_get() in the API description for details. */ static int lod_attr_get(const struct lu_env *env, struct dt_object *dt, @@ -1159,7 +1122,7 @@ static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, struct lov_desc *desc, int append_stripes) { - if (comp->llc_pattern != LOV_PATTERN_MDT) { + if (!(comp->llc_pattern & LOV_PATTERN_MDT)) { if (append_stripes) { comp->llc_stripe_count = append_stripes; } else if (!comp->llc_stripe_count) { @@ -1183,13 +1146,16 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; + if (lod_comp->llc_stripe == NULL) continue; /* has stripe but not inited yet, this component has been * declared to be created, but hasn't created yet. */ - if (!lod_comp_inited(lod_comp)) + if (!lod_comp_inited(lod_comp) && !data->locd_declare) continue; if (data->locd_comp_skip_cb && @@ -1224,60 +1190,6 @@ unlock: RETURN(rc); } -static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env, - struct lod_object *lo, int comp_idx, - struct lod_obj_stripe_cb_data *data) -{ - struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx]; - bool skipped = false; - - if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION)) - return skipped; - - switch (lo->ldo_flr_state) { - case LCM_FL_WRITE_PENDING: { - int i; - - /* skip stale components */ - if (lod_comp->llc_flags & LCME_FL_STALE) { - skipped = true; - break; - } - - /* skip valid and overlapping components, therefore any - * attempts to write overlapped components will never succeed - * because client will get EINPROGRESS. */ - for (i = 0; i < lo->ldo_comp_cnt; i++) { - if (i == comp_idx) - continue; - - if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE) - continue; - - if (lu_extent_is_overlapped(&lod_comp->llc_extent, - &lo->ldo_comp_entries[i].llc_extent)) { - skipped = true; - break; - } - } - break; - } - case LCM_FL_RDONLY: - case LCM_FL_SYNC_PENDING: - break; - default: - LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state); - break; - } - - CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n", - PFID(lu_object_fid(&lo->ldo_obj.do_lu)), - skipped ? "skipped" : "chose", lod_comp->llc_id, - data->locd_attr->la_layout_version); - - return skipped; -} - static inline int lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, @@ -1296,7 +1208,7 @@ lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo, return lod_sub_attr_set(env, dt, data->locd_attr, th); } -/** +/* * Implementation of dt_object_operations::do_declare_attr_set. * * If the object is striped, then apply the changes to all the stripes. @@ -1333,7 +1245,7 @@ static int lod_declare_attr_set(const struct lu_env *env, if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID | LA_MODE | @@ -1384,18 +1296,17 @@ static int lod_declare_attr_set(const struct lu_env *env, !S_ISREG(attr->la_mode)) RETURN(0); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { rc = lod_sub_declare_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; - buf->lb_buf = info->lti_ea_store; - buf->lb_len = info->lti_ea_store_size; + *buf = info->lti_ea_buf; rc = lod_sub_declare_xattr_set(env, next, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); } @@ -1403,7 +1314,7 @@ static int lod_declare_attr_set(const struct lu_env *env, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_attr_set. * * If the object is striped, then apply the changes to all or subset of @@ -1432,7 +1343,7 @@ static int lod_attr_set(const struct lu_env *env, if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | LA_PROJID | @@ -1473,7 +1384,6 @@ static int lod_attr_set(const struct lu_env *env, data.locd_attr = attr; data.locd_declare = false; - data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb; data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; rc = lod_obj_for_each_stripe(env, lo, th, &data); } @@ -1485,12 +1395,12 @@ static int lod_attr_set(const struct lu_env *env, !S_ISREG(attr->la_mode)) RETURN(0); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct ost_id *oi = &info->lti_ostid; @@ -1503,9 +1413,8 @@ static int lod_attr_set(const struct lu_env *env, if (rc <= 0) RETURN(rc); - buf->lb_buf = info->lti_ea_store; - buf->lb_len = info->lti_ea_store_size; - lmm = info->lti_ea_store; + *buf = info->lti_ea_buf; + lmm = buf->lb_buf; magic = le32_to_cpu(lmm->lmm_magic); if (magic == LOV_MAGIC_COMP_V1 || magic == LOV_MAGIC_SEL) { struct lov_comp_md_v1 *lcm = buf->lb_buf; @@ -1528,7 +1437,7 @@ static int lod_attr_set(const struct lu_env *env, rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); - } else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { + } else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct lov_comp_md_v1 *lcm; @@ -1538,8 +1447,7 @@ static int lod_attr_set(const struct lu_env *env, if (rc <= 0) RETURN(rc); - buf->lb_buf = info->lti_ea_store; - buf->lb_len = info->lti_ea_store_size; + *buf = info->lti_ea_buf; lcm = buf->lb_buf; if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1 && le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_SEL) @@ -1557,7 +1465,7 @@ static int lod_attr_set(const struct lu_env *env, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_xattr_get. * * If LOV EA is requested from the root object and it's not @@ -1681,15 +1589,15 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, } /** - * Verify LVM EA. + * lod_verify_md_striping() - Verify LVM EA. + * @lod: lod device + * @lum: a buffer storing LMV EA to verify * * Checks that the magic of the stripe is sane. * - * \param[in] lod lod device - * \param[in] lum a buffer storing LMV EA to verify - * - * \retval 0 if the EA is sane - * \retval negative otherwise + * Return: + * * %0 on success + * * %negative if failed */ static int lod_verify_md_striping(struct lod_device *lod, const struct lmv_user_md_v1 *lum) @@ -1707,13 +1615,11 @@ static int lod_verify_md_striping(struct lod_device *lod, } /** - * Initialize LMV EA for a slave. + * lod_prep_slave_lmv_md() - Initialize LMV EA for a slave. + * @master_lmv: a buffer containing master's EA + * @slave_lmv: a buffer where slave's EA will be stored [out] * * Initialize slave's LMV EA from the master's LMV EA. - * - * \param[in] master_lmv a buffer containing master's EA - * \param[out] slave_lmv a buffer where slave's EA will be stored - * */ static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv, const struct lmv_mds_md_v1 *master_lmv) @@ -1723,17 +1629,17 @@ static void lod_prep_slave_lmv_md(struct lmv_mds_md_v1 *slave_lmv, } /** - * Generate LMV EA. + * lod_prep_lmv_md() - Generate LMV EA. + * @env: execution environment + * @dt: object + * @lmv_buf: buffer storing generated LMV EA * * Generate LMV EA from the object passed as \a dt. The object must have * the stripes created and initialized. * - * \param[in] env execution environment - * \param[in] dt object - * \param[out] lmv_buf buffer storing generated LMV EA - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, struct lu_buf *lmv_buf) @@ -1760,7 +1666,7 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, memset(info->lti_ea_store, 0, sizeof(*lmm1)); } - lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store; + lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_buf.lb_buf; memset(lmm1, 0, sizeof(*lmm1)); lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); @@ -1777,26 +1683,27 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, RETURN(rc); lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx); - lmv_buf->lb_buf = info->lti_ea_store; + lmv_buf->lb_buf = info->lti_ea_buf.lb_buf; lmv_buf->lb_len = sizeof(*lmm1); RETURN(rc); } /** - * Create in-core represenation for a striped directory. + * lod_parse_dir_striping() - Create in-core represenation for a striped + * directory. + * @env: execution environment + * @lo: lod object + * @buf: buffer containing LMV EA * * Parse the buffer containing LMV EA and instantiate LU objects * representing the stripe objects. The pointers to the objects are * stored in ldo_stripe field of \a lo. This function is used when * we need to access an already created object (i.e. load from a disk). * - * \param[in] env execution environment - * \param[in] lo lod object - * \param[in] buf buffer containing LMV EA - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, const struct lu_buf *buf) @@ -1868,6 +1775,7 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, } out: lo->ldo_stripe = stripe; + lo->ldo_is_foreign = 0; lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version); @@ -1881,7 +1789,12 @@ out: } /** - * Declare create a striped directory. + * lod_dir_declare_create_stripes() - Declare create a striped directory. + * @env: execution environment + * @dt: object + * @attr: attributes to initialize the objects with + * @dof: type of objects to be created + * @th: transaction handle * * Declare creating a striped directory with a given stripe pattern on the * specified MDTs. A striped directory is represented as a regular directory @@ -1890,14 +1803,9 @@ out: * identifies it as a striped directory. The function allocates FIDs * for all stripes. * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] attr attributes to initialize the objects with - * \param[in] dof type of objects to be created - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_dir_declare_create_stripes(const struct lu_env *env, struct dt_object *dt, @@ -1929,7 +1837,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, slave_lmv_buf.lb_buf = slave_lmm; slave_lmv_buf.lb_len = sizeof(*slave_lmm); - if (!dt_try_as_dir(env, dt_object_child(dt))) + if (!dt_try_as_dir(env, dt_object_child(dt), false)) GOTO(out, rc = -EINVAL); rec->rec_type = S_IFDIR; @@ -1951,7 +1859,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - if (!dt_try_as_dir(env, dto)) + if (!dt_try_as_dir(env, dto, false)) GOTO(out, rc = -EINVAL); rc = lod_sub_declare_ref_add(env, dto, th); @@ -1975,7 +1883,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", @@ -2013,9 +1921,9 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, GOTO(out, rc); } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && cfs_fail_val == i) slave_lmm->lmv_master_mdt_index = cpu_to_le32(i + 1); @@ -2034,17 +1942,22 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); out: - if (slave_lmm != NULL) - OBD_FREE_PTR(slave_lmm); + OBD_FREE_PTR(slave_lmm); RETURN(rc); } /** - * Allocate a striping on a predefined set of MDTs. + * lod_mdt_alloc_specific() - Allocate a striping on a predefined set of MDTs. + * @env: execution environment for this thread + * @lo: LOD object + * @stripes: striping created + * @mdt_indices: MDT indices of striping created + * @is_specific: true if the MDTs are provided by lum; false if only the + * starting MDT index is provided * * Allocates new striping using the MDT index range provided by the data from - * the lum_obejcts contained in the lmv_user_md passed to this method if + * the lum_objects contained in the lmv_user_md passed to this method if * \a is_specific is true; or allocates new layout starting from MDT index in * lo->ldo_dir_stripe_offset. The exact order of MDTs is not important and * varies depending on MDT status. The number of stripes needed and stripe @@ -2053,16 +1966,9 @@ out: * release the stripes allocated. All the internal structures are protected, * but no concurrent allocation is allowed on the same objects. * - * \param[in] env execution environment for this thread - * \param[in] lo LOD object - * \param[out] stripes striping created - * \param[out] mdt_indices MDT indices of striping created - * \param[in] is_specific true if the MDTs are provided by lum; false if - * only the starting MDT index is provided - * - * \retval positive stripes allocated, including the first stripe allocated - * outside - * \retval negative errno on failure + * Return: + * * %positive stripes allocated, including the first stripe allocated outside + * * %negative errno on failure */ static int lod_mdt_alloc_specific(const struct lu_env *env, struct lod_object *lo, @@ -2097,11 +2003,17 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, bool already_allocated = false; __u32 k; - CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", - idx, lod->lod_remote_mdt_count + 1, stripe_idx); + CDEBUG(D_INFO, + "try idx %d, mdt cnt %u, allocated %u, specific %d count %hu offset %d hash %#X\n", + idx, lod->lod_remote_mdt_count + 1, stripe_idx, + is_specific, lo->ldo_dir_stripe_count, + (int)lo->ldo_dir_stripe_offset, + lo->ldo_dir_hash_type); if (likely(!is_specific && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + !CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE) && + !(lo->ldo_dir_hash_type & + LMV_HASH_FLAG_OVERSTRIPED))) { /* check whether the idx already exists * in current allocated array */ for (k = 0; k < stripe_idx; k++) { @@ -2141,6 +2053,9 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, /* this OSP doesn't feel well */ continue; + if (tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE) + continue; + rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, NULL); if (rc < 0) continue; @@ -2166,7 +2081,7 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, * remote MDT, otherwise we may save too many local * slave locks which will exceed RS_MAX_LOCKS. */ - if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) + if (unlikely(CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) idx = master_index; mdt_indices[stripe_idx + 1] = (idx + 1) % (lod->lod_remote_mdt_count + 1); @@ -2177,7 +2092,7 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, LASSERT(fid_is_sane(&fid)); /* fail a remote stripe FID allocation */ - if (stripe_idx && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) + if (stripe_idx && CFS_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) continue; dto = dt_locate_at(env, tgt_dt, &fid, @@ -2214,6 +2129,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object **stripes; struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; struct lu_fid fid = { 0 }; + int mdt_count = lod->lod_remote_mdt_count + 1; __u32 stripe_count; int i; int rc = 0; @@ -2225,6 +2141,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); stripe_count = lo->ldo_dir_stripe_count; + /* silently clear OVERSTRIPED flag on single MDT system */ + if (mdt_count == 1) + lo->ldo_dir_hash_type &= ~LMV_HASH_FLAG_OVERSTRIPED; + if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) { + /* silently clamp stripe count if MDTs are not specific */ + if (stripe_count > mdt_count * lod->lod_max_stripes_per_mdt) { + if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) + stripe_count = mdt_count * + lod->lod_max_stripes_per_mdt; + else + RETURN(-E2BIG); + } + /* clear OVERSTRIPED if not overstriped */ + if (stripe_count <= mdt_count && + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) + lo->ldo_dir_hash_type &= ~LMV_HASH_FLAG_OVERSTRIPED; + } else if (stripe_count > mdt_count) { + RETURN(-E2BIG); + } OBD_ALLOC_PTR_ARRAY(stripes, stripe_count); if (!stripes) @@ -2255,7 +2190,35 @@ static int lod_prep_md_striped_create(const struct lu_env *env, GOTO(out, rc = -ENOMEM); if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + int stripes_per_mdt; + int mdt; + bool overstriped = false; + is_specific = true; + + /* Verify we do not exceed the stripes per MDT limit */ + for (mdt = 0; mdt < mdt_count + 1; mdt++) { + stripes_per_mdt = 0; + for (i = 0; i < stripe_count; i++) { + if (mdt == + le32_to_cpu(lum->lum_objects[i].lum_mds)) + stripes_per_mdt++; + } + if (stripes_per_mdt > + lod->lod_max_stripes_per_mdt) + GOTO(out_free, rc = -EINVAL); + if (stripes_per_mdt > 1) + overstriped = true; + } + if (!overstriped && + (lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED)) + lo->ldo_dir_hash_type &= + ~LMV_HASH_FLAG_OVERSTRIPED; + else if (overstriped && + !(lo->ldo_dir_hash_type & + LMV_HASH_FLAG_OVERSTRIPED)) + GOTO(out_free, rc = -EINVAL); + for (i = 0; i < stripe_count; i++) idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds); @@ -2266,6 +2229,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array, is_specific); +out_free: OBD_FREE_PTR_ARRAY(idx_array, stripe_count); } @@ -2299,44 +2263,110 @@ out: } /** + * lod_alloc_foreign_lov() - Alloc cached foreign LOV + * @lo: object + * @size: size of foreign LOV * - * Alloc cached foreign LMV - * - * \param[in] lo object - * \param[in] size size of foreign LMV + * Return: + * * %0 on success + * * %negative if failed + */ +int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); + if (lo->ldo_foreign_lov == NULL) + return -ENOMEM; + lo->ldo_foreign_lov_size = size; + lo->ldo_is_foreign = 1; + return 0; +} + +/** + * lod_free_foreign_lov() - Free cached foreign LOV + * @lo: object + */ +void lod_free_foreign_lov(struct lod_object *lo) +{ + if (lo->ldo_foreign_lov != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); + lo->ldo_foreign_lov = NULL; + lo->ldo_foreign_lov_size = 0; + lo->ldo_is_foreign = 0; +} + +/** + * lod_alloc_foreign_lmv() - Alloc cached foreign LMV + * @lo: object + * @size: size of foreign LMV * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ -int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) +static int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) { OBD_ALLOC_LARGE(lo->ldo_foreign_lmv, size); if (lo->ldo_foreign_lmv == NULL) return -ENOMEM; lo->ldo_foreign_lmv_size = size; - lo->ldo_dir_is_foreign = 1; + lo->ldo_is_foreign = 1; return 0; } +static int lod_prep_md_replayed_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + const struct lu_buf *lmv_buf, + struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + int rc; + + ENTRY; + + mutex_lock(&lo->ldo_layout_mutex); + rc = lod_parse_dir_striping(env, lo, lmv_buf); + if (rc == 0) { + lo->ldo_dir_stripe_loaded = 1; + lo->ldo_dir_striped = 1; + rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); + } + mutex_unlock(&lo->ldo_layout_mutex); + + RETURN(rc); +} + +/** + * lod_free_foreign_lmv() - Free cached foreign LMV + * @lo: object + */ +static void lod_free_foreign_lmv(struct lod_object *lo) +{ + if (lo->ldo_foreign_lmv != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); + lo->ldo_foreign_lmv = NULL; + lo->ldo_foreign_lmv_size = 0; + lo->ldo_is_foreign = 0; +} + /** - * Declare create striped md object. + * lod_declare_xattr_set_lmv() - Declare create striped md object. + * @env: execution environment + * @dt: object + * @attr: attributes to initialize the objects with + * @lum_buf: a pattern specifying the number of stripes and MDT to start from + * @dof: type of objects to be created + * @th: transaction handle * * The function declares intention to create a striped directory. This is a * wrapper for lod_prep_md_striped_create(). The only additional functionality - * is to verify pattern \a lum_buf is good. Check that function for the details. - * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] attr attributes to initialize the objects with - * \param[in] lum_buf a pattern specifying the number of stripes and - * MDT to start from - * \param[in] dof type of objects to be created - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * is to verify pattern @lum_buf is good. Check that function for the details. * + * Return: + * * %0 on success + * * %negative if failed */ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, @@ -2345,63 +2375,70 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - struct lmv_user_md_v1 *lum = lum_buf->lb_buf; - int rc; - ENTRY; + struct lod_object *lo = lod_dt_obj(dt); + struct lmv_user_md_v1 *lum = lum_buf->lb_buf; + int rc; + ENTRY; LASSERT(lum != NULL); - CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n", - le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), - (int)le32_to_cpu(lum->lum_stripe_offset)); + CDEBUG(D_INFO, + "lum magic=%x hash=%x count=%u offset=%d inherit=%u rr=%u\n", + le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_hash_type), + le32_to_cpu(lum->lum_stripe_count), + (int)le32_to_cpu(lum->lum_stripe_offset), + lum->lum_max_inherit, lum->lum_max_inherit_rr); if (lo->ldo_dir_stripe_count == 0) { - if (lo->ldo_dir_is_foreign) { + if (lo->ldo_is_foreign) { rc = lod_alloc_foreign_lmv(lo, lum_buf->lb_len); if (rc != 0) - GOTO(out, rc); + RETURN(rc); memcpy(lo->ldo_foreign_lmv, lum, lum_buf->lb_len); lo->ldo_dir_stripe_loaded = 1; } - GOTO(out, rc = 0); + RETURN(0); } - /* prepare dir striped objects */ - rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); - if (rc != 0) { + /* client replay striped directory creation with LMV, this happens when + * all involved MDTs were rebooted, or MDT recovery was aborted. + */ + if (le32_to_cpu(lum->lum_magic) == LMV_MAGIC_V1) + rc = lod_prep_md_replayed_create(env, dt, attr, lum_buf, dof, + th); + else + rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); + if (rc != 0) /* failed to create striping, let's reset * config so that others don't get confused */ lod_striping_free(env, lo); - GOTO(out, rc); - } -out: + RETURN(rc); } /** - * Set or replace striped directory layout, and LFSCK may set layout on a plain - * directory, so don't check stripe count. - * - * \param[in] env execution environment - * \param[in] dt target object - * \param[in] buf LMV buf which contains source stripe fids - * \param[in] fl set or replace - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * lod_dir_layout_set() - Set or replace striped directory layout, and LFSCK + * may set layout on a plain directory, so don't check stripe count. + * @env: execution environment + * @dt: target object + * @lmv_buf: LMV buf which contains source stripe FIDs + * @fl: set or replace + * @th: transaction handle + * + * Return: + * * %0 on success + * * %negative if failed */ static int lod_dir_layout_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, + const struct lu_buf *lmv_buf, int fl, struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); - struct lmv_mds_md_v1 *lmv = buf->lb_buf; + struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf; struct lmv_mds_md_v1 *slave_lmv; struct lu_buf slave_buf; int i; @@ -2421,7 +2458,7 @@ static int lod_dir_layout_set(const struct lu_env *env, LMV_DEBUG(D_INFO, lmv, "set"); - rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th); + rc = lod_sub_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV, fl, th); if (rc) RETURN(rc); @@ -2460,7 +2497,7 @@ static int lod_dir_layout_set(const struct lu_env *env, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_declare_xattr_set. * * Used with regular (non-striped) objects. Basically it @@ -2486,7 +2523,10 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { struct lmv_user_md_v1 *lum; - LASSERT(buf != NULL && buf->lb_buf != NULL); + LASSERT(buf != NULL); + if (!buf->lb_buf || buf->lb_len < sizeof(*lum)) + RETURN(-EFAULT); + lum = buf->lb_buf; rc = lod_verify_md_striping(d, lum); if (rc != 0) @@ -2579,6 +2619,7 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, ff->ff_layout.ol_comp_id = comp->llc_id; ff->ff_layout.ol_comp_start = comp->llc_extent.e_start; ff->ff_layout.ol_comp_end = comp->llc_extent.e_end; + ff->ff_layout_version = lo->ldo_layout_gen; filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); if (data->locd_declare) @@ -2592,19 +2633,20 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, } /** - * Reset parent FID on OST object + * lod_replace_parent_fid() - Reset parent FID on OST object + * @env: execution environment + * @dt: dt_object whose stripes's parent FID will be reset + * @buf: buffer holding parent FID + * @th: thandle + * @declare: if it is declare * * Replace parent FID with @dt object FID, which is only called during migration * to reset the parent FID after the MDT object is migrated to the new MDT, i.e. * the FID is changed. * - * \param[in] env execution environment - * \param[in] dt dt_object whose stripes's parent FID will be reset - * \parem[in] th thandle - * \param[in] declare if it is declare - * - * \retval 0 if reset succeeds - * \retval negative errno if reset fails + * Return: + * * %0 on success + * * %negative if failed */ static int lod_replace_parent_fid(const struct lu_env *env, struct dt_object *dt, @@ -2642,23 +2684,32 @@ static int lod_replace_parent_fid(const struct lu_env *env, RETURN(rc); } -__u16 lod_comp_entry_stripe_count(struct lod_object *lo, - int comp_idx, bool is_dir) +__u16 lod_comp_entry_stripe_count(struct lod_object *lo, int comp_idx, + bool is_dir) { struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); struct lod_layout_component *entry; + enum lod_uses_hint flags = LOD_USES_ASSIGNED_STRIPE; - if (is_dir) - return 0; + if (is_dir) { + entry = &lo->ldo_def_striping->lds_def_comp_entries[comp_idx]; + return entry->llc_ostlist.op_count; + } entry = &lo->ldo_comp_entries[comp_idx]; if (lod_comp_inited(entry)) return entry->llc_stripe_count; - else if ((__u16)-1 == entry->llc_stripe_count) - return lod->lod_ost_count; - else - return lod_get_stripe_count(lod, lo, comp_idx, - entry->llc_stripe_count, false); + if (entry->llc_stripe_count <= LOV_ALL_STRIPES && + entry->llc_stripe_count >= LOV_ALL_STRIPES_WIDE) + return lod_get_stripe_count_plain(lod, lo, + entry->llc_stripe_count, + entry->llc_pattern & + LOV_PATTERN_OVERSTRIPING, + &flags); + + return lod_get_stripe_count(lod, lo, comp_idx, entry->llc_stripe_count, + entry->llc_pattern & LOV_PATTERN_OVERSTRIPING, + &flags); } static int lod_comp_md_size(struct lod_object *lo, bool is_dir) @@ -2693,30 +2744,40 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) for (i = 0; i < comp_cnt; i++) { __u16 stripe_count; - magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1; - stripe_count = lod_comp_entry_stripe_count(lo, i, is_dir); - if (!is_dir && is_composite) - lod_comp_shrink_stripe_count(&comp_entries[i], - &stripe_count); - - size += lov_user_md_size(stripe_count, magic); + if (comp_entries[i].llc_magic == LOV_MAGIC_FOREIGN) { + size += lov_foreign_md_size(comp_entries[i].llc_length); + } else { + magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : + LOV_MAGIC_V1; + stripe_count = lod_comp_entry_stripe_count(lo, i, + is_dir); + if (!is_dir && is_composite) + lod_comp_shrink_stripe_count(&comp_entries[i], + &stripe_count); + if (is_dir && comp_entries[i].llc_ostlist.op_count) + magic = LOV_MAGIC_SPECIFIC; + + size += lov_user_md_size(stripe_count, magic); + } LASSERT(size % sizeof(__u64) == 0); } return size; } /** + * lod_declare_layout_add() - Declare component add + * @env: execution environment + * @dt: dt_object to add components on + * @buf: buffer contains components to be added + * @th: thandle + * * Declare component add. The xattr name is XATTR_LUSTRE_LOV.add, and * the xattr value is binary lov_comp_md_v1 which contains component(s) * to be added. - * - * \param[in] env execution environment - * \param[in] dt dt_object to add components on - * \param[in] buf buffer contains components to be added - * \parem[in] th thandle * - * \retval 0 on success - * \retval negative errno on failure + * Return: + * * %0 on success + * * %negative if failed */ static int lod_declare_layout_add(const struct lu_env *env, struct dt_object *dt, @@ -2729,7 +2790,6 @@ static int lod_declare_layout_add(const struct lu_env *env, struct dt_object *next = dt_object_child(dt); struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc; struct lod_object *lo = lod_dt_obj(dt); - struct lov_user_md_v3 *v3; struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; __u32 magic; int i, rc, array_cnt, old_array_cnt; @@ -2756,7 +2816,7 @@ static int lod_declare_layout_add(const struct lu_env *env, mutex_lock(&lo->ldo_layout_mutex); array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count; - OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt); + OBD_ALLOC_PTR_ARRAY_LARGE(comp_array, array_cnt); if (comp_array == NULL) { mutex_unlock(&lo->ldo_layout_mutex); RETURN(-ENOMEM); @@ -2780,13 +2840,38 @@ static int lod_declare_layout_add(const struct lu_env *env, lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags; - lod_comp->llc_stripe_count = v1->lmm_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; + lod_comp->llc_stripe_count = v1->lmm_stripe_count; + lod_comp->llc_pattern = v1->lmm_pattern; + /** + * limit stripe count so that it's less than/equal to + * extent_size / stripe_size. + * + * Note: extension size reused llc_stripe_size field and + * uninstantiated component could be defined with + * extent_start == extent_end as extension component will + * expand it later. + */ + if (!(lod_comp->llc_flags & LCME_FL_EXTENSION) && + (lod_comp_inited(lod_comp) || + lod_comp->llc_extent.e_start < + lod_comp->llc_extent.e_end) && + !(lod_comp->llc_stripe_count <= LOV_ALL_STRIPES && + lod_comp->llc_stripe_count >= LOV_ALL_STRIPES_WIDE) && + ext->e_end != OBD_OBJECT_EOF && + (__u64)(lod_comp->llc_stripe_count * + lod_comp->llc_stripe_size) > + (ext->e_end - ext->e_start)) + lod_comp->llc_stripe_count = + DIV_ROUND_UP(ext->e_end - ext->e_start, + lod_comp->llc_stripe_size); lod_adjust_stripe_info(lod_comp, desc, 0); if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - v3 = (struct lov_user_md_v3 *) v1; - if (v3->lmm_pool_name[0] != '\0') { + struct lov_user_md_v3 *v3 = (typeof(*v3) *) v1; + + if (v3->lmm_pool_name[0] != '\0' && + !lov_pool_is_ignored(v3->lmm_pool_name)) { rc = lod_set_pool(&lod_comp->llc_pool, v3->lmm_pool_name); if (rc) @@ -2813,7 +2898,7 @@ static int lod_declare_layout_add(const struct lu_env *env, GOTO(error, rc); } - OBD_FREE_PTR_ARRAY(old_array, old_array_cnt); + OBD_FREE_PTR_ARRAY_LARGE(old_array, old_array_cnt); LASSERT(lo->ldo_mirror_count == 1); lo->ldo_mirrors[0].lme_end = array_cnt - 1; @@ -2831,7 +2916,7 @@ error: lod_comp->llc_pool = NULL; } } - OBD_FREE_PTR_ARRAY(comp_array, array_cnt); + OBD_FREE_PTR_ARRAY_LARGE(comp_array, array_cnt); mutex_unlock(&lo->ldo_layout_mutex); RETURN(rc); @@ -2840,7 +2925,7 @@ error: /** * lod_last_non_stale_mirror() - Check if a mirror is the last non-stale mirror. * @mirror_id: Mirror id to be checked. - * @lo: LOD object. + * @lo: LOD object. * * This function checks if a mirror with specified @mirror_id is the last * non-stale mirror of a LOD object @lo. @@ -2874,21 +2959,22 @@ bool lod_last_non_stale_mirror(__u16 mirror_id, struct lod_object *lo) } /** + * lod_declare_layout_set() - Declare component set + * @env: execution environment + * @dt: dt_object to be modified + * @op: operation string, like "set.flags" + * @buf: buffer contains components to be set + * @th: thandle + * * Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field, * the '$field' can only be 'flags' now. The xattr value is binary * lov_comp_md_v1 which contains the component ID(s) and the value of - * the field to be modified. - * Please update allowed_lustre_lov macro if $field groks more values - * in the future. - * - * \param[in] env execution environment - * \param[in] dt dt_object to be modified - * \param[in] op operation string, like "set.flags" - * \param[in] buf buffer contains components to be set - * \parem[in] th thandle + * the field to be modified. Please update allowed_lustre_lov macro if $field + * groks more values in the future. * - * \retval 0 on success - * \retval negative errno on failure + * Return: + * * %0 on success + * * %negative if failed */ static int lod_declare_layout_set(const struct lu_env *env, struct dt_object *dt, @@ -2997,16 +3083,17 @@ static int lod_declare_layout_set(const struct lu_env *env, } /** - * Declare component deletion. The xattr name is XATTR_LUSTRE_LOV.del, - * and the xattr value is a unique component ID or a special lcme_id. - * - * \param[in] env execution environment - * \param[in] dt dt_object to be operated on - * \param[in] buf buffer contains component ID or lcme_id - * \parem[in] th thandle - * - * \retval 0 on success - * \retval negative errno on failure + * lod_declare_layout_del() - Declare component deletion. The xattr name is + * XATTR_LUSTRE_LOV.del, and the xattr value is a unique component ID or a + * special lcme_id. + * @env: execution environment + * @dt: dt_object to be operated on + * @buf: buffer contains component ID or lcme_id + * @th: thandle + * + * Return: + * * %0 on success + * * %negative if failed */ static int lod_declare_layout_del(const struct lu_env *env, struct dt_object *dt, @@ -3140,20 +3227,21 @@ static int lod_declare_layout_del(const struct lu_env *env, } /** - * Declare layout add/set/del operations issued by special xattr names: - * + * lod_declare_modify_layout() - Declare layout add/set/del + * @env: execution environment + * @dt: object + * @name: name of xattr + * @buf: lu_buf contains xattr value + * @th: transaction handle + * + * operations issued by special xattr names: * XATTR_LUSTRE_LOV.add add component(s) to existing file * XATTR_LUSTRE_LOV.del delete component(s) from existing file * XATTR_LUSTRE_LOV.set.$field set specified field of certain component(s) * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] name name of xattr - * \param[in] buf lu_buf contains xattr value - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_declare_modify_layout(const struct lu_env *env, struct dt_object *dt, @@ -3207,16 +3295,15 @@ unlock: } /** - * Convert a plain file lov_mds_md to a composite layout. + * lod_layout_convert() - Convert a plain file lov_mds_md to a composite layout + * @info: the thread info::lti_ea_store buffer contains little endian plain file + * layout [in,out] * - * \param[in,out] info the thread info::lti_ea_store buffer contains little - * endian plain file layout - * - * \retval 0 on success, <0 on failure + * Returns 0 on success, <0 on failure */ -static int lod_layout_convert(struct lod_thread_info *info) +static int lod_layout_convert(struct lu_buf *buf) { - struct lov_mds_md *lmm = info->lti_ea_store; + struct lov_mds_md *lmm = buf->lb_buf; struct lov_mds_md *lmm_save; struct lov_comp_md_v1 *lcm; struct lov_comp_md_entry_v1 *lcme; @@ -3236,13 +3323,11 @@ static int lod_layout_convert(struct lod_thread_info *info) memcpy(lmm_save, lmm, blob_size); - if (info->lti_ea_store_size < size) { - rc = lod_ea_store_resize(info, size); - if (rc) - GOTO(out, rc); - } + lu_buf_check_and_alloc(buf, size_roundup_power2(size)); + if (buf->lb_buf == NULL) + GOTO(out, rc = -ENOMEM); - lcm = info->lti_ea_store; + lcm = buf->lb_buf; memset(lcm, 0, sizeof(*lcm) + sizeof(*lcme)); lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); lcm->lcm_size = cpu_to_le32(size); @@ -3252,6 +3337,7 @@ static int lod_layout_convert(struct lod_thread_info *info) lcm->lcm_entry_count = cpu_to_le16(1); lcme = &lcm->lcm_entries[0]; + lcme->lcme_id = cpu_to_le32(1); lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT); lcme->lcme_extent.e_start = 0; lcme->lcme_extent.e_end = cpu_to_le64(OBD_OBJECT_EOF); @@ -3267,15 +3353,15 @@ out: return rc; } -/** +/* * Merge layouts to form a mirrored file. */ static int lod_declare_layout_merge(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *mbuf, - struct thandle *th) -{ + struct dt_object *dt, + const struct lu_buf *mbuf, + struct thandle *th) +{ struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *layout_attr = &info->lti_layout_attr; struct lu_buf *buf = &info->lti_buf; struct lod_object *lo = lod_dt_obj(dt); struct lov_comp_md_v1 *lcm; @@ -3314,11 +3400,11 @@ static int lod_declare_layout_merge(const struct lu_env *env, if (rc <= 0) RETURN(rc ? : -ENODATA); - cur_lcm = info->lti_ea_store; + cur_lcm = info->lti_ea_buf.lb_buf; switch (le32_to_cpu(cur_lcm->lcm_magic)) { case LOV_MAGIC_V1: case LOV_MAGIC_V3: - rc = lod_layout_convert(info); + rc = lod_layout_convert(&info->lti_ea_buf); break; case LOV_MAGIC_COMP_V1: case LOV_MAGIC_SEL: @@ -3330,8 +3416,8 @@ static int lod_declare_layout_merge(const struct lu_env *env, if (rc) RETURN(rc); - /* info->lti_ea_store could be reallocated in lod_layout_convert() */ - cur_lcm = info->lti_ea_store; + /* info->lti_ea_buf could be reallocated in lod_layout_convert() */ + cur_lcm = info->lti_ea_buf.lb_buf; cur_entry_count = le16_to_cpu(cur_lcm->lcm_entry_count); /* 'lcm_mirror_count + 1' is the current # of mirrors the file has */ @@ -3384,7 +3470,7 @@ static int lod_declare_layout_merge(const struct lu_env *env, /* check if first entry in new layout is DOM */ lmm = (struct lov_mds_md_v1 *)((char *)merge_lcm + merge_lcm->lcm_entries[0].lcme_offset); - merge_has_dom = lov_pattern(le32_to_cpu(lmm->lmm_pattern)) == + merge_has_dom = lov_pattern(le32_to_cpu(lmm->lmm_pattern)) & LOV_PATTERN_MDT; for (i = 0; i < merge_entry_count; i++) { @@ -3413,52 +3499,38 @@ static int lod_declare_layout_merge(const struct lu_env *env, lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count); lcm->lcm_mirror_count = cpu_to_le16(mirror_count); if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE) - lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY); + lcm->lcm_flags = cpu_to_le16((le16_to_cpu(lcm->lcm_flags) & + ~LCM_FL_FLR_MASK) | + LCM_FL_RDONLY); - rc = lod_striping_reload(env, lo, buf); + rc = lod_striping_reload(env, lo, buf, 0); if (rc) GOTO(out, rc); lod_obj_inc_layout_gen(lo); lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); - /* transfer layout version to OST objects. */ - if (lo->ldo_mirror_count > 1) { - struct lod_obj_stripe_cb_data data = { {0} }; - - layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; - data.locd_attr = layout_attr; - data.locd_declare = true; - data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; - rc = lod_obj_for_each_stripe(env, lo, th, &data); - if (rc) - GOTO(out, rc); - } - rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf, - XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); out: lu_buf_free(buf); RETURN(rc); } -/** +/* * Split layouts, just set the LOVEA with the layout from mbuf. */ static int lod_declare_layout_split(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *mbuf, struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *layout_attr = &info->lti_layout_attr; struct lod_object *lo = lod_dt_obj(dt); struct lov_comp_md_v1 *lcm = mbuf->lb_buf; int rc; ENTRY; - rc = lod_striping_reload(env, lo, mbuf); + rc = lod_striping_reload(env, lo, mbuf, LVF_ALL_STALE); if (rc) RETURN(rc); @@ -3466,21 +3538,6 @@ static int lod_declare_layout_split(const struct lu_env *env, /* fix on-disk layout gen */ lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); - - /* transfer layout version to OST objects. */ - if (lo->ldo_mirror_count > 1) { - struct lod_obj_stripe_cb_data data = { {0} }; - - layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; - data.locd_attr = layout_attr; - data.locd_declare = true; - data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; - rc = lod_obj_for_each_stripe(env, lo, th, &data); - if (rc) - RETURN(rc); - } - rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); RETURN(rc); @@ -3628,7 +3685,7 @@ out: RETURN(rc); } -/** +/* * Purge layouts, delete sub objects in the mirror stored in the vic_buf, * and set the LOVEA with the layout from mbuf. */ @@ -3676,7 +3733,7 @@ static int lod_layout_purge(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_declare_xattr_set. * * \see dt_object_operations::do_declare_xattr_set() in the API description @@ -3688,12 +3745,14 @@ static int lod_layout_purge(const struct lu_env *env, struct dt_object *dt, */ static int lod_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct dt_object *next = dt_object_child(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; + struct lod_object *lo = lod_dt_obj(dt); __u32 mode; int rc; ENTRY; @@ -3704,6 +3763,8 @@ static int lod_declare_xattr_set(const struct lu_env *env, LU_XATTR_PURGE)) && (strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0)) { + struct lu_attr *lattr = &lod_env_info(env)->lti_attr; + /* * this is a request to create object's striping. * @@ -3714,15 +3775,15 @@ static int lod_declare_xattr_set(const struct lu_env *env, * LU_XATTR_REPLACE is set to indicate a layout swap */ if (dt_object_exists(dt)) { - rc = dt_attr_get(env, next, attr); + rc = dt_attr_get(env, next, lattr); if (rc) RETURN(rc); } else { - memset(attr, 0, sizeof(*attr)); - attr->la_valid = LA_TYPE | LA_MODE; - attr->la_mode = S_IFREG; + memset(lattr, 0, sizeof(*lattr)); + lattr->la_valid = LA_TYPE | LA_MODE; + lattr->la_mode = S_IFREG; } - rc = lod_declare_striped_create(env, dt, attr, buf, th); + rc = lod_declare_striped_create(env, dt, lattr, buf, th); } else if (fl & LU_XATTR_MERGE) { LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0); @@ -3754,23 +3815,28 @@ static int lod_declare_xattr_set(const struct lu_env *env, rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); } + if (rc == 0 && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0 || allowed_lustre_lov(name))) + rc = lod_save_layout_gen_intrans(info, lo); + RETURN(rc); } /** - * Apply xattr changes to the object. + * lod_xattr_set_internal() - Apply xattr changes to the object. + * @env: execution environment + * @dt: object + * @buf: buffer pointing to the new value of xattr + * @name: name of xattr + * @fl: flags + * @th: transaction handle * * Applies xattr changes to the object and the stripes if the latter exist. * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] buf buffer pointing to the new value of xattr - * \param[in] name name of xattr - * \param[in] fl flags - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_xattr_set_internal(const struct lu_env *env, struct dt_object *dt, @@ -3812,17 +3878,17 @@ static int lod_xattr_set_internal(const struct lu_env *env, } /** - * Delete an extended attribute. + * lod_xattr_del_internal() - Delete an extended attribute. + * @env: execution environment + * @dt: object + * @name: name of xattr + * @th: transaction handle * * Deletes specified xattr from the object and the stripes if the latter exist. * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] name name of xattr - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_xattr_del_internal(const struct lu_env *env, struct dt_object *dt, @@ -3858,22 +3924,23 @@ static int lod_xattr_del_internal(const struct lu_env *env, } /** - * Set default striping on a directory. + * lod_xattr_set_lov_on_dir() - Set default striping on a directory. + * @env: execution environment + * @dt: the striped object + * @buf: buffer with the striping + * @name: name of EA + * @fl: xattr flag (see OSD API description) + * @th: transaction handle * * Sets specified striping on a directory object unless it matches the default * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing * EA. This striping will be used when regular file is being created in this * directory. * - * \param[in] env execution environment - * \param[in] dt the striped object - * \param[in] buf buffer with the striping - * \param[in] name name of EA - * \param[in] fl xattr flag (see OSD API description) - * \param[in] th transaction handle * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, struct dt_object *dt, @@ -3895,9 +3962,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, case LOV_USER_MAGIC_SPECIFIC: case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; - if (v3->lmm_pool_name[0] != '\0') + if (lov_pool_is_reserved(v3->lmm_pool_name)) + memset(v3->lmm_pool_name, 0, sizeof(v3->lmm_pool_name)); + else if (v3->lmm_pool_name[0] != '\0') pool_name = v3->lmm_pool_name; - /* fall through */ + fallthrough; case LOV_USER_MAGIC_V1: /* if { size, offset, count } = { 0, -1, 0 } and no pool * (i.e. all default values specified) then delete default @@ -3948,23 +4017,208 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, RETURN(rc); } +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds, + struct dt_allocation_hint *ah); + +/** + * embed_pool_to_comp_v1() - Helper function to convert compound layout to + * compound layout with pool + * @src: source layout + * @pool: pool to use in \a tgt + * @tgt: target layout [out] + * + * Copy lcm_entries array of @src to @tgt. Replace lov_user_md_v1 + * components of @src with lov_user_md_v3 using @pool. + */ +static void embed_pool_to_comp_v1(const struct lov_comp_md_v1 *src, + const char *pool, + struct lov_comp_md_v1 *tgt) +{ + size_t shift; + struct lov_user_md_v1 *lum; + struct lov_user_md_v3 *lum3; + struct lov_comp_md_entry_v1 *entry; + int i; + __u32 offset; + + entry = tgt->lcm_entries; + shift = 0; + for (i = 0; i < le16_to_cpu(src->lcm_entry_count); i++, entry++) { + *entry = src->lcm_entries[i]; + offset = le32_to_cpu(src->lcm_entries[i].lcme_offset); + entry->lcme_offset = cpu_to_le32(offset + shift); + + lum = (struct lov_user_md_v1 *)((char *)src + offset); + lum3 = (struct lov_user_md_v3 *)((char *)tgt + offset + shift); + *(struct lov_user_md_v1 *)lum3 = *lum; + if (lum->lmm_pattern & cpu_to_le32(LOV_PATTERN_MDT)) { + lum3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1); + } else { + lum3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + entry->lcme_size = cpu_to_le32(sizeof(*lum3)); + strscpy(lum3->lmm_pool_name, pool, + sizeof(lum3->lmm_pool_name)); + shift += sizeof(*lum3) - sizeof(*lum); + } + } +} + /** - * Set default striping on a directory object. + * lod_xattr_set_default_lov_on_dir() - Set default striping on a directory. + * @env: execution environment + * @dt: the striped object + * @buf: buffer with the striping + * @name: name of EA + * @fl: xattr flag (see OSD API description) + * @th: transaction handle * * Sets specified striping on a directory object unless it matches the default * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing - * EA. This striping will be used when a new directory is being created in the + * EA. This striping will be used when regular file is being created in this * directory. + * If current default striping includes a pool but specifed striping + * does not - retain the pool if it exists. + * + * Return: + * * %0 on success + * * %negative if failed + */ +static int lod_xattr_set_default_lov_on_dir(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct lod_default_striping *lds = lod_lds_buf_get(env); + struct lov_user_md_v1 *v1 = buf->lb_buf; + char pool[LOV_MAXPOOLNAME + 1]; + bool is_del; + int rc; + + ENTRY; + + /* get existing striping config */ + rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds, NULL); + if (rc) + RETURN(rc); + + memset(pool, 0, sizeof(pool)); + if (lds->lds_def_striping_set == 1) + lod_layout_get_pool(lds->lds_def_comp_entries, + lds->lds_def_comp_cnt, pool, + sizeof(pool)); + + is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, + v1->lmm_stripe_count, + v1->lmm_stripe_offset, + NULL); + + /* Retain the pool name if it is not given */ + if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && + !is_del) { + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v3 *v3 = info->lti_ea_buf.lb_buf; + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); + v3->lmm_stripe_count = cpu_to_le32(v1->lmm_stripe_count); + v3->lmm_stripe_offset = cpu_to_le32(v1->lmm_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); + + strscpy(v3->lmm_pool_name, pool, sizeof(v3->lmm_pool_name)); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + name, fl, th); + } else if (v1->lmm_magic == LOV_USER_MAGIC_COMP_V1 && + pool[0] != '\0' && !is_del) { + /* + * try to retain the pool from default layout if the + * specified component layout does not provide pool + * info explicitly + */ + struct lod_thread_info *info = lod_env_info(env); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + struct lov_comp_md_v1 *comp_v1p; + struct lov_user_md_v1 *lum; + int entry_count; + int i; + __u32 offset; + struct lov_comp_md_entry_v1 *entry; + int size; + + entry_count = le16_to_cpu(comp_v1->lcm_entry_count); + size = sizeof(*comp_v1) + + entry_count * sizeof(comp_v1->lcm_entries[0]); + entry = comp_v1->lcm_entries; + for (i = 0; i < entry_count; i++, entry++) { + offset = le32_to_cpu(entry->lcme_offset); + lum = (struct lov_user_md_v1 *)((char *)comp_v1 + + offset); + if (le32_to_cpu(lum->lmm_magic) != LOV_USER_MAGIC_V1) + /* the i-th component includes pool info */ + break; + if (lum->lmm_pattern & cpu_to_le32(LOV_PATTERN_MDT)) + size += sizeof(struct lov_user_md_v1); + else + size += sizeof(struct lov_user_md_v3); + } + + if (i == entry_count) { + /* + * re-compose the layout to include the pool for + * each component + */ + if (info->lti_ea_store_size < size) + rc = lod_ea_store_resize(info, size); + + if (rc == 0) { + comp_v1p = info->lti_ea_buf.lb_buf; + *comp_v1p = *comp_v1; + comp_v1p->lcm_size = cpu_to_le32(size); + embed_pool_to_comp_v1(comp_v1, pool, comp_v1p); + + info->lti_buf.lb_buf = comp_v1p; + info->lti_buf.lb_len = size; + rc = lod_xattr_set_lov_on_dir(env, dt, + &info->lti_buf, + name, fl, th); + } + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, + th); + } + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); + } + + if (lds->lds_def_striping_set == 1 && lds->lds_def_comp_entries != NULL) + lod_free_def_comp_entries(lds); + + RETURN(rc); +} + +/** + * lod_xattr_set_default_lmv_on_dir() - Set default striping on directory object + * @env: execution environment + * @dt: the striped object + * @buf: buffer with the striping + * @name: name of EA + * @fl: xattr flag (see OSD API description) + * @th: transaction handle * - * \param[in] env execution environment - * \param[in] dt the striped object - * \param[in] buf buffer with the striping - * \param[in] name name of EA - * \param[in] fl xattr flag (see OSD API description) - * \param[in] th transaction handle + * Sets specified striping on a directory object unless it matches the default + * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing + * EA. This striping will be used when a new directory is being created in the + * directory. * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, struct dt_object *dt, @@ -4002,7 +4256,13 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, } /** - * Turn directory into a striped directory. + * lod_xattr_set_lmv() - Turn directory into a striped directory. + * @env: execution environment + * @dt: the striped object + * @buf: buf lmv_user_md for create, or lmv_mds_md for replay + * @name: not used currently + * @fl: xattr flag (see OSD API description) + * @th: transaction handle * * During replay the client sends the striping created before MDT * failure, then the layer above LOD sends this defined striping @@ -4010,40 +4270,37 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, * of the stripes. Notice the original information for the striping * (#stripes, FIDs, etc) was transferred in declare path. * - * \param[in] env execution environment - * \param[in] dt the striped object - * \param[in] buf not used currently - * \param[in] name not used currently - * \param[in] fl xattr flag (see OSD API description) - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *attr = &info->lti_attr; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *attr = &info->lti_attr; struct dt_object_format *dof = &info->lti_format; - struct lu_buf lmv_buf; - struct lu_buf slave_lmv_buf; - struct lmv_mds_md_v1 *lmm; - struct lmv_mds_md_v1 *slave_lmm = NULL; - struct dt_insert_rec *rec = &info->lti_dt_rec; - int i; - int rc; + struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_user_md *lum = buf->lb_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; + int i; + int rc; ENTRY; + /* lum is used to know whether it's replay */ + LASSERT(lum); if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(-ENOTDIR); /* The stripes are supposed to be allocated in declare phase, * if there are no stripes being allocated, it will skip */ if (lo->ldo_dir_stripe_count == 0) { - if (lo->ldo_dir_is_foreign) { + if (lo->ldo_is_foreign) { rc = lod_sub_xattr_set(env, dt_object_child(dt), buf, XATTR_NAME_LMV, fl, th); if (rc != 0) @@ -4056,8 +4313,8 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS | - LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID; + attr->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS | + LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID; dof->dof_type = DFT_DIR; rc = lod_prep_lmv_md(env, dt, &lmv_buf); @@ -4080,15 +4337,25 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, struct lu_name *sname; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; + bool stripe_created = false; /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */ if (!dto) continue; /* fail a remote stripe creation */ - if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) + if (i && CFS_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) continue; + /* if it's replay by client request, and stripe exists on remote + * MDT, it means mkdir was partially executed: stripe was + * created on remote MDT successfully, but target not in last + * run. + */ + if (unlikely((le32_to_cpu(lum->lum_magic) == LMV_MAGIC_V1) && + dt_object_exists(dto) && dt_object_remote(dto))) + stripe_created = true; + /* don't create stripe if: * 1. it's source stripe of migrating directory * 2. it's existed stripe of splitting directory @@ -4097,7 +4364,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { if (!dt_object_exists(dto)) GOTO(out, rc = -EINVAL); - } else { + } else if (!stripe_created) { dt_write_lock(env, dto, DT_TGT_CHILD); rc = lod_sub_create(env, dto, attr, NULL, dof, th); if (rc != 0) { @@ -4118,9 +4385,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && cfs_fail_val == i) slave_lmm->lmv_master_mdt_index = cpu_to_le32(i + 1); @@ -4143,13 +4410,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, lo->ldo_dir_split_offset > i) continue; - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&dto->do_lu)), i + 1); @@ -4157,23 +4418,38 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&dto->do_lu)), i); - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); + if (!stripe_created) { + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc != 0) + GOTO(out, rc); - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); + sname = lod_name_get(env, stripe_name, + strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + } rec->rec_fid = lu_object_fid(&dto->do_lu); rc = lod_sub_insert(env, dt_object_child(dt), (const struct dt_rec *)rec, (const struct dt_key *)stripe_name, th); + if (rc == -EEXIST) { + CDEBUG(D_INFO, DFID": can't insert stripe %i "DFID"\n", + PFID(lod_object_fid(lo)), i, + PFID(lu_object_fid(&dt->do_lu))); + continue; + } if (rc != 0) GOTO(out, rc); @@ -4182,18 +4458,25 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) rc = lod_sub_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV, fl, th); out: - if (slave_lmm != NULL) - OBD_FREE_PTR(slave_lmm); + OBD_FREE_PTR(slave_lmm); RETURN(rc); } /** - * Helper function to declare/execute creation of a striped directory + * lod_dir_striping_create_internal() - Helper function to declare/execute + * creation of a striped directory + * @env: execution environment + * @dt: object + * @attr: attributes the stripes will be created with + * @lmu: lmv_user_md if MDT indices are specified + * @dof: format of stripes (see OSD API description) + * @th: transaction handle + * @declare: where to call "declare" or "execute" methods * * Called in declare/create object path, prepare striping for a directory * and prepare defaults data striping for the objects to be created in @@ -4204,16 +4487,9 @@ out: * arguments for the both phases are the same and this is the reason for * this function to exist. * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] attr attributes the stripes will be created with - * \param[in] lmu lmv_user_md if MDT indices are specified - * \param[in] dof format of stripes (see OSD API description) - * \param[in] th transaction handle - * \param[in] declare where to call "declare" or "execute" methods - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_dir_striping_create_internal(const struct lu_env *env, struct dt_object *dt, @@ -4232,18 +4508,20 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, LASSERT(ergo(lds != NULL, lds->lds_def_striping_set || lds->lds_dir_def_striping_set)); + LASSERT(lmu); if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset)) { - if (!lmu) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; + if (!lmu->lb_buf) { + /* mkdir by default LMV */ + struct lmv_user_md_v1 *v1 = info->lti_ea_buf.lb_buf; int stripe_count = lo->ldo_dir_stripe_count; if (info->lti_ea_store_size < sizeof(*v1)) { rc = lod_ea_store_resize(info, sizeof(*v1)); if (rc != 0) RETURN(rc); - v1 = info->lti_ea_store; + v1 = info->lti_ea_buf.lb_buf; } memset(v1, 0, sizeof(*v1)); @@ -4265,25 +4543,22 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, th); if (rc != 0) RETURN(rc); - } else { + } else if (lmu->lb_buf) { /* foreign LMV EA case */ - if (lmu) { + if (declare) { struct lmv_foreign_md *lfm = lmu->lb_buf; - if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) { + if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof, th); - } - } else { - if (lo->ldo_dir_is_foreign) { - LASSERT(lo->ldo_foreign_lmv != NULL && - lo->ldo_foreign_lmv_size > 0); - info->lti_buf.lb_buf = lo->ldo_foreign_lmv; - info->lti_buf.lb_len = lo->ldo_foreign_lmv_size; - lmu = &info->lti_buf; - rc = lod_xattr_set_lmv(env, dt, lmu, - XATTR_NAME_LMV, 0, th); - } + } else if (lo->ldo_is_foreign) { + LASSERT(lo->ldo_foreign_lmv != NULL && + lo->ldo_foreign_lmv_size > 0); + info->lti_buf.lb_buf = lo->ldo_foreign_lmv; + info->lti_buf.lb_len = lo->ldo_foreign_lmv_size; + lmu = &info->lti_buf; + rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0, + th); } } @@ -4295,13 +4570,13 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, lds->lds_dir_def_stripe_offset) && le32_to_cpu(lds->lds_dir_def_hash_type) != LMV_HASH_TYPE_UNKNOWN)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; + struct lmv_user_md_v1 *v1 = info->lti_ea_buf.lb_buf; if (info->lti_ea_store_size < sizeof(*v1)) { rc = lod_ea_store_resize(info, sizeof(*v1)); if (rc != 0) RETURN(rc); - v1 = info->lti_ea_store; + v1 = info->lti_ea_buf.lb_buf; } memset(v1, 0, sizeof(*v1)); @@ -4343,7 +4618,7 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, if (rc != 0) RETURN(rc); } - lmm = info->lti_ea_store; + lmm = info->lti_ea_buf.lb_buf; rc = lod_generate_lovea(env, lo, lmm, &lmm_size, true); if (rc != 0) @@ -4362,6 +4637,12 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, RETURN(rc); } + /* ldo_def_striping is not allocated, clear after use, in case directory + * layout is changed later. + */ + if (!declare) + lo->ldo_def_striping = NULL; + RETURN(0); } @@ -4379,15 +4660,19 @@ static int lod_declare_dir_striping_create(const struct lu_env *env, static int lod_dir_striping_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, + const struct lu_buf *lmu, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th, + return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th, false); } /** - * Make LOV EA for striped object. + * lod_generate_and_set_lovea() - Make LOV EA for striped object. + * @env: execution environment for this thread + * @lo: LOD object + * @th: transaction handle * * Generate striping information and store it in the LOV EA of the given * object. The caller must ensure nobody else is calling the function @@ -4395,12 +4680,9 @@ static int lod_dir_striping_create(const struct lu_env *env, * FLDB service must be running as well; it's used to map FID to the target, * which is stored in LOV EA. * - * \param[in] env execution environment for this thread - * \param[in] lo LOD object - * \param[in] th transaction handle - * - * \retval 0 if LOV EA is stored successfully - * \retval negative error number on failure + * Return: + * * %0 if LOV EA is stored successfully + * * %negative error number on failure */ static int lod_generate_and_set_lovea(const struct lu_env *env, struct lod_object *lo, @@ -4426,7 +4708,7 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, if (rc) RETURN(rc); } - lmm = info->lti_ea_store; + lmm = info->lti_ea_buf.lb_buf; rc = lod_generate_lovea(env, lo, lmm, &lmm_size, false); if (rc) @@ -4443,7 +4725,10 @@ static __u32 lod_gen_component_id(struct lod_object *lo, int mirror_id, int comp_idx); /** - * Repeat an existing component + * lod_layout_repeat_comp() - Repeat an existing component + * @env: execution environment for this thread + * @lo: object to update the layout of + * @index: index of component to copy * * Creates a new layout by replicating an existing component. Uses striping * policy from previous component as a template for the striping for the new @@ -4455,12 +4740,9 @@ static __u32 lod_gen_component_id(struct lod_object *lo, * NB: Reallocates layout components array (lo->ldo_comp_entries), invalidating * any pre-existing pointers to components. Handle with care. * - * \param[in] env execution environment for this thread - * \param[in,out] lo object to update the layout of - * \param[in] index index of component to copy - * - * \retval 0 on success - * \retval negative errno on error + * Return: + * * %0 on success + * * %negative errno on error */ static int lod_layout_repeat_comp(const struct lu_env *env, struct lod_object *lo, int index) @@ -4478,7 +4760,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, CDEBUG(D_LAYOUT, "repeating component %d\n", index); - OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt); + OBD_ALLOC_PTR_ARRAY_LARGE(comp_array, new_cnt); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4526,7 +4808,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, new_comp->llc_ostlist.op_array = op_array; } - OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY_LARGE(lo->ldo_comp_entries, lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = new_cnt; @@ -4540,7 +4822,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, EXIT; out: if (rc) - OBD_FREE_PTR_ARRAY(comp_array, new_cnt); + OBD_FREE_PTR_ARRAY_LARGE(comp_array, new_cnt); return rc; } @@ -4570,7 +4852,10 @@ static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) } /** - * Prepare new layout minus deleted components + * lod_layout_del_prep_layout() - Prepare new layout minus deleted components + * @env: execution environment for this thread + * @lo: object to update the layout of + * @th: transaction handle for this operation * * Removes components marked for deletion (LCME_ID_INVAL) by copying to a new * layout and skipping those components. Removes stripe objects if any exist. @@ -4581,12 +4866,9 @@ static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) * * Caller is responsible for updating mirror end (ldo_mirror[].lme_end). * - * \param[in] env execution environment for this thread - * \param[in,out] lo object to update the layout of - * \param[in] th transaction handle for this operation - * - * \retval # of components deleted - * \retval negative errno on error + * Return: + * * %positive# of components deleted + * * %negative errno on error */ static int lod_layout_del_prep_layout(const struct lu_env *env, struct lod_object *lo, @@ -4614,6 +4896,9 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, continue; } + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; + lod_obj_set_pool(lo, i, NULL); if (lod_comp->llc_ostlist.op_array) { OBD_FREE(lod_comp->llc_ostlist.op_array, @@ -4665,7 +4950,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, if (info->lti_count > 0) { struct lod_layout_component *comp_array; - OBD_ALLOC_PTR_ARRAY(comp_array, info->lti_count); + OBD_ALLOC_PTR_ARRAY_LARGE(comp_array, info->lti_count); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4675,7 +4960,8 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, sizeof(*comp_array)); } - OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY_LARGE(lo->ldo_comp_entries, + lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = info->lti_count; } else { @@ -4688,18 +4974,18 @@ out: } /** - * Delete layout component(s) + * lod_layout_del() - Delete layout component(s) + * @env: execution environment for this thread + * @dt: object + * @th: transaction handle * * This function sets up the layout data in the env and does the setattrs * required to write out the new layout. The layout itself is modified in * lod_layout_del_prep_layout. * - * \param[in] env execution environment for this thread - * \param[in] dt object - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative error number on failure + * Return: + * * %0 on success + * * %negative error number on failure */ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) @@ -4748,12 +5034,15 @@ out: } -static int lod_get_default_lov_striping(const struct lu_env *env, - struct lod_object *lo, - struct lod_default_striping *lds, - struct dt_allocation_hint *ah); /** - * Implementation of dt_object_operations::do_xattr_set. + * lod_xattr_set() - Implementation of dt_object_operations::do_xattr_set + * @env: execution environment for this thread + * @dt: object to lock for reading + * @buf: unused, may be removed in the future + * @name: name of the extended attribute + * @fl: LU_XATTR_CREATE - fail if EA exists + * LU_XATTR_REPLACE - fail if EA doesn't exist + * @th: transaction handle * * Sets specified extended attribute on the object. Three types of EAs are * special: @@ -4768,16 +5057,19 @@ static int lod_get_default_lov_striping(const struct lu_env *env, * LOV EA storing all the stripes directly or LMV EA storing just a small header * with striping configuration. * - * \see dt_object_operations::do_xattr_set() in the API description for details. + * see dt_object_operations::do_xattr_set() in the API description for details. + * + * Return: + * * %0 on success + * * %negative on failure */ static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct dt_object *next = dt_object_child(dt); - struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr; struct lod_object *lo = lod_dt_obj(dt); - struct lod_obj_stripe_cb_data data = { {0} }; int rc = 0; ENTRY; @@ -4786,7 +5078,8 @@ static int lod_xattr_set(const struct lu_env *env, !strcmp(name, XATTR_NAME_LMV)) { switch (fl) { case LU_XATTR_CREATE: - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + rc = lod_dir_striping_create(env, dt, NULL, buf, NULL, + th); break; case 0: case LU_XATTR_REPLACE: @@ -4799,59 +5092,8 @@ static int lod_xattr_set(const struct lu_env *env, RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { - struct lod_default_striping *lds = lod_lds_buf_get(env); - struct lov_user_md_v1 *v1 = buf->lb_buf; - char pool[LOV_MAXPOOLNAME + 1]; - bool is_del; - - /* get existing striping config */ - rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds, - NULL); - if (rc) - RETURN(rc); - - memset(pool, 0, sizeof(pool)); - if (lds->lds_def_striping_set == 1) - lod_layout_get_pool(lds->lds_def_comp_entries, - lds->lds_def_comp_cnt, pool, - sizeof(pool)); - - is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, - v1->lmm_stripe_count, - v1->lmm_stripe_offset, - NULL); - - /* Retain the pool name if it is not given */ - if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && - !is_del) { - struct lod_thread_info *info = lod_env_info(env); - struct lov_user_md_v3 *v3 = info->lti_ea_store; - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); - v3->lmm_stripe_count = - cpu_to_le32(v1->lmm_stripe_count); - v3->lmm_stripe_offset = - cpu_to_le32(v1->lmm_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); - - strlcpy(v3->lmm_pool_name, pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, - name, fl, th); - } else { - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, - fl, th); - } - - if (lds->lds_def_striping_set == 1 && - lds->lds_def_comp_entries != NULL) - lod_free_def_comp_entries(lds); - + rc = lod_xattr_set_default_lov_on_dir(env, dt, buf, name, fl, + th); RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { @@ -4863,6 +5105,17 @@ static int lod_xattr_set(const struct lu_env *env, (strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0 || allowed_lustre_lov(name))) { + /* layout has been changed by others in the transaction */ + rc = lod_check_layout_gen_intrans(info, lo); + if (rc > 0) { + CDEBUG(D_LAYOUT, + "%s: obj "DFID" gen changed from %d to %d in transaction, retry the transaction\n", + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), + info->lti_gen[rc - 1], lo->ldo_layout_gen); + RETURN(-EAGAIN); + } + /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() @@ -4877,24 +5130,9 @@ static int lod_xattr_set(const struct lu_env *env, if (rc) RETURN(rc); - rc = lod_striping_reload(env, lo, buf); + rc = lod_striping_reload(env, lo, buf, LVF_ALL_STALE); if (rc) RETURN(rc); - - if (lo->ldo_mirror_count > 1 && - layout_attr->la_valid & LA_LAYOUT_VERSION) { - /* mirror split */ - layout_attr->la_layout_version = - lo->ldo_layout_gen; - data.locd_attr = layout_attr; - data.locd_declare = false; - data.locd_stripe_cb = - lod_obj_stripe_attr_set_cb; - rc = lod_obj_for_each_stripe(env, lo, th, - &data); - if (rc) - RETURN(rc); - } } else if (fl & LU_XATTR_PURGE) { rc = lod_layout_purge(env, dt, buf, th); } else if (dt_object_remote(dt)) { @@ -4912,7 +5150,7 @@ static int lod_xattr_set(const struct lu_env *env, } else { /* * When 'name' is XATTR_LUSTRE_LOV or XATTR_NAME_LOV, - * it's going to create create file with specified + * it's going to create file with specified * component(s), the striping must have not being * cached in this case; * @@ -4920,28 +5158,14 @@ static int lod_xattr_set(const struct lu_env *env, * an existing file, the striping must have been cached * in this case. */ - LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || - !strcmp(name, XATTR_NAME_LOV), - !lod_dt_obj(dt)->ldo_comp_cached)); + if (!(fl & LU_XATTR_MERGE)) + LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || + !strcmp(name, XATTR_NAME_LOV), + !lod_dt_obj(dt)->ldo_comp_cached)); rc = lod_striped_create(env, dt, NULL, NULL, th); if (rc) RETURN(rc); - - if (fl & LU_XATTR_MERGE && lo->ldo_mirror_count > 1 && - layout_attr->la_valid & LA_LAYOUT_VERSION) { - /* mirror merge exec phase */ - layout_attr->la_layout_version = - lo->ldo_layout_gen; - data.locd_attr = layout_attr; - data.locd_declare = false; - data.locd_stripe_cb = - lod_obj_stripe_attr_set_cb; - rc = lod_obj_for_each_stripe(env, lo, th, - &data); - if (rc) - RETURN(rc); - } } RETURN(rc); } else if (strcmp(name, XATTR_NAME_FID) == 0) { @@ -4956,10 +5180,10 @@ static int lod_xattr_set(const struct lu_env *env, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_declare_xattr_del. * - * \see dt_object_operations::do_declare_xattr_del() in the API description + * see dt_object_operations::do_declare_xattr_del() in the API description * for details. */ static int lod_declare_xattr_del(const struct lu_env *env, @@ -5010,13 +5234,13 @@ static int lod_declare_xattr_del(const struct lu_env *env, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_xattr_del. * * If EA storing a regular striping is being deleted, then release * all the references to the stripe objects in core. * - * \see dt_object_operations::do_xattr_del() in the API description for details. + * see dt_object_operations::do_xattr_del() in the API description for details. */ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) @@ -5033,10 +5257,10 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_xattr_list. * - * \see dt_object_operations::do_xattr_list() in the API description + * see dt_object_operations::do_xattr_list() in the API description * for details. */ static int lod_xattr_list(const struct lu_env *env, @@ -5051,13 +5275,13 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi } /** - * Copy OST list from layout provided by user. + * lod_comp_copy_ost_lists() - Copy OST list from layout provided by user. + * @lod_comp: layout_component to be filled + * @v3: LOV EA V3 user data * - * \param[in] lod_comp layout_component to be filled - * \param[in] v3 LOV EA V3 user data - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp, struct lov_user_md_v3 *v3) @@ -5098,31 +5322,40 @@ skip: /** - * Get default striping. - * - * \param[in] env execution environment - * \param[in] lo object - * \param[out] lds default striping - * - * \retval 0 on success - * \retval negative if failed + * lod_get_default_lov_striping() - Get default striping. + * @env: execution environment + * @lo: object + * @lds: default striping + * @dah: pointer to a struct that gives allocation hints + * + * Return: + * * %0 on success + * * %negative if failed */ static int lod_get_default_lov_striping(const struct lu_env *env, struct lod_object *lo, struct lod_default_striping *lds, - struct dt_allocation_hint *ah) + struct dt_allocation_hint *dah) { struct lod_thread_info *info = lod_env_info(env); struct lov_user_md_v1 *v1 = NULL; struct lov_user_md_v3 *v3 = NULL; - struct lov_comp_md_v1 *comp_v1 = NULL; - __u16 comp_cnt; - __u16 mirror_cnt; - bool composite; + struct lov_comp_md_v1 *lcm = NULL; + __u32 magic; + int append_stripe_count = dah != NULL ? dah->dah_append_stripe_count : 0; + const char *append_pool = (dah != NULL && + dah->dah_append_pool != NULL && + dah->dah_append_pool[0] != '\0') ? + dah->dah_append_pool : NULL; + __u16 entry_count = 1; + __u16 mirror_count = 0; + bool want_composite = false; int rc, i, j; ENTRY; + lds->lds_def_striping_set = 0; + rc = lod_get_lov_ea(env, lo); if (rc < 0) RETURN(rc); @@ -5130,116 +5363,129 @@ static int lod_get_default_lov_striping(const struct lu_env *env, if (rc < (typeof(rc))sizeof(struct lov_user_md)) RETURN(0); - v1 = info->lti_ea_store; - if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { - lustre_swab_lov_user_md_v1(v1); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { - v3 = (struct lov_user_md_v3 *)v1; - lustre_swab_lov_user_md_v3(v3); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { - v3 = (struct lov_user_md_v3 *)v1; + magic = *(__u32 *)info->lti_ea_buf.lb_buf; + if (magic == __swab32(LOV_USER_MAGIC_V1)) { + lustre_swab_lov_user_md_v1(info->lti_ea_store); + } else if (magic == __swab32(LOV_USER_MAGIC_V3)) { + lustre_swab_lov_user_md_v3(info->lti_ea_store); + } else if (magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)info->lti_ea_store; lustre_swab_lov_user_md_v3(v3); lustre_swab_lov_user_md_objects(v3->lmm_objects, v3->lmm_stripe_count); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1) || - v1->lmm_magic == __swab32(LOV_USER_MAGIC_SEL)) { - comp_v1 = (struct lov_comp_md_v1 *)v1; - lustre_swab_lov_comp_md_v1(comp_v1); + } else if (magic == __swab32(LOV_USER_MAGIC_COMP_V1) || + magic == __swab32(LOV_USER_MAGIC_SEL)) { + lustre_swab_lov_comp_md_v1(info->lti_ea_buf.lb_buf); } - if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && - v1->lmm_magic != LOV_MAGIC_COMP_V1 && - v1->lmm_magic != LOV_MAGIC_SEL && - v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) + switch (magic) { + case LOV_MAGIC_V1: + case LOV_MAGIC_V3: + case LOV_USER_MAGIC_SPECIFIC: + v1 = info->lti_ea_buf.lb_buf; + break; + case LOV_MAGIC_COMP_V1: + case LOV_MAGIC_SEL: + lcm = info->lti_ea_buf.lb_buf; + entry_count = lcm->lcm_entry_count; + if (entry_count == 0) + RETURN(-EINVAL); + + mirror_count = lcm->lcm_mirror_count + 1; + want_composite = true; + break; + default: RETURN(-ENOTSUPP); + } - if ((v1->lmm_magic == LOV_MAGIC_COMP_V1 || - v1->lmm_magic == LOV_MAGIC_SEL) && - !(ah && ah->dah_append_stripes)) { - comp_v1 = (struct lov_comp_md_v1 *)v1; - comp_cnt = comp_v1->lcm_entry_count; - if (comp_cnt == 0) - RETURN(-EINVAL); - mirror_cnt = comp_v1->lcm_mirror_count + 1; - composite = true; - } else { - comp_cnt = 1; - mirror_cnt = 0; - composite = false; + if (append_stripe_count != 0 || append_pool != NULL) { + entry_count = 1; + mirror_count = 0; + want_composite = false; } /* realloc default comp entries if necessary */ - rc = lod_def_striping_comp_resize(lds, comp_cnt); + rc = lod_def_striping_comp_resize(lds, entry_count); if (rc < 0) RETURN(rc); - lds->lds_def_comp_cnt = comp_cnt; - lds->lds_def_striping_is_composite = composite; - lds->lds_def_mirror_cnt = mirror_cnt; + lds->lds_def_comp_cnt = entry_count; + lds->lds_def_striping_is_composite = want_composite; + lds->lds_def_mirror_cnt = mirror_count; - for (i = 0; i < comp_cnt; i++) { - struct lod_layout_component *lod_comp; - char *pool; + for (i = 0; i < entry_count; i++) { + struct lod_layout_component *llc = &lds->lds_def_comp_entries[i]; + const char *pool; - lod_comp = &lds->lds_def_comp_entries[i]; /* - * reset lod_comp values, llc_stripes is always NULL in - * the default striping template, llc_pool will be reset - * later below. + * reset llc values, llc_stripes is always NULL in the + * default striping template, llc_pool will be reset + * later below using lod_set_pool(). + * + * XXX At this point llc_pool may point to valid (!) + * kmalloced strings from previous RPCs. */ - memset(lod_comp, 0, offsetof(typeof(*lod_comp), llc_pool)); - - if (composite) { - v1 = (struct lov_user_md *)((char *)comp_v1 + - comp_v1->lcm_entries[i].lcme_offset); - lod_comp->llc_extent = - comp_v1->lcm_entries[i].lcme_extent; - /* We only inherit certain flags from the layout */ - lod_comp->llc_flags = - comp_v1->lcm_entries[i].lcme_flags & + memset(llc, 0, offsetof(typeof(*llc), llc_pool)); + + if (lcm != NULL) { + v1 = (struct lov_user_md *)((char *)lcm + + lcm->lcm_entries[i].lcme_offset); + + if (want_composite) { + llc->llc_extent = lcm->lcm_entries[i].lcme_extent; + /* We only inherit certain flags from the layout */ + llc->llc_flags = lcm->lcm_entries[i].lcme_flags & LCME_TEMPLATE_FLAGS; + } } + CDEBUG(D_LAYOUT, DFID" magic = %#08x, pattern = %#x, stripe_count = %hd, stripe_size = %u, stripe_offset = %hd, append_pool = '%s', append_stripe_count = %d\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), v1->lmm_magic, + v1->lmm_pattern, (__s16)v1->lmm_stripe_count, + v1->lmm_stripe_size, (__s16)v1->lmm_stripe_offset, + append_pool ?: "", append_stripe_count); + if (!lov_pattern_supported(v1->lmm_pattern) && !(v1->lmm_pattern & LOV_PATTERN_F_RELEASED)) { lod_free_def_comp_entries(lds); RETURN(-EINVAL); } - CDEBUG(D_LAYOUT, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d append_stripes=%d\n", - PFID(lu_object_fid(&lo->ldo_obj.do_lu)), - (int)v1->lmm_stripe_count, (int)v1->lmm_stripe_size, - (int)v1->lmm_stripe_offset, - ah ? ah->dah_append_stripes : 0); + llc->llc_stripe_count = v1->lmm_stripe_count; + llc->llc_stripe_size = v1->lmm_stripe_size; + llc->llc_stripe_offset = v1->lmm_stripe_offset; + llc->llc_pattern = v1->lmm_pattern; - if (ah && ah->dah_append_stripes) - lod_comp->llc_stripe_count = ah->dah_append_stripes; - else - lod_comp->llc_stripe_count = v1->lmm_stripe_count; - lod_comp->llc_stripe_size = v1->lmm_stripe_size; - lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; - lod_comp->llc_pattern = v1->lmm_pattern; + if (append_stripe_count != 0 || append_pool != NULL) + llc->llc_pattern = LOV_PATTERN_RAID0; + + if (append_stripe_count != 0) + llc->llc_stripe_count = append_stripe_count; pool = NULL; - if (ah && ah->dah_append_pool && ah->dah_append_pool[0]) { - pool = ah->dah_append_pool; + if (append_pool != NULL) { + pool = append_pool; } else if (v1->lmm_magic == LOV_USER_MAGIC_V3) { /* XXX: sanity check here */ - v3 = (struct lov_user_md_v3 *) v1; + v3 = (struct lov_user_md_v3 *)v1; if (v3->lmm_pool_name[0] != '\0') pool = v3->lmm_pool_name; } - lod_set_def_pool(lds, i, pool); - if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + + lod_set_pool(&llc->llc_pool, pool); + + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC && + append_stripe_count == 0 && + append_pool == NULL) { v3 = (struct lov_user_md_v3 *)v1; - rc = lod_comp_copy_ost_lists(lod_comp, v3); + rc = lod_comp_copy_ost_lists(llc, v3); if (rc) RETURN(rc); - } else if (lod_comp->llc_ostlist.op_array && - lod_comp->llc_ostlist.op_count) { - for (j = 0; j < lod_comp->llc_ostlist.op_count; j++) - lod_comp->llc_ostlist.op_array[j] = -1; - lod_comp->llc_ostlist.op_count = 0; + } else if (llc->llc_ostlist.op_array && + llc->llc_ostlist.op_count) { + for (j = 0; j < llc->llc_ostlist.op_count; j++) + llc->llc_ostlist.op_array[j] = -1; + llc->llc_ostlist.op_count = 0; } } @@ -5247,15 +5493,26 @@ static int lod_get_default_lov_striping(const struct lu_env *env, RETURN(rc); } +static inline void lod_lum2lds(struct lod_default_striping *lds, + const struct lmv_user_md *lum) +{ + lds->lds_dir_def_stripe_count = le32_to_cpu(lum->lum_stripe_count); + lds->lds_dir_def_stripe_offset = le32_to_cpu(lum->lum_stripe_offset); + lds->lds_dir_def_hash_type = le32_to_cpu(lum->lum_hash_type); + lds->lds_dir_def_max_inherit = lum->lum_max_inherit; + lds->lds_dir_def_max_inherit_rr = lum->lum_max_inherit_rr; + lds->lds_dir_def_striping_set = 1; +} + /** - * Get default directory striping. - * - * \param[in] env execution environment - * \param[in] lo object - * \param[out] lds default striping - * - * \retval 0 on success - * \retval negative if failed + * lod_get_default_lmv_striping() - Get default directory striping. + * @env: execution environment + * @lo: object + * @lds: default striping + * + * Return: + * * %0 on success + * * %negative if failed */ static int lod_get_default_lmv_striping(const struct lu_env *env, struct lod_object *lo, @@ -5273,57 +5530,64 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, if (rc >= (int)sizeof(*lmu)) { struct lod_thread_info *info = lod_env_info(env); - lmu = info->lti_ea_store; - - lds->lds_dir_def_stripe_count = - le32_to_cpu(lmu->lum_stripe_count); - lds->lds_dir_def_stripe_offset = - le32_to_cpu(lmu->lum_stripe_offset); - lds->lds_dir_def_hash_type = - le32_to_cpu(lmu->lum_hash_type); - lds->lds_dir_def_max_inherit = lmu->lum_max_inherit; - lds->lds_dir_def_max_inherit_rr = lmu->lum_max_inherit_rr; - lds->lds_dir_def_striping_set = 1; + lmu = info->lti_ea_buf.lb_buf; + lod_lum2lds(lds, lmu); } return 0; } /** - * Get default striping in the object. + * lod_get_default_striping() - Get default striping in the object. + * @env: execution environment + * @lo: object + * @ah: pointer to a struct that gives allocation hints + * @lds: default striping * * Get object default striping and default directory striping. * - * \param[in] env execution environment - * \param[in] lo object - * \param[out] lds default striping - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_get_default_striping(const struct lu_env *env, struct lod_object *lo, + struct dt_allocation_hint *ah, struct lod_default_striping *lds) { int rc, rc1; rc = lod_get_default_lov_striping(env, lo, lds, NULL); - rc1 = lod_get_default_lmv_striping(env, lo, lds); - if (rc == 0 && rc1 < 0) - rc = rc1; + if (lds->lds_def_striping_set) { + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + + rc = lod_verify_striping(env, d, lo, &info->lti_buf, false); + if (rc) + lds->lds_def_striping_set = 0; + } + + if (ah->dah_eadata_is_dmv) { + lod_lum2lds(lds, ah->dah_eadata); + } else if (ah->dah_dmv_imp_inherit) { + lds->lds_dir_def_striping_set = 0; + } else { + rc1 = lod_get_default_lmv_striping(env, lo, lds); + if (rc == 0 && rc1 < 0) + rc = rc1; + } return rc; } /** - * Apply default striping on object. + * lod_striping_from_default() - Apply default striping on object. + * @lo: new object + * @lds: default striping + * @mode: new object's mode * * If object striping pattern is not set, set to the one in default striping. * The default striping is from parent or fs. - * - * \param[in] lo new object - * \param[in] lds default striping - * \param[in] mode new object's mode */ static void lod_striping_from_default(struct lod_object *lo, const struct lod_default_striping *lds, @@ -5350,8 +5614,9 @@ static void lod_striping_from_default(struct lod_object *lo, struct lod_layout_component *def_comp = &lds->lds_def_comp_entries[i]; - CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x " - "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n", + CDEBUG(D_LAYOUT, + "inherit "DFID" file layout from default: flags=%#x size=%u nr=%u offset=%u pattern=%#x pool=%s\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), def_comp->llc_flags, def_comp->llc_stripe_size, def_comp->llc_stripe_count, @@ -5397,21 +5662,22 @@ static void lod_striping_from_default(struct lod_object *lo, if (lo->ldo_dir_stripe_count == 0) lo->ldo_dir_stripe_count = lds->lds_dir_def_stripe_count; - if (lo->ldo_dir_stripe_offset == -1) + if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) lo->ldo_dir_stripe_offset = lds->lds_dir_def_stripe_offset; - if (lo->ldo_dir_hash_type == 0) + if (lo->ldo_dir_hash_type == LMV_HASH_TYPE_UNKNOWN) lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type; - CDEBUG(D_LAYOUT, "striping from default dir: count:%hu, " - "offset:%u, hash_type:%u\n", + CDEBUG(D_LAYOUT, + "inherit "DFID" dir layout from default: count=%hu offset=%u hash_type=%x\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset, lo->ldo_dir_hash_type); } } static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root, - char *append_pool) + const char *append_pool) { struct lod_layout_component *lod_comp; @@ -5437,16 +5703,16 @@ static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root, return false; } -/** +/* * Implementation of dt_object_operations::do_ah_init. * * This method is used to make a decision on the striping configuration for the * object being created. It can be taken from the \a parent object if it exists, * or filesystem's default. The resulting configuration (number of stripes, - * stripe size/offset, pool name, etc) is stored in the object itself and will - * be used by the methods like ->doo_declare_create(). + * stripe size/offset, pool name, hash_type, etc.) is stored in the object + * itself and will be used by the methods like ->doo_declare_create(). * - * \see dt_object_operations::do_ah_init() in the API description for details. + * see dt_object_operations::do_ah_init() in the API description for details. */ static void lod_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, @@ -5468,8 +5734,8 @@ static void lod_ah_init(const struct lu_env *env, LASSERT(child); - if (ah->dah_append_stripes == -1) - ah->dah_append_stripes = + if (ah->dah_append_stripe_count == -1) + ah->dah_append_stripe_count = d->lod_ost_descs.ltd_lov_desc.ld_tgt_count; if (likely(parent)) { @@ -5491,43 +5757,32 @@ static void lod_ah_init(const struct lu_env *env, if (S_ISDIR(child_mode)) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; + int max_stripe_count = 0; + int mdt_count = d->lod_remote_mdt_count + 1; /* other default values are 0 */ - lc->ldo_dir_stripe_offset = -1; + lc->ldo_dir_stripe_offset = LMV_OFFSET_DEFAULT; /* no default striping configuration is needed for * foreign dirs */ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_FOREIGN) { - lc->ldo_dir_is_foreign = true; + lc->ldo_is_foreign = true; /* keep stripe_count 0 and stripe_offset -1 */ CDEBUG(D_INFO, "no default striping for foreign dir\n"); RETURN_EXIT; } - /* - * If parent object is not root directory, - * then get default striping from parent object. - */ - if (likely(lp != NULL)) { - lod_get_default_striping(env, lp, lds); - - /* inherit default striping except ROOT */ - if ((lds->lds_def_striping_set || - lds->lds_dir_def_striping_set) && - !fid_is_root(lod_object_fid(lp))) - lc->ldo_def_striping = lds; - } + if (likely(lp != NULL)) + lod_get_default_striping(env, lp, ah, lds); /* It should always honour the specified stripes */ - /* Note: old client (< 2.7)might also do lfs mkdir, whose EA - * will have old magic. In this case, we should ignore the - * stripe count and try to create dir by default stripe. - */ - if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + if (ah->dah_eadata && ah->dah_eadata_len && + !ah->dah_eadata_is_dmv && (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC || - le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) { + le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC || + le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_V1)) { lc->ldo_dir_stripe_count = le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = @@ -5535,32 +5790,118 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_dir_hash_type = le32_to_cpu(lum1->lum_hash_type); CDEBUG(D_INFO, - "set dirstripe: count %hu, offset %d, hash %u\n", + "set dirstripe: count %hu, offset %d, hash %x\n", lc->ldo_dir_stripe_count, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); + + if (d->lod_mdt_descs.ltd_lmv_desc.ld_active_tgt_count && + lc->ldo_dir_stripe_count < 2 && + lum1->lum_max_inherit != LMV_INHERIT_NONE) { + /* when filesystem-wide default LMV is set, dirs + * will be created on MDT by space usage, but if + * dir is created with "lfs mkdir -c 1 ...", its + * subdirs should be kept on the same MDT. To + * guarantee this, set default LMV for such dir. + */ + lds->lds_dir_def_stripe_count = + le32_to_cpu(lum1->lum_stripe_count); + /* if "-1" stripe offset is set, save current + * MDT index in default LMV. + */ + if (le32_to_cpu(lum1->lum_stripe_offset) == + LMV_OFFSET_DEFAULT) + lds->lds_dir_def_stripe_offset = + lod2lu_dev(d)->ld_site->ld_seq_site->ss_node_id; + else + lds->lds_dir_def_stripe_offset = + le32_to_cpu(lum1->lum_stripe_offset); + lds->lds_dir_def_hash_type = + le32_to_cpu(lum1->lum_hash_type); + lds->lds_dir_def_max_inherit = + lum1->lum_max_inherit; + /* it will be decreased by 1 later in setting */ + if (lum1->lum_max_inherit >= LMV_INHERIT_END && + lum1->lum_max_inherit < LMV_INHERIT_MAX) + lds->lds_dir_def_max_inherit++; + lds->lds_dir_def_max_inherit_rr = + lum1->lum_max_inherit_rr; + lds->lds_dir_def_striping_set = 1; + /* don't inherit LOV from ROOT */ + if (lds->lds_def_striping_set && + fid_is_root(lod_object_fid(lp))) + lds->lds_def_striping_set = 0; + lc->ldo_def_striping = lds; + } else if (lds->lds_def_striping_set && + !fid_is_root(lod_object_fid(lp))) { + /* don't inherit default LMV for "lfs mkdir" */ + lds->lds_dir_def_striping_set = 0; + lc->ldo_def_striping = lds; + } } else { + /* inherit default striping except ROOT */ + if ((lds->lds_def_striping_set || + lds->lds_dir_def_striping_set) && + !fid_is_root(lod_object_fid(lp))) + lc->ldo_def_striping = lds; + /* transfer defaults LMV to new directory */ lod_striping_from_default(lc, lds, child_mode); /* set count 0 to create normal directory */ if (lc->ldo_dir_stripe_count == 1) lc->ldo_dir_stripe_count = 0; + + /* do not save default LMV on server */ + if (ah->dah_dmv_imp_inherit) { + lds->lds_dir_def_striping_set = 0; + if (!lds->lds_def_striping_set) + lc->ldo_def_striping = NULL; + } } - /* shrink the stripe_count to the avaible MDT count */ - if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { - lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1; + /* + * 1. overstriped case(< 0), adjust stripe count given + * overstriped factor by mdt_countmdt_count; + * 1.1 use lod_max_mdt_stripecount as max_stripe_count; + * 2. overstriped case(> 0), multiply max_stripe_count by + * max_stripes_per_mdt; + */ + max_stripe_count = mdt_count; + if ((__s16)lc->ldo_dir_stripe_count >= + LMV_OVERSTRIPE_COUNT_MAX && + (__s16)lc->ldo_dir_stripe_count <= + LMV_OVERSTRIPE_COUNT_MIN) { + lc->ldo_dir_stripe_count = mdt_count * + -(__s16)lc->ldo_dir_stripe_count; + max_stripe_count = d->lod_max_mdt_stripecount ?: + mdt_count * d->lod_max_stripes_per_mdt; + } else if (lc->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) { + max_stripe_count *= d->lod_max_stripes_per_mdt; + } + /* shrink the stripe_count to max stripe count */ + if (lc->ldo_dir_stripe_count > max_stripe_count && + !CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { + lc->ldo_dir_stripe_count = max_stripe_count; if (lc->ldo_dir_stripe_count == 1) lc->ldo_dir_stripe_count = 0; } - if (!(lc->ldo_dir_hash_type & LMV_HASH_TYPE_MASK)) - lc->ldo_dir_hash_type |= + if (!lmv_is_known_hash_type(lc->ldo_dir_hash_type)) + lc->ldo_dir_hash_type = + (lc->ldo_dir_hash_type & LMV_HASH_FLAG_KNOWN) | d->lod_mdt_descs.ltd_lmv_desc.ld_pattern; - CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", + /* make sure all fscrypt metadata stays on same mdt */ + if (child->do_lu.lo_header->loh_attr & LOHA_FSCRYPT_MD) { + lc->ldo_dir_stripe_count = 0; + lds->lds_dir_def_stripe_offset = + lod2lu_dev(d)->ld_site->ld_seq_site->ss_node_id; + lds->lds_dir_def_striping_set = 1; + lc->ldo_def_striping = lds; + } + + CDEBUG(D_INFO, "final dir stripe_count=%hu offset=%d hash=%x\n", lc->ldo_dir_stripe_count, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); @@ -5581,8 +5922,12 @@ static void lod_ah_init(const struct lu_env *env, */ if (likely(lp != NULL)) { rc = lod_get_default_lov_striping(env, lp, lds, ah); - if (rc == 0) - lod_striping_from_default(lc, lds, child_mode); + if (rc == 0 && lds->lds_def_striping_set) { + rc = lod_verify_striping(env, d, lp, &info->lti_buf, + false); + if (rc == 0) + lod_striping_from_default(lc, lds, child_mode); + } } /* Initialize lod_device::lod_md_root object reference */ @@ -5612,8 +5957,14 @@ static void lod_ah_init(const struct lu_env *env, lod_need_inherit_more(lc, true, ah->dah_append_pool)) { rc = lod_get_default_lov_striping(env, d->lod_md_root, lds, ah); + if (rc || !lds->lds_def_striping_set) + goto out; + + rc = lod_verify_striping(env, d, d->lod_md_root, &info->lti_buf, + false); if (rc) goto out; + if (lc->ldo_comp_cnt == 0) { lod_striping_from_default(lc, lds, child_mode); } else if (!lds->lds_def_striping_is_composite) { @@ -5634,7 +5985,7 @@ static void lod_ah_init(const struct lu_env *env, lod_comp->llc_stripe_offset = def_comp->llc_stripe_offset; if (lod_comp->llc_pool == NULL) - lod_obj_set_pool(lc, 0, def_comp->llc_pool); + lod_qos_set_pool(lc, 0, def_comp->llc_pool); } } out: @@ -5656,16 +6007,17 @@ out: LASSERT(!lc->ldo_is_composite); lod_comp = &lc->ldo_comp_entries[0]; desc = &d->lod_ost_descs.ltd_lov_desc; - lod_adjust_stripe_info(lod_comp, desc, ah->dah_append_stripes); + lod_adjust_stripe_info(lod_comp, desc, + ah->dah_append_stripe_count); if (ah->dah_append_pool && ah->dah_append_pool[0]) - lod_obj_set_pool(lc, 0, ah->dah_append_pool); + lod_qos_set_pool(lc, 0, ah->dah_append_pool); } EXIT; } /** - * Size initialization on late striping. + * lod_declare_init_size() - Size initialization on late striping. * * Propagate the size of a truncated object to a deferred striping. * This function handles a special case when truncate was done on a @@ -5673,12 +6025,13 @@ out: * we can't lose that size, so we have to propagate it to the stripes * being created. * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] th transaction handle + * @env: execution environment + * @dt: object + * @th: transaction handle * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_declare_init_size(const struct lu_env *env, struct dt_object *dt, struct thandle *th) @@ -5747,23 +6100,24 @@ static int lod_declare_init_size(const struct lu_env *env, } /** - * Declare creation of striped object. + * lod_declare_striped_create() - Declare creation of striped object. * * The function declares creation stripes for a regular object. The function * also declares whether the stripes will be created with non-zero size if - * previously size was set non-zero on the master object. If object \a dt is - * not local, then only fully defined striping can be applied in \a lovea. - * Otherwise \a lovea can be in the form of pattern, see lod_qos_parse_config() + * previously size was set non-zero on the master object. If object @dt is + * not local, then only fully defined striping can be applied in @lovea. + * Otherwise @lovea can be in the form of pattern, see lod_qos_parse_config() * for the details. * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] attr attributes the stripes will be created with - * \param[in] lovea a buffer containing striping description - * \param[in] th transaction handle + * @env: execution environment + * @dt: object + * @attr: attributes the stripes will be created with + * @lovea: a buffer containing striping description + * @th: transaction handle * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ int lod_declare_striped_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ -5775,7 +6129,7 @@ int lod_declare_striped_create(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) GOTO(out, rc = -ENOMEM); if (!dt_object_remote(next)) { @@ -5821,48 +6175,6 @@ out: } /* - * Whether subdirectories under \a dt should be created on MDTs by space QoS - * - * If LMV_HASH_FLAG_SPACE is set on directory default layout, its subdirectories - * should be created on MDT by space QoS. - * - * \param[in] env execution environment - * \param[in] dev lu device - * \param[in] dt object - * - * \retval 1 if directory should create subdir by space usage - * \retval 0 if not - * \retval -ev if failed - */ -static inline int dt_object_qos_mkdir(const struct lu_env *env, - struct lu_device *dev, - struct dt_object *dt) -{ - struct lod_thread_info *info = lod_env_info(env); - struct lu_object *obj; - struct lod_object *lo; - struct lmv_user_md *lmu; - int rc; - - obj = lu_object_find_slice(env, dev, lu_object_fid(&dt->do_lu), NULL); - if (IS_ERR(obj)) - return PTR_ERR(obj); - - lo = lu2lod_obj(obj); - - rc = lod_get_default_lmv_ea(env, lo); - dt_object_put(env, dt); - if (rc <= 0) - return rc; - - if (rc < (int)sizeof(*lmu)) - return -EINVAL; - - lmu = info->lti_ea_store; - return le32_to_cpu(lmu->lum_stripe_offset) == LMV_OFFSET_DEFAULT; -} - -/** * Implementation of dt_object_operations::do_declare_create. * * The method declares creation of a new object. If the object will be striped, @@ -5870,7 +6182,7 @@ static inline int dt_object_qos_mkdir(const struct lu_env *env, * creation of the stripes and declare initialization of the striping * information to be stored in the master object. * - * \see dt_object_operations::do_declare_create() in the API description + * see dt_object_operations::do_declare_create() in the API description * for details. */ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, @@ -5908,70 +6220,53 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, } else if (dof->dof_type == DFT_DIR) { struct seq_server_site *ss; struct lu_buf buf = { NULL }; - struct lu_buf *lmu = NULL; ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); - - /* If the parent has default stripeEA, and client - * did not find it before sending create request, - * then MDT will return -EREMOTE, and client will - * retrieve the default stripeEA and re-create the - * sub directory. - * - * Note: if dah_eadata != NULL, it means creating the - * striped directory with specified stripeEA, then it - * should ignore the default stripeEA */ - if (hint != NULL && hint->dah_eadata == NULL) { - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) + if (hint && (!hint->dah_eadata || hint->dah_eadata_is_dmv)) { + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) GOTO(out, rc = -EREMOTE); + } else if (hint) { + LASSERT(hint->dah_eadata && !hint->dah_eadata_is_dmv); + buf.lb_buf = (void *)hint->dah_eadata; + buf.lb_len = hint->dah_eadata_len; + } - if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) { - struct lod_default_striping *lds; - lds = lo->ldo_def_striping; - /* - * child and parent should be on the same MDT, - * but if parent has default LMV, and the start - * MDT offset is -1, it's allowed. This check - * is not necessary after 2.12.22 because client - * follows this already, but old client may not. - */ - if (hint->dah_parent && - dt_object_remote(hint->dah_parent) && lds && - lds->lds_dir_def_stripe_offset != - LMV_OFFSET_DEFAULT) - GOTO(out, rc = -EREMOTE); - } else if (lo->ldo_dir_stripe_offset != - ss->ss_node_id) { - struct lod_device *lod; - struct lu_tgt_desc *mdt = NULL; - bool found_mdt = false; - - lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - lod_foreach_mdt(lod, mdt) { - if (mdt->ltd_index == - lo->ldo_dir_stripe_offset) { - found_mdt = true; - break; - } + /* if dir target MDT is not current MDT, it's possible that + * directory creation is disabled on the target MDT. + */ + if (lo->ldo_dir_stripe_offset != LMV_OFFSET_DEFAULT && + lo->ldo_dir_stripe_offset != ss->ss_node_id) { + struct lod_device *lod; + struct lu_tgt_desc *mdt = NULL; + bool no_create = false; + + lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + rc = -EINVAL; + lod_foreach_mdt(lod, mdt) { + if (mdt->ltd_index == + lo->ldo_dir_stripe_offset) { + if (unlikely(hint && !hint->dah_eadata)) + /* old client may not cache DMV, + * allow it to retry. + */ + rc = -EREMOTE; + else + rc = -EPROTO; + /* refresh statfs */ + dt_statfs(env, mdt->ltd_tgt, + &mdt->ltd_statfs); + no_create = (mdt->ltd_statfs.os_state & + OS_STATFS_NOCREATE); + break; } - - /* If the MDT indicated by stripe_offset can be - * found, then tell client to resend the create - * request to the correct MDT, otherwise return - * error to client */ - if (found_mdt) - GOTO(out, rc = -EREMOTE); - else - GOTO(out, rc = -EINVAL); } - } else if (hint && hint->dah_eadata) { - lmu = &buf; - lmu->lb_buf = (void *)hint->dah_eadata; - lmu->lb_len = hint->dah_eadata_len; + + if (!no_create) + GOTO(out, rc); } - rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof, + rc = lod_declare_dir_striping_create(env, dt, attr, &buf, dof, th); } out: @@ -5983,13 +6278,14 @@ out: } /** - * Generate component ID for new created component. - * - * \param[in] lo LOD object - * \param[in] comp_idx index of ldo_comp_entries - * - * \retval component ID on success - * \retval LCME_ID_INVAL on failure + * lod_gen_component_id() - Generate component ID for new created component. + * @lo: LOD object + * @mirror_id: New generated component will have this @mirror_id + * @comp_idx: index of ldo_comp_entries + * + * Return: + * * %component ID on success + * * %LCME_ID_INVAL on failure */ static __u32 lod_gen_component_id(struct lod_object *lo, int mirror_id, int comp_idx) @@ -6019,10 +6315,10 @@ again: if (i == lo->ldo_comp_cnt) RETURN(pflr_id(mirror_id, id)); } - if (end == LCME_ID_MAX) { + + if (end == SEQ_ID_MAX) { + end = min_t(__u32, start, SEQ_ID_MAX) - 1; start = 1; - end = min(lo->ldo_layout_gen & LCME_ID_MASK, - (__u32)(LCME_ID_MAX - 1)); goto again; } @@ -6030,7 +6326,12 @@ again: } /** - * Creation of a striped regular object. + * lod_striped_create() - Creation of a striped regular object. + * @env: execution environment + * @dt: object + * @attr: attributes the stripes will be created with + * @dof: format of stripes (see OSD API description) + * @th: transaction handle * * The function is called to create the stripe objects for a regular * striped file. This can happen at the initial object creation or @@ -6039,14 +6340,9 @@ again: * prepared in the form of the list of objects (ldo_stripe field). * This is done during declare phase. * - * \param[in] env execution environment - * \param[in] dt object - * \param[in] attr attributes the stripes will be created with - * \param[in] dof format of stripes (see OSD API description) - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_object_format *dof, @@ -6094,10 +6390,15 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, if (lod_comp_inited(lod_comp)) continue; + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) { + lod_comp_set_init(lod_comp); + continue; + } + if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) lod_comp_set_init(lod_comp); - if (lov_pattern(lod_comp->llc_pattern) == LOV_PATTERN_MDT) + if (lov_pattern(lod_comp->llc_pattern) & LOV_PATTERN_MDT) lod_comp_set_init(lod_comp); if (lod_comp->llc_stripe == NULL) @@ -6148,18 +6449,18 @@ static inline bool lod_obj_is_dom(struct dt_object *dt) if (!lo->ldo_comp_cnt) return false; - return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) == + return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) & LOV_PATTERN_MDT); } -/** +/* * Implementation of dt_object_operations::do_create. * * If any of preceeding methods (like ->do_declare_create(), * ->do_ah_init(), etc) chose to create a striped object, * then this method will create the master and the stripes. * - * \see dt_object_operations::do_create() in the API description for details. + * see dt_object_operations::do_create() in the API description for details. */ static int lod_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, @@ -6192,14 +6493,14 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, if (data->locd_declare) return lod_sub_declare_destroy(env, dt, th); - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || stripe_idx == cfs_fail_val) return lod_sub_destroy(env, dt, th); return 0; } -/** +/* * Implementation of dt_object_operations::do_declare_destroy. * * If the object is a striped directory, then the function declares reference @@ -6207,7 +6508,7 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, * destroy of all the stripes. In all the cases, it declares an intention to * destroy the object itself. * - * \see dt_object_operations::do_declare_destroy() in the API description + * see dt_object_operations::do_declare_destroy() in the API description * for details. */ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, @@ -6264,8 +6565,8 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, if (rc) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); if (!lod_obj_is_striped(dt)) @@ -6300,14 +6601,14 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_destroy. * * If the object is a striped directory, then the function removes references * from the master object (this is an index) to the stripes and destroys all * the stripes. In all the cases, the function destroys the object itself. * - * \see dt_object_operations::do_destroy() in the API description for details. + * see dt_object_operations::do_destroy() in the API description for details. */ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) @@ -6356,8 +6657,8 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); if (!lod_obj_is_striped(dt)) @@ -6373,7 +6674,7 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, if (!dt_object_exists(stripe)) continue; - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || i == cfs_fail_val) { dt_write_lock(env, stripe, DT_TGT_CHILD); rc = lod_sub_ref_del(env, stripe, th); @@ -6397,10 +6698,10 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/** +/* * Implementation of dt_object_operations::do_declare_ref_add. * - * \see dt_object_operations::do_declare_ref_add() in the API description + * see dt_object_operations::do_declare_ref_add() in the API description * for details. */ static int lod_declare_ref_add(const struct lu_env *env, @@ -6409,10 +6710,10 @@ static int lod_declare_ref_add(const struct lu_env *env, return lod_sub_declare_ref_add(env, dt_object_child(dt), th); } -/** +/* * Implementation of dt_object_operations::do_ref_add. * - * \see dt_object_operations::do_ref_add() in the API description for details. + * see dt_object_operations::do_ref_add() in the API description for details. */ static int lod_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) @@ -6420,10 +6721,10 @@ static int lod_ref_add(const struct lu_env *env, return lod_sub_ref_add(env, dt_object_child(dt), th); } -/** +/* * Implementation of dt_object_operations::do_declare_ref_del. * - * \see dt_object_operations::do_declare_ref_del() in the API description + * see dt_object_operations::do_declare_ref_del() in the API description * for details. */ static int lod_declare_ref_del(const struct lu_env *env, @@ -6432,10 +6733,10 @@ static int lod_declare_ref_del(const struct lu_env *env, return lod_sub_declare_ref_del(env, dt_object_child(dt), th); } -/** +/* * Implementation of dt_object_operations::do_ref_del * - * \see dt_object_operations::do_ref_del() in the API description for details. + * see dt_object_operations::do_ref_del() in the API description for details. */ static int lod_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) @@ -6443,10 +6744,10 @@ static int lod_ref_del(const struct lu_env *env, return lod_sub_ref_del(env, dt_object_child(dt), th); } -/** +/* * Implementation of dt_object_operations::do_object_sync. * - * \see dt_object_operations::do_object_sync() in the API description + * see dt_object_operations::do_object_sync() in the API description * for details. */ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, @@ -6455,12 +6756,12 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, return dt_object_sync(env, dt_object_child(dt), start, end); } -/** +/* * Implementation of dt_object_operations::do_object_unlock. * * Used to release LDLM lock(s). * - * \see dt_object_operations::do_object_unlock() in the API description + * see dt_object_operations::do_object_unlock() in the API description * for details. */ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, @@ -6502,12 +6803,12 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, RETURN(0); } -/** +/* * Implementation of dt_object_operations::do_object_lock. * * Used to get LDLM lock on the non-striped and striped objects. * - * \see dt_object_operations::do_object_lock() in the API description + * see dt_object_operations::do_object_lock() in the API description * for details. */ static int lod_object_lock(const struct lu_env *env, @@ -6538,7 +6839,7 @@ static int lod_object_lock(const struct lu_env *env, RETURN(rc); /* No stripes */ - if (lo->ldo_dir_stripe_count <= 1) + if (lo->ldo_dir_stripe_count == 0) RETURN(0); slave_locks_size = offsetof(typeof(*slave_locks), @@ -6572,10 +6873,6 @@ static int lod_object_lock(const struct lu_env *env, ldlm_completion_callback completion = einfo->ei_cb_cp; __u64 dlmflags = LDLM_FL_ATOMIC_CB; - if (einfo->ei_mode == LCK_PW || - einfo->ei_mode == LCK_EX) - dlmflags |= LDLM_FL_COS_INCOMPAT; - LASSERT(ns != NULL); rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy, einfo->ei_mode, @@ -6599,10 +6896,10 @@ static int lod_object_lock(const struct lu_env *env, RETURN(0); } -/** +/* * Implementation of dt_object_operations::do_invalidate. * - * \see dt_object_operations::do_invalidate() in the API description for details + * see dt_object_operations::do_invalidate() in the API description for details */ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) { @@ -6638,7 +6935,15 @@ static int lod_declare_instantiate_components(const struct lu_env *env, } /** - * Check OSTs for an existing component for further extension + * lod_sel_osts_allowed() - Check OSTs for an existing component for further + * extension + * @env: execution environment for this thread + * @lo: object we're checking + * @index: index of this component + * @reserve: space to be reserved for each OST. + * @extent: layout extent for requested operation + * @comp_extent: extension component extent + * @write: if this is write operation * * Checks if OSTs are still healthy and not out of space. Gets free space * on OSTs (relative to allocation watermark rmb_low) and compares to @@ -6646,16 +6951,9 @@ static int lod_declare_instantiate_components(const struct lu_env *env, * * Decides whether or not to extend a component on its current OSTs. * - * \param[in] env execution environment for this thread - * \param[in] lo object we're checking - * \param[in] index index of this component - * \param[in] extension_size extension size for this component - * \param[in] extent layout extent for requested operation - * \param[in] comp_extent extension component extent - * \param[in] write if this is write operation - * - * \retval true - OK to extend on current OSTs - * \retval false - do not extend on current OSTs + * Return: + * * %true - OK to extend on current OSTs + * * %false - do not extend on current OSTs */ static bool lod_sel_osts_allowed(const struct lu_env *env, struct lod_object *lo, @@ -6717,9 +7015,11 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, if (sfs->os_state & OS_STATFS_ENOSPC || sfs->os_state & OS_STATFS_READONLY || + sfs->os_state & OS_STATFS_NOCREATE || sfs->os_state & OS_STATFS_DEGRADED) { - CDEBUG(D_LAYOUT, "ost %d is not availble for SEL " - "extension, state %u\n", index, sfs->os_state); + CDEBUG(D_LAYOUT, + "OST%04x unusable for SEL extension, state %x\n", + index, sfs->os_state); ret = false; break; } @@ -6749,19 +7049,19 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, } /** - * Adjust extents after component removal + * lod_sel_adjust_extents() - Adjust extents after component removal + * @env: execution environment for this thread + * @lo: object + * @max_comp: layout component + * @index: index of this component * * When we remove an extension component, we move the start of the next * component to match the start of the extension component, so no space is left * without layout. * - * \param[in] env execution environment for this thread - * \param[in] lo object - * \param[in] max_comp layout component - * \param[in] index index of this component - * - * \retval 0 on success - * \retval negative errno on error + * Return: + * * %0 on success + * * %negative errno on error */ static void lod_sel_adjust_extents(const struct lu_env *env, struct lod_object *lo, @@ -6846,6 +7146,9 @@ static __u64 lod_extension_new_end(__u64 extension_size, __u64 extent_end, } /** + * lod_sel_stripe_reserved() - Calculate reservation (per-OST extension_size) + * @lod_comp: lod component we are checking + * * Calculate the exact reservation (per-OST extension_size) on the OSTs being * instantiated. It needs to be calculated in advance and taken into account at * the instantiation time, because otherwise lod_statfs_and_check() may consider @@ -6853,9 +7156,7 @@ static __u64 lod_extension_new_end(__u64 extension_size, __u64 extent_end, * OST may turn out to be low-on-space, thus inappropriate OST may be used and * ENOSPC occurs. * - * \param[in] lod_comp lod component we are checking - * - * \retval size to reserved on each OST of lod_comp's stripe. + * Returns size to reserved on each OST of lod_comp's stripe. */ static __u64 lod_sel_stripe_reserved(struct lod_layout_component *lod_comp) { @@ -6869,14 +7170,26 @@ static __u64 lod_sel_stripe_reserved(struct lod_layout_component *lod_comp) * times, this is the data for the next call. Fields could be changed to * component indexes when needed, (e.g. if there is no need to instantiate * all the previous components up to the current position) to tell the caller - * where to start over from. */ + * where to start over from. + */ struct sel_data { int sd_force; int sd_repeat; }; /** - * Process extent updates for a particular layout component + * lod_sel_handler() - Process extent updates for a particular layout component + * @env: execution environment for this thread + * @lo: object to update the layout of + * @extent: layout extent for requested operation, update layout to fit this + * operation + * @th: transaction handle for this operation + * @max_comp: the highest comp for the portion of the layout we are operating + * on (For FLR, the chosen replica). Updated because we may remove components. + * @index: index of the extension space component we're working on + * @write: if this is write op + * @sd: if the extension is to be forced; set here to force it on the arshad + * 2nd call for the same extension component * * Handle layout updates for a particular extension space component touched by * a layout update operation. Core function of self-extending PFL feature. @@ -6915,24 +7228,9 @@ struct sel_data { * component is exhausted (all of its range has been given to real components), * so we remove it and restart processing. * - * \param[in] env execution environment for this thread - * \param[in,out] lo object to update the layout of - * \param[in] extent layout extent for requested operation, update - * layout to fit this operation - * \param[in] th transaction handle for this operation - * \param[in,out] max_comp the highest comp for the portion of the layout - * we are operating on (For FLR, the chosen - * replica). Updated because we may remove - * components. - * \param[in] index index of the extension space component we're - * working on - * \param[in] write if this is write op - * \param[in,out] force if the extension is to be forced; set here - to force it on the 2nd call for the same - extension component - * - * \retval 0 on success - * \retval negative errno on error + * Return: + * * %0 on success + * * %negative errno on error */ static int lod_sel_handler(const struct lu_env *env, struct lod_object *lo, @@ -7088,7 +7386,14 @@ static int lod_sel_handler(const struct lu_env *env, } /** - * Declare layout extent updates + * lod_declare_update_extents() - Declare layout extent updates + * @env: execution environment for this thread + * @lo: object to update the layout of + * @extent: layout extent for requested operation, update layout to + * fit this operation + * @th: transaction handle for this operation + * @pick: identifies chosen mirror for FLR layouts + * @write: if this is write op * * Handles extensions. Identifies extension components touched by current * operation and passes them to processing function. @@ -7096,16 +7401,10 @@ static int lod_sel_handler(const struct lu_env *env, * Restarts with updated layouts from the processing function until the current * operation no longer touches an extension space component. * - * \param[in] env execution environment for this thread - * \param[in,out] lo object to update the layout of - * \param[in] extent layout extent for requested operation, update layout to - * fit this operation - * \param[in] th transaction handle for this operation - * \param[in] pick identifies chosen mirror for FLR layouts - * \param[in] write if this is write op - * - * \retval 1 on layout changed, 0 on no change - * \retval negative errno on error + * Return: + * * %1 on layout changed + * * %0 on no change + * * %negative errno on error */ static int lod_declare_update_extents(const struct lu_env *env, struct lod_object *lo, struct lu_extent *extent, @@ -7123,8 +7422,13 @@ static int lod_declare_update_extents(const struct lu_env *env, ENTRY; /* This makes us work on the components of the chosen mirror */ - start_index = lo->ldo_mirrors[pick].lme_start; - max_comp = lo->ldo_mirrors[pick].lme_end + 1; + if (lo->ldo_mirrors) { + start_index = lo->ldo_mirrors[pick].lme_start; + max_comp = lo->ldo_mirrors[pick].lme_end + 1; + } else { + start_index = 0; + max_comp = lo->ldo_comp_cnt; + } if (lo->ldo_flr_state == LCM_FL_NONE) LASSERT(start_index == 0 && max_comp == lo->ldo_comp_cnt); @@ -7153,12 +7457,14 @@ static int lod_declare_update_extents(const struct lu_env *env, /* We may have added or removed components. If so, we must update the * start & ends of all the mirrors after the current one, and the end * of the current mirror. */ - change = max_comp - 1 - lo->ldo_mirrors[pick].lme_end; - if (change) { - lo->ldo_mirrors[pick].lme_end += change; - for (i = pick + 1; i < lo->ldo_mirror_count; i++) { - lo->ldo_mirrors[i].lme_start += change; - lo->ldo_mirrors[i].lme_end += change; + if (lo->ldo_mirrors) { + change = max_comp - 1 - lo->ldo_mirrors[pick].lme_end; + if (change) { + lo->ldo_mirrors[pick].lme_end += change; + for (i = pick + 1; i < lo->ldo_mirror_count; i++) { + lo->ldo_mirrors[i].lme_start += change; + lo->ldo_mirrors[i].lme_end += change; + } } } @@ -7173,22 +7479,25 @@ out: /* If striping is already instantiated or INIT'ed DOM? */ static bool lod_is_instantiation_needed(struct lod_layout_component *comp) { - return !(((lov_pattern(comp->llc_pattern) == LOV_PATTERN_MDT) && + if (comp->llc_magic == LOV_MAGIC_FOREIGN) + return false; + + return !(((lov_pattern(comp->llc_pattern) & LOV_PATTERN_MDT) && lod_comp_inited(comp)) || comp->llc_stripe); } /** - * Declare layout update for a non-FLR layout. - * - * \param[in] env execution environment for this thread - * \param[in,out] lo object to update the layout of - * \param[in] layout layout intent for requested operation, "update" is - * a process of reacting to this - * \param[in] buf buffer containing lov ea (see comment on usage inline) - * \param[in] th transaction handle for this operation - * - * \retval 0 on success - * \retval negative errno on error + * lod_declare_update_plain() - Declare layout update for a non-FLR layout. + * @env: execution environment for this thread + * @lo: object to update the layout of + * @layout: layout intent for requested operation, "update" is a process of + * reacting to this + * @buf: buffer containing lov ea (see comment on usage inline) + * @th: transaction handle for this operation + * + * Return: + * * %0 on success + * * %negative errno on error */ static int lod_declare_update_plain(const struct lu_env *env, struct lod_object *lo, struct layout_intent *layout, @@ -7250,22 +7559,22 @@ static int lod_declare_update_plain(const struct lu_env *env, lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]; if (lo->ldo_comp_cnt > 1 && lod_comp->llc_extent.e_end != OBD_OBJECT_EOF && - lod_comp->llc_extent.e_end < layout->li_extent.e_end) { + lod_comp->llc_extent.e_end < layout->lai_extent.e_end) { CDEBUG_LIMIT(replay ? D_ERROR : D_LAYOUT, "%s: the defined layout [0, %#llx) does not " "covers the write range "DEXT"\n", lod2obd(d)->obd_name, lod_comp->llc_extent.e_end, - PEXT(&layout->li_extent)); + PEXT(&layout->lai_extent)); GOTO(out, rc = -EINVAL); } CDEBUG(D_LAYOUT, "%s: "DFID": update components "DEXT"\n", lod2obd(d)->obd_name, PFID(lod_object_fid(lo)), - PEXT(&layout->li_extent)); + PEXT(&layout->lai_extent)); if (!replay) { - rc = lod_declare_update_extents(env, lo, &layout->li_extent, - th, 0, layout->li_opc == LAYOUT_INTENT_WRITE); + rc = lod_declare_update_extents(env, lo, &layout->lai_extent, + th, 0, layout->lai_opc == LAYOUT_INTENT_WRITE); if (rc < 0) GOTO(out, rc); else if (rc) @@ -7279,7 +7588,7 @@ static int lod_declare_update_plain(const struct lu_env *env, for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; - if (lod_comp->llc_extent.e_start >= layout->li_extent.e_end) + if (lod_comp->llc_extent.e_start >= layout->lai_extent.e_end) break; if (!replay) { @@ -7332,7 +7641,7 @@ static inline int lod_comp_index(struct lod_object *lo, return lod_comp - lo->ldo_comp_entries; } -/** +/* * Stale other mirrors by writing extent. */ static int lod_stale_components(const struct lu_env *env, struct lod_object *lo, @@ -7364,6 +7673,7 @@ restart: for (i = 0; i < lo->ldo_mirror_count; i++) { if (i == primary) continue; + rc = lod_declare_update_extents(env, lo, &pri_extent, th, i, 0); /* if update_extents changed the layout, it may have @@ -7387,6 +7697,8 @@ restart: lod_comp->llc_flags |= LCME_FL_STALE; lo->ldo_mirrors[i].lme_stale = 1; + if (lod_is_hsm(lod_comp)) + lod_comp->llc_foreign_flags |= HS_DIRTY; } } } @@ -7395,15 +7707,16 @@ restart: } /** - * check an OST's availability - * \param[in] env execution environment - * \param[in] lo lod object - * \param[in] dt dt object - * \param[in] index mirror index - * - * \retval negative if failed - * \retval 1 if \a dt is available - * \retval 0 if \a dt is not available + * lod_check_ost_avail() - check an OST's availability + * @env: execution environment + * @lo: lod object + * @dt: dt object + * @index: mirror index + * + * Return: + * * %negative if failed + * * %1 if @dt is available + * * %0 if @dt is not available */ static inline int lod_check_ost_avail(const struct lu_env *env, struct lod_object *lo, @@ -7424,12 +7737,9 @@ static inline int lod_check_ost_avail(const struct lu_env *env, } ost = OST_TGT(lod, idx); - if (ost->ltd_statfs.os_state & - (OS_STATFS_READONLY | OS_STATFS_ENOSPC | OS_STATFS_ENOINO | - OS_STATFS_NOPRECREATE) || - ost->ltd_active == 0) { - CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", - PFID(lod_object_fid(lo)), index, idx, rc); + if (ost->ltd_active == 0) { + CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail\n", + PFID(lod_object_fid(lo)), index, idx); return 0; } @@ -7437,10 +7747,14 @@ static inline int lod_check_ost_avail(const struct lu_env *env, } /** - * Pick primary mirror for write - * \param[in] env execution environment - * \param[in] lo object - * \param[in] extent write range + * lod_primary_pick() - Pick primary mirror for write + * @env: execution environment + * @lo: object + * @extent: write range + * + * Return: + * * %0 on success + * * %negative on failure */ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, struct lu_extent *extent) @@ -7452,7 +7766,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, int picked = -1, second_pick = -1, third_pick = -1; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { + if (CFS_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { get_random_bytes(&seq, sizeof(seq)); seq %= lo->ldo_mirror_count; } @@ -7464,6 +7778,11 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * cluster. */ lod_qos_statfs_update(env, lod, &lod->lod_ost_descs); + + rc = lod_fill_mirrors(lo); + if (rc) + RETURN(rc); + for (i = 0; i < lo->ldo_mirror_count; i++) { bool ost_avail = true; int index = (i + seq) % lo->ldo_mirror_count; @@ -7566,7 +7885,7 @@ static int lod_prepare_resync_mirror(const struct lu_env *env, return 0; } -/** +/* * figure out the components should be instantiated for resync. */ static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo, @@ -7609,6 +7928,334 @@ static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo, return need_sync ? 0 : -EALREADY; } +static int lod_layout_pccro_check(const struct lu_env *env, + struct dt_object *dt, + struct md_layout_change *mlc) +{ + struct lod_object *lo = lod_dt_obj(dt); + int rc; + + rc = lod_striping_load(env, lo); + if (rc) + return rc; + + return lo->ldo_flr_state & LCM_FL_PCC_RDONLY ? -EALREADY : 0; +} + +/* Check if the dir layout conforms the requested one */ +static int lod_dir_layout_check(const struct lu_env *env, + struct dt_object *dt, + struct md_layout_change *mlc) +{ + struct lmv_user_md_v1 *lum = mlc->mlc_buf.lb_buf; + size_t lum_len = mlc->mlc_buf.lb_len; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_device *ld = lu2lod_dev(dt->do_lu.lo_dev); + int lum_stripe_count, lum_num_objs; + int rc; + int i; + ENTRY; + + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + lum_stripe_count = le32_to_cpu(lum->lum_stripe_count); + lum_num_objs = lmv_foreign_to_md_stripes(lum_len); + + if (lmv_hash_is_migrating(lo->ldo_dir_hash_type)) + lum_stripe_count = lo->ldo_dir_migrate_offset; + + if (lum_num_objs > lum_stripe_count) + RETURN(-EINVAL); + + for (i = 0; i < lum_num_objs; i++) { + struct lmv_user_mds_data *stripe_desc = lum->lum_objects + i; + struct dt_object *stripe_obj = lo->ldo_stripe[i]; + __u32 lum_mds_idx, dt_mds_idx; + int type; + + lum_mds_idx = le32_to_cpu(stripe_desc->lum_mds); + rc = lod_fld_lookup(env, ld, lu_object_fid(&stripe_obj->do_lu), + &dt_mds_idx, &type); + if (rc < 0) + RETURN(rc); + + if (lum_mds_idx == LMV_OFFSET_DEFAULT) + continue; + if (lum_mds_idx != dt_mds_idx) { + if (!lmv_hash_is_migrating(lo->ldo_dir_hash_type)) + RETURN(0); + CERROR("%s: attempt to resume migration, stripe #%d mismatch: %u != %u, rc = %d\n", + dt->do_lu.lo_dev->ld_obd->obd_name, i, + lum_mds_idx, dt_mds_idx, -EPERM); + RETURN(-EPERM); + } + } + + /* all compatibility check passed */ + RETURN(-EALREADY); +} + +static int lod_layout_check(const struct lu_env *env, + struct dt_object *dt, + struct md_layout_change *mlc) +{ + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + return lod_dir_layout_check(env, dt, mlc); + + LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr)); + return lod_layout_pccro_check(env, dt, mlc); +} + +static struct lod_layout_component * +lod_locate_comp_hsm(struct lod_object *lo, int *hsm_mirror_id) +{ + struct lod_layout_component *lod_comp = NULL; + int i; + + if (!lo->ldo_is_composite) + return NULL; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + /* + * FIXME: In the current design, there is only one HSM + * mirror component in range [0, EOF] for a FLR file. This + * should be fixed to support multiple HSM mirror components + * with different HSM backend types and partial file ranges + * in the future. + */ + if (lo->ldo_mirrors[i].lme_hsm) { + __u16 start_idx; + __u16 end_idx; + + if (hsm_mirror_id) + *hsm_mirror_id = i; + start_idx = lo->ldo_mirrors[i].lme_start; + end_idx = lo->ldo_mirrors[i].lme_end; + LASSERT(start_idx == end_idx); + lod_comp = &lo->ldo_comp_entries[start_idx]; + LASSERT(lo->ldo_is_composite && lod_is_hsm(lod_comp) && + lod_comp->llc_extent.e_start == 0 && + lod_comp->llc_extent.e_end == LUSTRE_EOF); + break; + } + } + + return lod_comp; +} + +static int lod_declare_pccro_set(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf *buf = &info->lti_buf; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_layout_component *lod_comp; + struct lod_layout_component *comp_array; + struct lod_mirror_entry *mirror_array; + __u16 mirror_id; + int hsm_mirror_id; + int mirror_cnt; + int new_cnt; + int rc; + int i; + + ENTRY; + + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + if (lo->ldo_flr_state & LCM_FL_PCC_RDONLY) + RETURN(-EALREADY); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + RETURN(rc); + + lod_comp = lod_locate_comp_hsm(lo, &hsm_mirror_id); + if (lod_comp) { + if (lod_comp->llc_foreign_flags & HS_PCCRO) { + CDEBUG(D_LAYOUT, "bad HSM flags: %#x\n", + lod_comp->llc_foreign_flags); + RETURN(-EINVAL); + } + + lod_obj_inc_layout_gen(lo); + lod_comp->llc_foreign_flags |= HS_PCCRO; + lod_comp->llc_foreign_flags &= ~HS_DIRTY; + lod_comp->llc_flags &= ~LCME_FL_STALE; + lo->ldo_mirrors[hsm_mirror_id].lme_stale = 0; + lo->ldo_flr_state |= LCM_FL_PCC_RDONLY; + buf->lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + buf, XATTR_NAME_LOV, 0, th); + RETURN(rc); + } + + /* + * Create an new composite layout with only one HSM component. + * Field @lhm_archive_uuid is used to be the identifier within HSM + * backend for the archive copy. In the PCC case with a POSIX archive, + * This can just be the original inode FID. This is important because + * the inode FID may change due to layout swaps or migration to a new + * MDT, and we do not want that to cause problems with finding the copy + * in HSM/PCC. + */ + mirror_cnt = lo->ldo_mirror_count + 1; + if (!lo->ldo_is_composite) { + LASSERT(lo->ldo_mirror_count == 0); + mirror_cnt++; + } + + OBD_ALLOC_PTR_ARRAY(mirror_array, mirror_cnt); + if (mirror_array == NULL) + RETURN(-ENOMEM); + + new_cnt = lo->ldo_comp_cnt + 1; + OBD_ALLOC_PTR_ARRAY_LARGE(comp_array, new_cnt); + if (comp_array == NULL) { + OBD_FREE_PTR_ARRAY(mirror_array, mirror_cnt); + RETURN(-ENOMEM); + } + + mirror_id = 0; + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + + /* + * Add mirror from a non-flr file, create new mirror ID. + * Otherwise, keep existing mirror's component ID, used + * for mirror extension. + */ + if (lo->ldo_mirror_count == 0 && + mirror_id_of(lod_comp->llc_id) == 0) + lod_comp->llc_id = pflr_id(1, i + 1); + + if (lod_comp->llc_id != LCME_ID_INVAL && + mirror_id_of(lod_comp->llc_id) > mirror_id) + mirror_id = mirror_id_of(lod_comp->llc_id); + + if (!lo->ldo_is_composite) { + lod_comp->llc_extent.e_start = 0; + lod_comp->llc_extent.e_end = LUSTRE_EOF; + lod_comp_set_init(lod_comp); + } + } + + memcpy(comp_array, lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + + lod_comp = &comp_array[new_cnt - 1]; + lod_comp->llc_magic = LOV_MAGIC_FOREIGN; + lod_comp->llc_extent.e_start = 0; + lod_comp->llc_extent.e_end = LUSTRE_EOF; + lod_comp->llc_length = sizeof(struct lov_hsm_base); + lod_comp->llc_type = LU_FOREIGN_TYPE_PCCRO; + lod_comp->llc_foreign_flags = HS_EXISTS | HS_ARCHIVED | HS_PCCRO; + memset(&lod_comp->llc_hsm, 0, sizeof(lod_comp->llc_hsm)); + + if (lo->ldo_mirrors) + OBD_FREE_PTR_ARRAY(lo->ldo_mirrors, lo->ldo_mirror_count); + OBD_FREE_PTR_ARRAY_LARGE(lo->ldo_comp_entries, lo->ldo_comp_cnt); + + /* + * The @ldo_mirror will be refilled by lod_fill_mirrors() when + * call lod_striped_create() for layout change. + */ + lo->ldo_mirrors = mirror_array; + lo->ldo_mirror_count = mirror_cnt; + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = new_cnt; + lo->ldo_is_composite = 1; + + ++mirror_id; + lod_comp->llc_id = LCME_ID_INVAL; + lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, new_cnt - 1); + + if (lo->ldo_flr_state == LCM_FL_NONE) + lo->ldo_flr_state = LCM_FL_RDONLY; + lo->ldo_flr_state |= LCM_FL_PCC_RDONLY; + buf->lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + buf, XATTR_NAME_LOV, 0, th); + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +/* + * TODO: When clear LCM_FL_PCC_RDONLY flag from the layouts, it means the file + * is going to be modified. Currently it needs two RPCs: first one is to clear + * LCM_FL_PCC_RDONLY flag; the second one is to pick primary mirror and mark + * the file as LCM_FL_WRITE_PENDING. + * These two RPCs can be combined in one RPC call. + */ +static int lod_declare_pccro_clear(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct lod_layout_component *lod_comp; + int rc; + + ENTRY; + + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + if (!(lo->ldo_flr_state & LCM_FL_PCC_RDONLY)) + RETURN(-EALREADY); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + RETURN(rc); + + lod_comp = lod_locate_comp_hsm(lo, NULL); + if (lod_comp == NULL) { + CDEBUG(D_LAYOUT, "Not found any HSM component\n"); + GOTO(out, rc = -EINVAL); + } + + lod_comp->llc_foreign_flags &= ~HS_PCCRO; + lo->ldo_flr_state &= ~LCM_FL_PCC_RDONLY; + lod_obj_inc_layout_gen(lo); + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); +out: + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +static int lod_declare_update_pccro(const struct lu_env *env, + struct dt_object *dt, + struct md_layout_change *mlc, + struct thandle *th) +{ + struct layout_intent *intent = mlc->mlc_intent; + int rc; + + switch (intent->lai_opc) { + case LAYOUT_INTENT_PCCRO_SET: + rc = lod_declare_pccro_set(env, dt, th); + break; + case LAYOUT_INTENT_PCCRO_CLEAR: + rc = lod_declare_pccro_clear(env, dt, th); + break; + default: + rc = -EOPNOTSUPP; + break; + } + + return rc; +} + static int lod_declare_update_rdonly(const struct lu_env *env, struct lod_object *lo, struct md_layout_change *mlc, struct thandle *th) @@ -7627,10 +8274,10 @@ static int lod_declare_update_rdonly(const struct lu_env *env, if (mlc->mlc_opc == MD_LAYOUT_WRITE) { struct layout_intent *layout = mlc->mlc_intent; - int write = layout->li_opc == LAYOUT_INTENT_WRITE; + int write = layout->lai_opc == LAYOUT_INTENT_WRITE; int picked; - extent = layout->li_extent; + extent = layout->lai_extent; CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); @@ -7648,7 +8295,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - if (layout->li_opc == LAYOUT_INTENT_TRUNC) { + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd * stale components overlapping [size, eof). @@ -7663,7 +8310,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, GOTO(out, rc); /* restore truncate intent extent */ - if (layout->li_opc == LAYOUT_INTENT_TRUNC) + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) extent.e_end = extent.e_start; /* instantiate components for the picked mirror, start from 0 */ @@ -7725,19 +8372,13 @@ static int lod_declare_update_rdonly(const struct lu_env *env, * This way it can make sure that the layout version is * monotonously increased in this writing era. */ lod_obj_inc_layout_gen(lo); - if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) { - __u32 layout_version; - - get_random_bytes(&layout_version, sizeof(layout_version)); - lo->ldo_layout_gen = layout_version & 0xffff; - } rc = lod_declare_instantiate_components(env, lo, th, 0); if (rc) GOTO(out, rc); layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; /* set current version */ + layout_attr->la_layout_version = 0; if (mlc->mlc_opc == MD_LAYOUT_RESYNC) layout_attr->la_layout_version = LU_LAYOUT_RESYNC; rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); @@ -7773,6 +8414,8 @@ static int lod_declare_update_write_pending(const struct lu_env *env, continue; if (lo->ldo_mirrors[i].lme_prefer == 0) continue; + if (lo->ldo_mirrors[i].lme_hsm) + continue; primary = i; break; @@ -7808,11 +8451,11 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (mlc->mlc_opc == MD_LAYOUT_WRITE) { struct layout_intent *layout = mlc->mlc_intent; - int write = layout->li_opc == LAYOUT_INTENT_WRITE; + int write = layout->lai_opc == LAYOUT_INTENT_WRITE; - LASSERT(mlc->mlc_intent != NULL); + LASSERT(layout != NULL); - extent = mlc->mlc_intent->li_extent; + extent = layout->lai_extent; CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); @@ -7823,7 +8466,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) { + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd * stale components overlapping [size, eof). @@ -7841,7 +8484,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, * instantiate [0, mlc->mlc_intent->e_end) */ /* restore truncate intent extent */ - if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) extent.e_end = extent.e_start; extent.e_start = 0; @@ -7887,20 +8530,20 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (rc) GOTO(out, rc); + lod_obj_inc_layout_gen(lo); + /* 3. transfer layout version to OST objects. * transfer new layout version to OST objects so that stale writes * can be denied. It also ends an era of writing by setting * LU_LAYOUT_RESYNC. Normal client can never use this bit to * send write RPC; only resync RPCs could do it. */ layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; /* set current version */ + layout_attr->la_layout_version = 0; if (mlc->mlc_opc == MD_LAYOUT_RESYNC) layout_attr->la_layout_version = LU_LAYOUT_RESYNC; rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); if (rc) GOTO(out, rc); - - lod_obj_inc_layout_gen(lo); out: if (rc) lod_striping_free(env, lo); @@ -7986,7 +8629,7 @@ static int lod_declare_update_sync_pending(const struct lu_env *env, lod_obj_inc_layout_gen(lo); layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; /* set current version */ + layout_attr->la_layout_version = 0; rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); if (rc) GOTO(out, rc); @@ -8007,17 +8650,17 @@ typedef int (*mlc_handler)(const struct lu_env *env, struct dt_object *dt, struct thandle *th); /** - * Attach stripes after target's for migrating directory. NB, we - * only need to declare this, the actual work is done inside + * lod_dir_declare_layout_attach() - Attach stripes after target's for migrating + * directory. NB, we only need to declare this, the actual work is done inside * lod_xattr_set_lmv(). - * - * \param[in] env execution environment - * \param[in] dt target object - * \param[in] mlc layout change data - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * @env: execution environment + * @dt: target object + * @mlc: layout change data + * @th: transaction handle + * + * Return: + * * %0 on success + * * %negative if failed */ static int lod_dir_declare_layout_attach(const struct lu_env *env, struct dt_object *dt, @@ -8052,8 +8695,8 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, if (!lmv_is_sane(lmv)) RETURN(-EINVAL); - if (!dt_try_as_dir(env, dt)) - return -ENOTDIR; + if (!dt_try_as_dir(env, dt, false)) + RETURN(-ENOTDIR); dof->dof_type = DFT_DIR; @@ -8093,7 +8736,7 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, stripes[i + lo->ldo_dir_stripe_count] = dto; - if (!dt_try_as_dir(env, dto)) + if (!dt_try_as_dir(env, dto, true)) GOTO(out, rc = -ENOTDIR); rc = lod_sub_declare_ref_add(env, dto, th); @@ -8151,9 +8794,11 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, OBD_FREE_PTR_ARRAY(lo->ldo_stripe, lo->ldo_dir_stripes_allocated); lo->ldo_stripe = stripes; + lo->ldo_is_foreign = 0; lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); lo->ldo_dir_stripe_count += stripe_count; + lo->ldo_dir_layout_version++; lo->ldo_dir_stripes_allocated += stripe_count; /* plain directory split creates target as a plain directory, while @@ -8185,7 +8830,7 @@ static int lod_dir_declare_layout_detach(const struct lu_env *env, int i; int rc = 0; - if (!dt_try_as_dir(env, dt)) + if (!dt_try_as_dir(env, dt, true)) return -ENOTDIR; if (!lo->ldo_dir_stripe_count) @@ -8197,7 +8842,7 @@ static int lod_dir_declare_layout_detach(const struct lu_env *env, if (!dto) continue; - if (!dt_try_as_dir(env, dto)) + if (!dt_try_as_dir(env, dto, true)) return -ENOTDIR; rc = lod_sub_declare_delete(env, dto, @@ -8230,7 +8875,7 @@ static int dt_dir_is_empty(const struct lu_env *env, ENTRY; - if (!dt_try_as_dir(env, obj)) + if (!dt_try_as_dir(env, obj, true)) RETURN(-ENOTDIR); iops = &obj->do_index_ops->dio_it; @@ -8277,7 +8922,7 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env, LASSERT(lmu); - if (!dt_try_as_dir(env, dt)) + if (!dt_try_as_dir(env, dt, true)) return -ENOTDIR; /* shouldn't be called on plain directory */ @@ -8336,15 +8981,15 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env, } /** - * Allocate stripes for split directory. - * - * \param[in] env execution environment - * \param[in] dt target object - * \param[in] mlc layout change data - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * lod_dir_declare_layout_split() - Allocate stripes for split directory. + * @env: execution environment + * @dt: target object + * @mlc: layout change data + * @th: transaction handle + * + * Return: + * * %0 on success + * * %negative if failed */ static int lod_dir_declare_layout_split(const struct lu_env *env, struct dt_object *dt, @@ -8357,6 +9002,7 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, struct dt_object_format *dof = &info->lti_format; struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata; struct dt_object **stripes; + int mdt_count = lod->lod_remote_mdt_count + 1; u32 stripe_count; u32 saved_count; int i; @@ -8367,8 +9013,27 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT); - saved_count = lo->ldo_dir_stripes_allocated; + saved_count = lo->ldo_dir_stripe_count; stripe_count = le32_to_cpu(lum->lum_stripe_count); + + /* if the split target is overstriped, we need to put that flag in the + * current layout so it can allocate the larger number of stripes + * + * Note we need to pick up any hash *flags* which affect allocation + * *before* allocation, so they're used in allocating the directory, + * rather than after when we finalize directory setup (at the end of + * this function). + */ + if (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_FLAG_OVERSTRIPED) { + /* silently clamp stripe count if it exceeds limit */ + if (stripe_count > mdt_count * lod->lod_max_stripes_per_mdt) + stripe_count = mdt_count * lod->lod_max_stripes_per_mdt; + if (stripe_count > mdt_count) + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_OVERSTRIPED; + } else if (stripe_count > mdt_count) { + RETURN(-E2BIG); + } + if (stripe_count <= saved_count) RETURN(-EINVAL); @@ -8378,10 +9043,11 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, if (!stripes) RETURN(-ENOMEM); - for (i = 0; i < lo->ldo_dir_stripes_allocated; i++) + for (i = 0; i < saved_count; i++) stripes[i] = lo->ldo_stripe[i]; lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count); if (rc == -EAGAIN) rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count, @@ -8395,6 +9061,7 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, OBD_FREE(lo->ldo_stripe, sizeof(*stripes) * lo->ldo_dir_stripes_allocated); lo->ldo_stripe = stripes; + lo->ldo_is_foreign = 0; lo->ldo_dir_striped = 1; lo->ldo_dir_stripe_count = rc; lo->ldo_dir_stripes_allocated = stripe_count; @@ -8415,21 +9082,23 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, RETURN(rc); } -/* - * detach all stripes from dir master object, NB, stripes are not destroyed, but +/** + * lod_dir_layout_detach() - detach all stripes from dir master object + * @env: execution environment + * @dt: target object + * @mlc: layout change data + * @th: transaction handle + * + * Detach all stripes from dir master object, NB, stripes are not destroyed, but * deleted from it's parent namespace, this function is called in two places: * 1. mdd_migrate_mdt() detach stripes from source, and attach them to * target. * 2. mdd_dir_layout_update() detach stripe before turning 1-stripe directory to * a plain directory. * - * \param[in] env execution environment - * \param[in] dt target object - * \param[in] mlc layout change data - * \param[in] th transaction handle - * - * \retval 0 on success - * \retval negative if failed + * Return: + * * %0 on success + * * %negative if failed */ static int lod_dir_layout_detach(const struct lu_env *env, struct dt_object *dt, @@ -8625,6 +9294,19 @@ static int lod_declare_layout_change(const struct lu_env *env, dt_object_remote(dt_object_child(dt))) RETURN(-EINVAL); + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *intent = mlc->mlc_intent; + + if (intent->lai_opc == LAYOUT_INTENT_PCCRO_SET || + intent->lai_opc == LAYOUT_INTENT_PCCRO_CLEAR) { + if (!S_ISREG(dt->do_lu.lo_header->loh_attr)) + RETURN(-EINVAL); + + rc = lod_declare_update_pccro(env, dt, mlc, th); + RETURN(rc); + } + } + rc = lod_striping_load(env, lo); if (rc) GOTO(out, rc); @@ -8653,18 +9335,32 @@ static int lod_declare_layout_change(const struct lu_env *env, rc = -ENOTSUPP; break; } + if (rc == 0) + rc = lod_save_layout_gen_intrans(info, lo); + out: RETURN(rc); } /** - * Instantiate layout component objects which covers the intent write offset. + * lod_layout_change() - Client is trying to write to un-instantiated + * layout component. + * @env: execution environment for this thread + * @dt: object + * @mlc: data structure to describe the changes to the DT object's layout + * @th: transactional handle (for atomic changes) + * + * Instantiate layout component objects which covers the intent write + * offset. + * + * Return: 0 on success, negative on error */ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, struct md_layout_change *mlc, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct lu_attr *attr = &lod_env_info(env)->lti_attr; - struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr; + struct lu_attr *layout_attr = &info->lti_layout_attr; struct lod_object *lo = lod_dt_obj(dt); int rc; @@ -8676,6 +9372,16 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } + rc = lod_check_layout_gen_intrans(info, lo); + if (rc > 0) { + CDEBUG(D_LAYOUT, + "%s: obj "DFID" gen changed from %d to %d in transaction, retry the transaction \n", + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), + info->lti_gen[rc - 1], lo->ldo_layout_gen); + RETURN(-EAGAIN); + } + rc = lod_striped_create(env, dt, attr, NULL, th); if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) { layout_attr->la_layout_version |= lo->ldo_layout_gen; @@ -8716,12 +9422,13 @@ const struct dt_object_operations lod_obj_ops = { .do_invalidate = lod_invalidate, .do_declare_layout_change = lod_declare_layout_change, .do_layout_change = lod_layout_change, + .do_layout_check = lod_layout_check, }; -/** +/* * Implementation of dt_body_operations::dbo_read. * - * \see dt_body_operations::dbo_read() in the API description for details. + * see dt_body_operations::dbo_read() in the API description for details. */ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, loff_t *pos) @@ -8733,10 +9440,10 @@ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, return next->do_body_ops->dbo_read(env, next, buf, pos); } -/** +/* * Implementation of dt_body_operations::dbo_declare_write. * - * \see dt_body_operations::dbo_declare_write() in the API description + * see dt_body_operations::dbo_declare_write() in the API description * for details. */ static ssize_t lod_declare_write(const struct lu_env *env, @@ -8747,10 +9454,10 @@ static ssize_t lod_declare_write(const struct lu_env *env, return lod_sub_declare_write(env, dt_object_child(dt), buf, pos, th); } -/** +/* * Implementation of dt_body_operations::dbo_write. * - * \see dt_body_operations::dbo_write() in the API description for details. + * see dt_body_operations::dbo_write() in the API description for details. */ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, @@ -8795,15 +9502,21 @@ static const struct dt_body_operations lod_body_ops = { }; /** - * Implementation of lu_object_operations::loo_object_init. + * lod_object_init() - Implementation of lu_object_operations::loo_object_init. + * @env: execution environment + * @lo: pointer to lu_object (based on FID) + * @conf: @lo configuration * * The function determines the type and the index of the target device using * sequence of the object's FID. Then passes control down to the - * corresponding device: - * OSD for the local objects, OSP for remote + * corresponding device: OSD for the local objects, OSP for remote * - * \see lu_object_operations::loo_object_init() in the API description + * see lu_object_operations::loo_object_init() in the API description * for details. + * + * Return: + * * %0 on Success + * * %negative on Error */ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, const struct lu_object_conf *conf) @@ -8863,76 +9576,27 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, } /** - * - * Alloc cached foreign LOV - * - * \param[in] lo object - * \param[in] size size of foreign LOV - * - * \retval 0 on success - * \retval negative if failed - */ -int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) -{ - OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); - if (lo->ldo_foreign_lov == NULL) - return -ENOMEM; - lo->ldo_foreign_lov_size = size; - lo->ldo_is_foreign = 1; - return 0; -} - -/** - * - * Free cached foreign LOV - * - * \param[in] lo object - */ -void lod_free_foreign_lov(struct lod_object *lo) -{ - if (lo->ldo_foreign_lov != NULL) - OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); - lo->ldo_foreign_lov = NULL; - lo->ldo_foreign_lov_size = 0; - lo->ldo_is_foreign = 0; -} - -/** - * - * Free cached foreign LMV - * - * \param[in] lo object - */ -void lod_free_foreign_lmv(struct lod_object *lo) -{ - if (lo->ldo_foreign_lmv != NULL) - OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); - lo->ldo_foreign_lmv = NULL; - lo->ldo_foreign_lmv_size = 0; - lo->ldo_dir_is_foreign = 0; -} - -/** - * - * Release resources associated with striping. + * lod_striping_free_nolock() - Release resources associated with striping. + * @env: execution environment + * @lo: object (file/folder) * * If the object is striped (regular or directory), then release * the stripe objects references and free the ldo_stripe array. - * - * \param[in] env execution environment - * \param[in] lo object */ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) { struct lod_layout_component *lod_comp; + __u32 obj_attr = lo->ldo_obj.do_lu.lo_header->loh_attr; int i, j; if (unlikely(lo->ldo_is_foreign)) { - lod_free_foreign_lov(lo); - lo->ldo_comp_cached = 0; - } else if (unlikely(lo->ldo_dir_is_foreign)) { - lod_free_foreign_lmv(lo); - lo->ldo_dir_stripe_loaded = 0; + if (S_ISREG(obj_attr)) { + lod_free_foreign_lov(lo); + lo->ldo_comp_cached = 0; + } else if (S_ISDIR(obj_attr)) { + lod_free_foreign_lmv(lo); + lo->ldo_dir_stripe_loaded = 0; + } } else if (lo->ldo_stripe != NULL) { LASSERT(lo->ldo_comp_entries == NULL); LASSERT(lo->ldo_dir_stripes_allocated > 0); @@ -8948,11 +9612,15 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) lo->ldo_dir_stripes_allocated = 0; lo->ldo_dir_stripe_loaded = 0; lo->ldo_dir_stripe_count = 0; + lo->ldo_obj.do_index_ops = NULL; } else if (lo->ldo_comp_entries != NULL) { for (i = 0; i < lo->ldo_comp_cnt; i++) { /* free lod_layout_component::llc_stripe array */ lod_comp = &lo->ldo_comp_entries[i]; + /* HSM layout component */ + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; if (lod_comp->llc_stripe == NULL) continue; LASSERT(lod_comp->llc_stripes_allocated != 0); @@ -8981,10 +9649,10 @@ void lod_striping_free(const struct lu_env *env, struct lod_object *lo) mutex_unlock(&lo->ldo_layout_mutex); } -/** +/* * Implementation of lu_object_operations::loo_object_free. * - * \see lu_object_operations::loo_object_free() in the API description + * see lu_object_operations::loo_object_free() in the API description * for details. */ static void lod_object_free(const struct lu_env *env, struct lu_object *o) @@ -8998,10 +9666,10 @@ static void lod_object_free(const struct lu_env *env, struct lu_object *o) OBD_SLAB_FREE_PTR(lo, lod_object_kmem); } -/** +/* * Implementation of lu_object_operations::loo_object_release. * - * \see lu_object_operations::loo_object_release() in the API description + * see lu_object_operations::loo_object_release() in the API description * for details. */ static void lod_object_release(const struct lu_env *env, struct lu_object *o) @@ -9010,10 +9678,10 @@ static void lod_object_release(const struct lu_env *env, struct lu_object *o) * creation failed before? */ } -/** +/* * Implementation of lu_object_operations::loo_object_print. * - * \see lu_object_operations::loo_object_print() in the API description + * see lu_object_operations::loo_object_print() in the API description * for details. */ static int lod_object_print(const struct lu_env *env, void *cookie,