* Group identifier for quota purposes.
*/
gid_t cat_gid;
+
+ /* nlink of the directory */
+ __u64 cat_nlink;
};
/**
* @{
*/
-
+#include <obd_support.h>
/*
* super-class definitions.
*/
*/
int (*dt_trans_start)(const struct lu_env *env,
struct dt_device *dev, struct thandle *th);
- /**
- * Finish previously started transaction.
- */
- int (*dt_trans_stop)(const struct lu_env *env,
- struct thandle *th);
+ /**
+ * Finish previously started transaction.
+ */
+ int (*dt_trans_stop)(const struct lu_env *env, struct dt_device *dev,
+ struct thandle *th);
/**
* Add commit callback to the transaction.
*/
* It can contain any allocation hint in the future.
*/
struct dt_allocation_hint {
- struct dt_object *dah_parent;
- __u32 dah_mode;
+ struct dt_object *dah_parent;
+ const void *dah_eadata;
+ int dah_eadata_len;
+ __u32 dah_mode;
};
/**
return container_of0(o, struct dt_object, do_lu);
}
+/* Per-transaction DNE state: remote updates queued for other MDTs and
+ * flags describing how they relate to the local transaction.  Attached
+ * to a struct thandle via th_update and freed by thandle_put(). */
+struct thandle_update {
+	/* In DNE, one transaction can be disassembled into
+	 * updates on several different MDTs, and these updates
+	 * will be attached to tu_remote_update_list per target.
+	 * Only single thread will access the list, no need lock
+	 */
+	struct list_head	tu_remote_update_list;
+
+	/* sent after or before local transaction */
+	unsigned int		tu_sent_after_local_trans:1,
+				tu_only_remote_trans:1;
+};
+
/**
* This is the general purpose transaction handle.
* 1. Transaction Life Cycle
/** the dt device on which the transactions are executed */
struct dt_device *th_dev;
+ atomic_t th_refc;
+ /* the size of transaction */
+ int th_alloc_size;
+
/** context for this transaction, tag is LCT_TX_HANDLE */
struct lu_context th_ctx;
/* local transation, no need to inform other layers */
unsigned int th_local:1;
- /* In DNE, one transaction can be disassemblied into
- * updates on several different MDTs, and these updates
- * will be attached to th_remote_update_list per target.
- * Only single thread will access the list, no need lock
- */
- cfs_list_t th_remote_update_list;
- struct update_request *th_current_request;
+ struct thandle_update *th_update;
};
+/* Take an additional reference on @thandle (pairs with thandle_put()). */
+static inline void thandle_get(struct thandle *thandle)
+{
+	atomic_inc(&thandle->th_refc);
+}
+
+/* Drop a reference on @thandle; on the last put, free the attached
+ * thandle_update (if any) and then the handle itself, whose allocation
+ * size was recorded in th_alloc_size. */
+static inline void thandle_put(struct thandle *thandle)
+{
+	if (atomic_dec_and_test(&thandle->th_refc)) {
+		if (thandle->th_update != NULL)
+			OBD_FREE_PTR(thandle->th_update);
+		OBD_FREE(thandle, thandle->th_alloc_size);
+	}
+}
/**
* Transaction call-backs.
*
static inline int dt_trans_stop(const struct lu_env *env,
				struct dt_device *d, struct thandle *th)
{
-	LASSERT(d->dd_ops->dt_trans_stop);
-	return d->dd_ops->dt_trans_stop(env, th);
+	LASSERT(d->dd_ops->dt_trans_stop);
+	/* NOTE(review): dt_trans_stop now also receives the dt_device,
+	 * matching the updated ->dt_trans_stop() method signature above. */
+	return d->dd_ops->dt_trans_stop(env, d, th);
}
static inline int dt_trans_cb_add(struct thandle *th,
/* target/out_lib.c */
struct update_request *
-out_find_update(struct thandle *th, struct dt_device *dt_dev);
+out_find_update(struct thandle_update *tu, struct dt_device *dt_dev);
void out_destroy_update_req(struct update_request *update);
struct update_request *out_create_update_req(struct dt_device *dt);
struct update_request *out_find_create_update_loc(struct thandle *th,
#define XATTR_NAME_LOV "trusted.lov"
#define XATTR_NAME_LMA "trusted.lma"
#define XATTR_NAME_LMV "trusted.lmv"
+#define XATTR_NAME_DEFALT_LMV "trusted.dmv"
#define XATTR_NAME_LINK "trusted.link"
#define XATTR_NAME_FID "trusted.fid"
#define XATTR_NAME_VERSION "trusted.version"
__u32 ld_tgt_count; /* how many MDS's */
__u32 ld_active_tgt_count; /* how many active */
__u32 ld_default_stripe_count; /* how many objects are used */
- __u32 ld_pattern; /* default MEA_MAGIC_* */
+ __u32 ld_pattern; /* default hash pattern */
__u64 ld_default_hash_size;
__u64 ld_padding_1; /* also fix lustre_swab_lmv_desc */
__u32 ld_padding_2; /* also fix lustre_swab_lmv_desc */
#define LMV_MAGIC_V1 0x0CD10CD0 /* normal stripe lmv magic */
#define LMV_USER_MAGIC 0x0CD20CD0 /* default lmv magic*/
#define LMV_MAGIC LMV_MAGIC_V1
+
+enum lmv_hash_type {
+ LMV_HASH_TYPE_ALL_CHARS = 1,
+ LMV_HASH_TYPE_FNV_1A_64 = 2,
+};
+
+#define LMV_HASH_NAME_ALL_CHARS "all_char"
+#define LMV_HASH_NAME_FNV_1A_64 "fnv_1a_64"
+
+/**
+ * The FNV-1a hash algorithm is as follows:
+ * hash = FNV_offset_basis
+ * for each octet_of_data to be hashed
+ * hash = hash XOR octet_of_data
+ * hash = hash × FNV_prime
+ * return hash
+ * http://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash
+ *
+ * http://www.isthe.com/chongo/tech/comp/fnv/index.html#FNV-reference-source
+ * FNV_prime is 2^40 + 2^8 + 0xb3 = 0x100000001b3ULL
+ **/
+#define LUSTRE_FNV_1A_64_PRIME 0x100000001b3ULL
+#define LUSTRE_FNV_1A_64_OFFSET_BIAS 0xcbf29ce484222325ULL
+/* Compute the 64-bit FNV-1a hash of @size bytes at @buf, using the
+ * algorithm and constants described in the comment block above. */
+static inline __u64 lustre_hash_fnv_1a_64(const void *buf, size_t size)
+{
+	__u64 hash = LUSTRE_FNV_1A_64_OFFSET_BIAS;
+	const unsigned char *p = buf;
+	size_t i;
+
+	for (i = 0; i < size; i++) {
+		hash ^= p[i];
+		hash *= LUSTRE_FNV_1A_64_PRIME;
+	}
+
+	return hash;
+}
+
struct lmv_mds_md_v1 {
__u32 lmv_magic;
__u32 lmv_stripe_count; /* stripe count */
} __attribute__((packed));
#endif
-/* keep this to be the same size as lov_user_ost_data_v1 */
struct lmv_user_mds_data {
struct lu_fid lum_fid;
__u32 lum_padding;
__u32 lum_mds;
};
-/* lum_type */
-enum {
- LMV_STRIPE_TYPE = 0,
- LMV_DEFAULT_TYPE = 1,
-};
-
+/* Derived the same way as LOV_MAX_STRIPE_COUNT (see above):
+ * (max buffer size - lmv+rpc header) / sizeof(struct lmv_user_mds_data) */
+#define LMV_MAX_STRIPE_COUNT 2000 /* ((12 * 4096 - 256) / 24) */
#define lmv_user_md lmv_user_md_v1
struct lmv_user_md_v1 {
__u32 lum_magic; /* must be the first field */
__u32 lum_padding3;
char lum_pool_name[LOV_MAXPOOLNAME];
struct lmv_user_mds_data lum_objects[0];
-};
+} __attribute__((packed));
static inline int lmv_user_md_size(int stripes, int lmm_magic)
{
extern int llapi_file_fget_mdtidx(int fd, int *mdtidx);
extern int llapi_dir_create_pool(const char *name, int flags, int stripe_offset,
int stripe_count, int stripe_pattern,
- char *poolname);
+ const char *poolname);
int llapi_direntry_remove(char *dname);
extern int llapi_obd_statfs(char *path, __u32 type, __u32 index,
struct obd_statfs *stat_buf,
#define LOVEA_DELETE_VALUES(size, count, offset) (size == 0 && count == 0 && \
offset == (typeof(offset))(-1))
+#define LMVEA_DELETE_VALUES(count, offset) ((count) == 0 && \
+ (offset) == (typeof(offset))(-1))
/* #define POISON_BULK 0 */
/*
void lmv_free_md(union lmv_mds_md *lmm);
int lmv_alloc_memmd(struct lmv_stripe_md **lsmp, int stripe_count);
void lmv_free_memmd(struct lmv_stripe_md *lsm);
+
+/* Convert an LMV v1 layout from CPU byte order to on-disk/wire
+ * little-endian, including the per-stripe FID array.
+ * NOTE(review): the loop bound reads lmv_src->lmv_stripe_count after
+ * lmv_dst fields were written; callers presumably pass distinct src/dst
+ * buffers — in-place use would mis-read the count on big-endian. */
+static inline void lmv1_cpu_to_le(struct lmv_mds_md_v1 *lmv_dst,
+				  const struct lmv_mds_md_v1 *lmv_src)
+{
+	int i;
+
+	lmv_dst->lmv_magic = cpu_to_le32(lmv_src->lmv_magic);
+	lmv_dst->lmv_stripe_count = cpu_to_le32(lmv_src->lmv_stripe_count);
+	lmv_dst->lmv_master_mdt_index =
+		cpu_to_le32(lmv_src->lmv_master_mdt_index);
+	lmv_dst->lmv_hash_type = cpu_to_le32(lmv_src->lmv_hash_type);
+	lmv_dst->lmv_layout_version = cpu_to_le32(lmv_src->lmv_layout_version);
+	for (i = 0; i < lmv_src->lmv_stripe_count; i++)
+		fid_cpu_to_le(&lmv_dst->lmv_stripe_fids[i],
+			      &lmv_src->lmv_stripe_fids[i]);
+}
+
+/* Convert an LMV v1 layout from on-disk/wire little-endian to CPU byte
+ * order, including the per-stripe FID array.
+ *
+ * The stripe count is read and converted ONCE, before any field of
+ * @lmv_dst is written.  The previous code used the raw LE value of
+ * lmv_src->lmv_stripe_count as the loop bound, which yields the wrong
+ * count on big-endian machines; converting it up front fixes that and
+ * also makes in-place conversion (lmv_dst == lmv_src) safe. */
+static inline void lmv1_le_to_cpu(struct lmv_mds_md_v1 *lmv_dst,
+				  const struct lmv_mds_md_v1 *lmv_src)
+{
+	__u32 count = le32_to_cpu(lmv_src->lmv_stripe_count);
+	__u32 i;
+
+	lmv_dst->lmv_magic = le32_to_cpu(lmv_src->lmv_magic);
+	lmv_dst->lmv_stripe_count = count;
+	lmv_dst->lmv_master_mdt_index =
+		le32_to_cpu(lmv_src->lmv_master_mdt_index);
+	lmv_dst->lmv_hash_type = le32_to_cpu(lmv_src->lmv_hash_type);
+	lmv_dst->lmv_layout_version = le32_to_cpu(lmv_src->lmv_layout_version);
+	for (i = 0; i < count; i++)
+		fid_le_to_cpu(&lmv_dst->lmv_stripe_fids[i],
+			      &lmv_src->lmv_stripe_fids[i]);
+}
+
+/* Byte-swap a union lmv_mds_md to little-endian, dispatching on the
+ * CPU-order magic; layouts with an unrecognized magic are silently
+ * left unconverted. */
+static inline void lmv_cpu_to_le(union lmv_mds_md *lmv_dst,
+				 const union lmv_mds_md *lmv_src)
+{
+	switch (lmv_src->lmv_magic) {
+	case LMV_MAGIC_V1:
+		lmv1_cpu_to_le(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
+		break;
+	default:
+		break;
+	}
+}
+
+/* Byte-swap a union lmv_mds_md to CPU order, dispatching on the
+ * little-endian magic (hence the le32_to_cpu() in the switch); layouts
+ * with an unrecognized magic are silently left unconverted. */
+static inline void lmv_le_to_cpu(union lmv_mds_md *lmv_dst,
+				 const union lmv_mds_md *lmv_src)
+{
+	switch (le32_to_cpu(lmv_src->lmv_magic)) {
+	case LMV_MAGIC_V1:
+		lmv1_le_to_cpu(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
+		break;
+	default:
+		break;
+	}
+}
#endif
if (index >= UPDATE_BUF_COUNT)
return NULL;
- ptr = (char *)update + cfs_size_round(offsetof(struct update,
- u_bufs[0]));
- for (i = 0; i < index; i++) {
- LASSERT(update->u_lens[i] > 0);
- ptr += cfs_size_round(update->u_lens[i]);
+ if (unlikely(update->u_lens[index] == 0)) {
+ ptr = NULL;
+ } else {
+ ptr = (char *)update +
+ cfs_size_round(offsetof(struct update, u_bufs[0]));
+ for (i = 0; i < index; i++)
+ ptr += cfs_size_round(update->u_lens[i]);
}
if (size != NULL)
struct lu_fid ma_pfid;
struct md_hsm ma_hsm;
struct lov_mds_md *ma_lmm;
- struct lmv_stripe_md *ma_lmv;
+ union lmv_mds_md *ma_lmv;
void *ma_acl;
struct llog_cookie *ma_cookie;
struct lustre_capa *ma_capa;
};
/* lmv structures */
-#define MEA_MAGIC_LAST_CHAR 0xb2221ca1
-#define MEA_MAGIC_ALL_CHARS 0xb222a11c
-#define MEA_MAGIC_HASH_SEGMENT 0xb222a11b
-
-#define MAX_HASH_SIZE_32 0x7fffffffUL
-#define MAX_HASH_SIZE 0x7fffffffffffffffULL
-#define MAX_HASH_HIGHEST_BIT 0x1000000000000000ULL
-
struct lustre_md {
struct mdt_body *body;
struct lov_stripe_md *lsm;
};
struct lookup_intent;
+struct cl_attr;
struct md_ops {
/* Every operation from MD_STATS_FIRST_OP up to and including
int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *);
+ int (*m_merge_attr)(struct obd_export *,
+ const struct lmv_stripe_md *lsm,
+ struct cl_attr *attr);
+
+ int (*m_update_lsm_md)(struct obd_export *, struct lmv_stripe_md *lsm,
+ struct mdt_body *, ldlm_blocking_callback);
+
int (*m_set_open_replay_data)(struct obd_export *,
struct obd_client_handle *,
struct lookup_intent *);
RETURN(MDP(exp->exp_obd, free_lustre_md)(exp, md));
}
+static inline int md_update_lsm_md(struct obd_export *exp,
+ struct lmv_stripe_md *lsm,
+ struct mdt_body *body,
+ ldlm_blocking_callback cb)
+{
+ ENTRY;
+ EXP_CHECK_MD_OP(exp, update_lsm_md);
+ EXP_MD_COUNTER_INCREMENT(exp, update_lsm_md);
+ RETURN(MDP(exp->exp_obd, update_lsm_md)(exp, lsm, body, cb));
+}
+
+static inline int md_merge_attr(struct obd_export *exp,
+ const struct lmv_stripe_md *lsm,
+ struct cl_attr *attr)
+{
+ ENTRY;
+ EXP_CHECK_MD_OP(exp, merge_attr);
+ EXP_MD_COUNTER_INCREMENT(exp, merge_attr);
+ RETURN(MDP(exp->exp_obd, merge_attr)(exp, lsm, attr));
+}
+
static inline int md_setxattr(struct obd_export *exp,
const struct lu_fid *fid, struct obd_capa *oc,
obd_valid valid, const char *name,
void class_init_uuidlist(void);
void class_exit_uuidlist(void);
-/* mea.c */
-int mea_name2idx(struct lmv_stripe_md *mea, const char *name, int namelen);
-int raw_name2idx(int hashtype, int count, const char *name, int namelen);
-
/* prng.c */
#define ll_generate_random_uuid(uuid_out) cfs_get_random_bytes(uuid_out, sizeof(class_uuid_t))
return rc;
}
-int ll_dir_setdirstripe(struct inode *dir, struct lmv_user_md *lump,
- char *filename)
+static int ll_dir_setdirstripe(struct inode *dir, struct lmv_user_md *lump,
+ const char *filename)
{
struct ptlrpc_request *request = NULL;
struct md_op_data *op_data;
struct ll_sb_info *sbi = ll_i2sbi(dir);
int mode;
int err;
-
ENTRY;
+ if (unlikely(lump->lum_magic != LMV_USER_MAGIC))
+ RETURN(-EINVAL);
+
+ if (lump->lum_stripe_offset == (__u32)-1) {
+ int mdtidx;
+
+ mdtidx = ll_get_mdt_idx(dir);
+ if (mdtidx < 0)
+ RETURN(mdtidx);
+
+ lump->lum_stripe_offset = mdtidx;
+ }
+
+ CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) name %s"
+ "stripe_offset %d, stripe_count: %u\n",
+ PFID(ll_inode2fid(dir)), dir, filename,
+ (int)lump->lum_stripe_offset, lump->lum_stripe_count);
+
+ if (lump->lum_magic != cpu_to_le32(LMV_USER_MAGIC))
+ lustre_swab_lmv_user_md(lump);
+
mode = (0755 & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
op_data = ll_prep_md_op_data(NULL, dir, NULL, filename,
strlen(filename), mode, LUSTRE_OPC_MKDIR,
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
- if (lump != NULL && lump->lmm_magic == cpu_to_le32(LMV_USER_MAGIC))
- op_data->op_cli_flags |= CLI_SET_MEA;
-
/* swabbing is done in lov_setstripe() on server side */
rc = md_setattr(sbi->ll_md_exp, op_data, lump, lum_size,
NULL, 0, &req, NULL);
GOTO(free_lmv, rc = -ENOMEM);
memcpy(tmp, &lum, sizeof(lum));
- tmp->lum_type = LMV_STRIPE_TYPE;
tmp->lum_stripe_count = 1;
mdtindex = ll_get_mdt_idx(inode);
if (mdtindex < 0)
return rc;
}
+/* Ask the MD layer to aggregate the attributes of all stripes of a
+ * striped directory and cache the results (size, nlink into the
+ * lli_stripe_dir_* fields used by getattr; [amc]time into lli_lvb).
+ * Only called for directories with lli_lsm_md attached.
+ * Returns 0 on success or a negative errno from md_merge_attr(). */
+static int ll_merge_md_attr(struct inode *inode)
+{
+	struct cl_attr attr = { 0 };
+	int rc;
+
+	LASSERT(ll_i2info(inode)->lli_lsm_md != NULL);
+	rc = md_merge_attr(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
+			   &attr);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* cached for ll_getattr() on the striped directory */
+	ll_i2info(inode)->lli_stripe_dir_size = attr.cat_size;
+	ll_i2info(inode)->lli_stripe_dir_nlink = attr.cat_nlink;
+
+	ll_i2info(inode)->lli_lvb.lvb_atime = attr.cat_atime;
+	ll_i2info(inode)->lli_lvb.lvb_mtime = attr.cat_mtime;
+	ll_i2info(inode)->lli_lvb.lvb_ctime = attr.cat_ctime;
+
+	RETURN(0);
+}
+
int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
__u64 ibits)
{
/* if object isn't regular file, don't validate size */
if (!S_ISREG(inode->i_mode)) {
+ if (S_ISDIR(inode->i_mode) &&
+ ll_i2info(inode)->lli_lsm_md != NULL) {
+ rc = ll_merge_md_attr(inode);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
if (res)
return res;
- stat->dev = inode->i_sb->s_dev;
- if (ll_need_32bit_api(sbi))
- stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
- else
- stat->ino = inode->i_ino;
- stat->mode = inode->i_mode;
- stat->nlink = inode->i_nlink;
- stat->uid = inode->i_uid;
- stat->gid = inode->i_gid;
+ stat->dev = inode->i_sb->s_dev;
+ if (ll_need_32bit_api(sbi))
+ stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
+ else
+ stat->ino = inode->i_ino;
+ stat->mode = inode->i_mode;
+ stat->uid = inode->i_uid;
+ stat->gid = inode->i_gid;
stat->rdev = inode->i_rdev;
- stat->atime = inode->i_atime;
- stat->mtime = inode->i_mtime;
- stat->ctime = inode->i_ctime;
+ stat->atime = inode->i_atime;
+ stat->mtime = inode->i_mtime;
+ stat->ctime = inode->i_ctime;
stat->blksize = 1 << inode->i_blkbits;
+ stat->blocks = inode->i_blocks;
- stat->size = i_size_read(inode);
- stat->blocks = inode->i_blocks;
+ if (S_ISDIR(inode->i_mode) &&
+ ll_i2info(inode)->lli_lsm_md != NULL) {
+ stat->nlink = lli->lli_stripe_dir_nlink;
+ stat->size = lli->lli_stripe_dir_size;
+ } else {
+ stat->nlink = inode->i_nlink;
+ stat->size = i_size_read(inode);
+ }
return 0;
}
/* for struct cl_lock_descr and struct cl_io */
#include <cl_object.h>
#include <lclient.h>
+#include <lustre_lmv.h>
#include <lustre_mdc.h>
#include <linux/lustre_intent.h>
#include <linux/compat.h>
* -- I am the owner of dir statahead. */
pid_t d_opendir_pid;
/* directory stripe information */
- struct lmv_stripe_md *d_lmv_md;
+ struct lmv_stripe_md *d_lsm_md;
+ /* striped directory size */
+ loff_t d_stripe_size;
+ /* striped directory nlink */
+ __u64 d_stripe_nlink;
} d;
#define lli_readdir_mutex u.d.d_readdir_mutex
#define lli_def_acl u.d.d_def_acl
#define lli_sa_lock u.d.d_sa_lock
#define lli_opendir_pid u.d.d_opendir_pid
-#define lli_lmv_md u.d.d_lmv_md
+#define lli_lsm_md u.d.d_lsm_md
+#define lli_stripe_dir_size u.d.d_stripe_size
+#define lli_stripe_dir_nlink u.d.d_stripe_nlink
/* for non-directory */
struct {
struct inode *dir);
struct inode *ll_iget(struct super_block *sb, ino_t hash,
struct lustre_md *lic);
+int ll_test_inode_by_fid(struct inode *inode, void *opaque);
int ll_md_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
void *data, int flag);
#ifndef HAVE_IOP_ATOMIC_OPEN
return inode;
}
+/* Free and detach the cached stripe metadata of a directory inode,
+ * if present; called when the inode is being cleared. */
+static void ll_dir_clear_lsm_md(struct inode *inode)
+{
+	struct ll_inode_info *lli = ll_i2info(inode);
+
+	LASSERT(S_ISDIR(inode->i_mode));
+
+	if (lli->lli_lsm_md != NULL) {
+		lmv_free_memmd(lli->lli_lsm_md);
+		lli->lli_lsm_md = NULL;
+	}
+}
+
+/* Look up or create the inode for a slave stripe of a striped directory,
+ * identified by @fid.  A freshly created inode gets only minimal llite
+ * initialization (dir ops, fid, timestamps zeroed) and records the
+ * master stripe's FID in lli_pfid.  Returns the inode or ERR_PTR(). */
+static struct inode *ll_iget_anon_dir(struct super_block *sb,
+				      const struct lu_fid *fid,
+				      struct lustre_md *md)
+{
+	struct ll_sb_info *sbi = ll_s2sbi(sb);
+	struct mdt_body *body = md->body;
+	struct inode *inode;
+	ino_t ino;
+	ENTRY;
+
+	ino = cl_fid_build_ino(fid, sbi->ll_flags & LL_SBI_32BIT_API);
+	inode = iget_locked(sb, ino);
+	if (inode == NULL) {
+		CERROR("%s: failed get simple inode "DFID": rc = -ENOENT\n",
+		       ll_get_fsname(sb, NULL, 0), PFID(fid));
+		RETURN(ERR_PTR(-ENOENT));
+	}
+
+	if (inode->i_state & I_NEW) {
+		struct ll_inode_info *lli = ll_i2info(inode);
+		struct lmv_stripe_md *lsm = md->lmv;
+
+		/* keep non-type bits, take the file type from the MDT body */
+		inode->i_mode = (inode->i_mode & ~S_IFMT) |
+				(body->mode & S_IFMT);
+		LASSERTF(S_ISDIR(inode->i_mode), "Not slave inode "DFID"\n",
+			 PFID(fid));
+
+		LTIME_S(inode->i_mtime) = 0;
+		LTIME_S(inode->i_atime) = 0;
+		LTIME_S(inode->i_ctime) = 0;
+		inode->i_rdev = 0;
+
+		/* initializing backing dev info. */
+		inode->i_mapping->backing_dev_info =
+						&s2lsi(inode->i_sb)->lsi_bdi;
+		inode->i_op = &ll_dir_inode_operations;
+		inode->i_fop = &ll_dir_operations;
+		lli->lli_fid = *fid;
+		ll_lli_init(lli);
+
+		LASSERT(lsm != NULL);
+		/* master stripe FID */
+		lli->lli_pfid = lsm->lsm_md_oinfo[0].lmo_fid;
+		CDEBUG(D_INODE, "lli %p master "DFID" slave "DFID"\n",
+		       lli, PFID(fid), PFID(&lli->lli_pfid));
+		unlock_new_inode(inode);
+	}
+
+	RETURN(inode);
+}
+
+/* Populate lmo_root for every stripe of a striped directory: stripe 0
+ * uses the master inode itself, the other stripes get anonymous slave
+ * inodes via ll_iget_anon_dir(); finally the MD layer is asked to finish
+ * initializing the lsm.  Returns 0 or a negative errno. */
+static int ll_init_lsm_md(struct inode *inode, struct lustre_md *md)
+{
+	struct lu_fid *fid;
+	struct lmv_stripe_md *lsm = md->lmv;
+	int i;
+
+	LASSERT(lsm != NULL);
+	/* XXX sigh, this lsm_root initialization should be in
+	 * LMV layer, but it needs ll_iget right now, so we
+	 * put this here right now. */
+	for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
+		fid = &lsm->lsm_md_oinfo[i].lmo_fid;
+		LASSERT(lsm->lsm_md_oinfo[i].lmo_root == NULL);
+		if (i == 0) {
+			lsm->lsm_md_oinfo[i].lmo_root = inode;
+		} else {
+			/* Unfortunately ll_iget will call ll_update_inode,
+			 * where the initialization of slave inode is slightly
+			 * different, so it reset lsm_md to NULL to avoid
+			 * initializing lsm for slave inode. */
+			lsm->lsm_md_oinfo[i].lmo_root =
+				ll_iget_anon_dir(inode->i_sb, fid, md);
+			if (IS_ERR(lsm->lsm_md_oinfo[i].lmo_root)) {
+				int rc = PTR_ERR(lsm->lsm_md_oinfo[i].lmo_root);
+
+				lsm->lsm_md_oinfo[i].lmo_root = NULL;
+				return rc;
+			}
+		}
+	}
+
+	/* Here is where the lsm is being initialized(fill lmo_info) after
+	 * client retrieve MD stripe information from MDT. */
+	return md_update_lsm_md(ll_i2mdexp(inode), lsm, md->body,
+				ll_md_blocking_ast);
+}
+
+/* Return non-zero iff the two stripe-MD descriptors agree on all layout
+ * fields (magic, stripe count, master MDT index, hash type, layout
+ * version and pool name).  Per-stripe FIDs are compared separately by
+ * the caller (see ll_update_lsm_md()). */
+static inline int lli_lsm_md_eq(const struct lmv_stripe_md *lsm_md1,
+				const struct lmv_stripe_md *lsm_md2)
+{
+	return lsm_md1->lsm_md_magic == lsm_md2->lsm_md_magic &&
+	       lsm_md1->lsm_md_stripe_count == lsm_md2->lsm_md_stripe_count &&
+	       lsm_md1->lsm_md_master_mdt_index ==
+					lsm_md2->lsm_md_master_mdt_index &&
+	       lsm_md1->lsm_md_hash_type == lsm_md2->lsm_md_hash_type &&
+	       lsm_md1->lsm_md_layout_version ==
+					lsm_md2->lsm_md_layout_version &&
+	       strcmp(lsm_md1->lsm_md_pool_name,
+		      lsm_md2->lsm_md_pool_name) == 0;
+}
+
+/* Attach or refresh the stripe metadata of a striped directory inode.
+ * First time through, initialize the lsm (slave inodes etc.) and steal
+ * it from @md so it is not freed with the lustre_md; on later calls,
+ * verify the new lsm matches the cached one (layout fields and every
+ * stripe FID) — a mismatch is logged and ignored — then just refresh
+ * the MD-layer state.  Errors are logged, not returned (void). */
+static void ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
+{
+	struct ll_inode_info *lli = ll_i2info(inode);
+	struct lmv_stripe_md *lsm = md->lmv;
+	int idx;
+
+	LASSERT(lsm != NULL);
+	LASSERT(S_ISDIR(inode->i_mode));
+	if (lli->lli_lsm_md == NULL) {
+		int rc;
+
+		rc = ll_init_lsm_md(inode, md);
+		if (rc != 0) {
+			CERROR("%s: init "DFID" failed: rc = %d\n",
+			       ll_get_fsname(inode->i_sb, NULL, 0),
+			       PFID(&lli->lli_fid), rc);
+			return;
+		}
+		lli->lli_lsm_md = lsm;
+		/* set lsm_md to NULL, so the following free lustre_md
+		 * will not free this lsm */
+		md->lmv = NULL;
+		return;
+	}
+
+	/* Compare the old and new stripe information */
+	if (!lli_lsm_md_eq(lli->lli_lsm_md, lsm)) {
+		CERROR("inode %p %lu mismatch\n"
+		       "    new(%p)            vs     lli_lsm_md(%p):\n"
+		       "    magic:      %x                   %x\n"
+		       "    count:      %x                   %x\n"
+		       "    master:     %x                   %x\n"
+		       "    hash_type:  %x                   %x\n"
+		       "    layout:     %x                   %x\n"
+		       "    pool:       %s                   %s\n",
+		       inode, inode->i_ino, lsm, lli->lli_lsm_md,
+		       lsm->lsm_md_magic, lli->lli_lsm_md->lsm_md_magic,
+		       lsm->lsm_md_stripe_count,
+		       lli->lli_lsm_md->lsm_md_stripe_count,
+		       lsm->lsm_md_master_mdt_index,
+		       lli->lli_lsm_md->lsm_md_master_mdt_index,
+		       lsm->lsm_md_hash_type, lli->lli_lsm_md->lsm_md_hash_type,
+		       lsm->lsm_md_layout_version,
+		       lli->lli_lsm_md->lsm_md_layout_version,
+		       lsm->lsm_md_pool_name,
+		       lli->lli_lsm_md->lsm_md_pool_name);
+		return;
+	}
+
+	for (idx = 0; idx < lli->lli_lsm_md->lsm_md_stripe_count; idx++) {
+		if (!lu_fid_eq(&lli->lli_lsm_md->lsm_md_oinfo[idx].lmo_fid,
+			       &lsm->lsm_md_oinfo[idx].lmo_fid)) {
+			CERROR("%s: FID in lsm mismatch idx %d, old: "DFID
+			       "new:"DFID"\n",
+			       ll_get_fsname(inode->i_sb, NULL, 0), idx,
+			       PFID(&lli->lli_lsm_md->lsm_md_oinfo[idx].lmo_fid),
+			       PFID(&lsm->lsm_md_oinfo[idx].lmo_fid));
+			return;
+		}
+	}
+
+	md_update_lsm_md(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
+			 md->body, ll_md_blocking_ast);
+}
+
void ll_clear_inode(struct inode *inode)
{
struct ll_inode_info *lli = ll_i2info(inode);
#endif
lli->lli_inode_magic = LLI_INODE_DEAD;
- ll_clear_inode_capas(inode);
- if (!S_ISDIR(inode->i_mode))
- LASSERT(cfs_list_empty(&lli->lli_agl_list));
+ ll_clear_inode_capas(inode);
+ if (S_ISDIR(inode->i_mode))
+ ll_dir_clear_lsm_md(inode);
+ else
+ LASSERT(list_empty(&lli->lli_agl_list));
- /*
- * XXX This has to be done before lsm is freed below, because
- * cl_object still uses inode lsm.
- */
- cl_inode_fini(inode);
+ /*
+ * XXX This has to be done before lsm is freed below, because
+ * cl_object still uses inode lsm.
+ */
+ cl_inode_fini(inode);
lli->lli_has_smd = false;
EXIT;
lli->lli_maxbytes = MAX_LFS_FILESIZE;
}
- if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
+ if (S_ISDIR(inode->i_mode) && md->lmv != NULL)
+ ll_update_lsm_md(inode, md);
+
+ if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
if (body->valid & OBD_MD_FLRMTPERM)
ll_update_remote_perm(inode, md->remote_perm);
}
#endif
inode->i_ino = cl_fid_build_ino(&body->fid1,
sbi->ll_flags & LL_SBI_32BIT_API);
- inode->i_generation = cl_fid_build_gen(&body->fid1);
+ inode->i_generation = cl_fid_build_gen(&body->fid1);
if (body->valid & OBD_MD_FLATIME) {
if (body->atime > LTIME_S(inode->i_atime))
if (body->valid & OBD_MD_FLRDEV)
inode->i_rdev = old_decode_dev(body->rdev);
- if (body->valid & OBD_MD_FLID) {
- /* FID shouldn't be changed! */
- if (fid_is_sane(&lli->lli_fid)) {
- LASSERTF(lu_fid_eq(&lli->lli_fid, &body->fid1),
- "Trying to change FID "DFID
+ if (body->valid & OBD_MD_FLID) {
+ /* FID shouldn't be changed! */
+ if (fid_is_sane(&lli->lli_fid)) {
+ LASSERTF(lu_fid_eq(&lli->lli_fid, &body->fid1),
+ "Trying to change FID "DFID
" to the "DFID", inode "DFID"(%p)\n",
PFID(&lli->lli_fid), PFID(&body->fid1),
PFID(ll_inode2fid(inode)), inode);
- } else
- lli->lli_fid = body->fid1;
- }
+ } else {
+ lli->lli_fid = body->fid1;
+ }
+ }
LASSERT(fid_seq(&lli->lli_fid) != 0);
struct super_block *sb, struct lookup_intent *it)
{
struct ll_sb_info *sbi = NULL;
- struct lustre_md md;
- int rc;
- ENTRY;
+ struct lustre_md md = { 0 };
+ int rc;
+ ENTRY;
LASSERT(*inode || sb);
sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode);
op_data->op_fid1 = *ll_inode2fid(i1);
op_data->op_capa1 = ll_mdscapa_get(i1);
if (S_ISDIR(i1->i_mode))
- op_data->op_mea1 = ll_i2info(i1)->lli_lmv_md;
+ op_data->op_mea1 = ll_i2info(i1)->lli_lsm_md;
if (i2) {
op_data->op_fid2 = *ll_inode2fid(i2);
op_data->op_capa2 = ll_mdscapa_get(i2);
if (S_ISDIR(i2->i_mode))
- op_data->op_mea2 = ll_i2info(i2)->lli_lmv_md;
+ op_data->op_mea2 = ll_i2info(i2)->lli_lsm_md;
} else {
fid_zero(&op_data->op_fid2);
op_data->op_capa2 = NULL;
return (key0 << 1);
}
-static int ll_nfs_test_inode(struct inode *inode, void *opaque)
-{
- return lu_fid_eq(&ll_i2info(inode)->lli_fid,
- (struct lu_fid *)opaque);
-}
-
struct inode *search_inode_for_lustre(struct super_block *sb,
const struct lu_fid *fid)
{
CDEBUG(D_INFO, "searching inode for:(%lu,"DFID")\n", hash, PFID(fid));
- inode = ilookup5(sb, hash, ll_nfs_test_inode, (void *)fid);
- if (inode)
- RETURN(inode);
+ inode = ilookup5(sb, hash, ll_test_inode_by_fid, (void *)fid);
+ if (inode)
+ RETURN(inode);
rc = ll_get_max_mdsize(sbi, &eadatalen);
if (rc)
ll_unlock_dcache(dir);
}
+/* ilookup5() match callback: compare an inode's Lustre FID against
+ * @opaque, which points to a struct lu_fid. */
+int ll_test_inode_by_fid(struct inode *inode, void *opaque)
+{
+	return lu_fid_eq(&ll_i2info(inode)->lli_fid, opaque);
+}
+
int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
void *data, int flag)
{
}
if ((bits & MDS_INODELOCK_UPDATE) && S_ISDIR(inode->i_mode)) {
- CDEBUG(D_INODE, "invalidating inode "DFID"\n",
- PFID(ll_inode2fid(inode)));
+ struct ll_inode_info *lli = ll_i2info(inode);
+
+ CDEBUG(D_INODE, "invalidating inode "DFID" lli = %p, "
+ "pfid = "DFID"\n", PFID(ll_inode2fid(inode)),
+ lli, PFID(&lli->lli_pfid));
truncate_inode_pages(inode->i_mapping, 0);
- ll_invalidate_negative_children(inode);
+
+ if (unlikely(!fid_is_zero(&lli->lli_pfid))) {
+ struct inode *master_inode = NULL;
+ unsigned long hash;
+
+ /* This is slave inode, since all of the child
+ * dentry is connected on the master inode, so
+ * we have to invalidate the negative children
+ * on master inode */
+ CDEBUG(D_INODE, "Invalidate s"DFID" m"DFID"\n",
+ PFID(ll_inode2fid(inode)),
+ PFID(&lli->lli_pfid));
+
+ hash = cl_fid_build_ino(&lli->lli_pfid,
+ ll_need_32bit_api(ll_i2sbi(inode)));
+
+ master_inode = ilookup5(inode->i_sb, hash,
+ ll_test_inode_by_fid,
+ (void *)&lli->lli_pfid);
+ if (master_inode != NULL &&
+ !IS_ERR(master_inode)) {
+ ll_invalidate_negative_children(
+ master_inode);
+ iput(master_inode);
+ }
+ } else {
+ ll_invalidate_negative_children(inode);
+ }
}
if ((bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM)) &&
return rc;
}
+#ifdef __KERNEL__
+/* Revalidate the attributes of every stripe of a striped directory and
+ * aggregate size, nlink and [amc]time into @mbody (if non-NULL).
+ * Stripe 0 is the master: its attributes come from @mbody when given;
+ * other stripes are refreshed from their MDTs with IT_GETATTR intent
+ * locks when their cached state is stale.  Returns 0 on success, -EIO
+ * if a slave looks corrupt (nlink < 2), or another negative errno. */
+int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
+			  struct lmv_stripe_md *lsm,
+			  ldlm_blocking_callback cb_blocking,
+			  int extra_lock_flags)
+{
+	struct obd_device *obd = exp->exp_obd;
+	struct lmv_obd *lmv = &obd->u.lmv;
+	struct mdt_body *body;
+	struct md_op_data *op_data;
+	unsigned long size = 0;
+	unsigned long nlink = 0;
+	obd_time atime = 0;
+	obd_time ctime = 0;
+	obd_time mtime = 0;
+	int i;
+	int rc = 0;
+
+	ENTRY;
+
+	/* Nothing to aggregate for a non-striped directory. */
+	if (lsm->lsm_md_stripe_count <= 1)
+		RETURN(0);
+
+	OBD_ALLOC_PTR(op_data);
+	if (op_data == NULL)
+		RETURN(-ENOMEM);
+
+	/**
+	 * Loop over the stripe information, check validity and update them
+	 * from MDS if needed.
+	 */
+	for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
+		struct lu_fid fid;
+		struct lookup_intent it = { .it_op = IT_GETATTR };
+		struct ptlrpc_request *req = NULL;
+		struct lustre_handle *lockh = NULL;
+		struct lmv_tgt_desc *tgt = NULL;
+		struct inode *inode;
+
+		fid = lsm->lsm_md_oinfo[i].lmo_fid;
+		inode = lsm->lsm_md_oinfo[i].lmo_root;
+		if (i == 0) {
+			if (mbody != NULL) {
+				body = mbody;
+				goto update;
+			} else {
+				goto release_lock;
+			}
+		}
+
+		/*
+		 * Prepare op_data for revalidating. Note that @fid2 should be
+		 * defined otherwise it will go to server and take new lock
+		 * which is not needed here.
+		 */
+		memset(op_data, 0, sizeof(*op_data));
+		op_data->op_fid1 = fid;
+		op_data->op_fid2 = fid;
+
+		tgt = lmv_locate_mds(lmv, op_data, &fid);
+		if (IS_ERR(tgt))
+			GOTO(cleanup, rc = PTR_ERR(tgt));
+
+		CDEBUG(D_INODE, "Revalidate slave "DFID" -> mds #%d\n",
+		       PFID(&fid), tgt->ltd_idx);
+
+		rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0,
+				    &req, cb_blocking, extra_lock_flags);
+		if (rc < 0)
+			GOTO(cleanup, rc);
+
+		lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
+		if (rc > 0 && req == NULL) {
+			/* slave inode is still valid */
+			CDEBUG(D_INODE, "slave "DFID" is still valid.\n",
+			       PFID(&fid));
+			rc = 0;
+		} else {
+			/* refresh slave from server */
+			body = req_capsule_server_get(&req->rq_pill,
+						      &RMF_MDT_BODY);
+			LASSERT(body != NULL);
+update:
+			if (unlikely(body->nlink < 2)) {
+				CERROR("%s: nlink %d < 2 corrupt stripe %d "DFID
+				       ":" DFID"\n", obd->obd_name, body->nlink,
+				       i, PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
+				       PFID(&lsm->lsm_md_oinfo[0].lmo_fid));
+
+				if (req != NULL)
+					ptlrpc_req_finished(req);
+
+				GOTO(cleanup, rc = -EIO);
+			}
+
+			if (i != 0)
+				md_set_lock_data(tgt->ltd_exp, &lockh->cookie,
+						 inode, NULL);
+
+			i_size_write(inode, body->size);
+			set_nlink(inode, body->nlink);
+			LTIME_S(inode->i_atime) = body->atime;
+			LTIME_S(inode->i_ctime) = body->ctime;
+			LTIME_S(inode->i_mtime) = body->mtime;
+
+			if (req != NULL)
+				ptlrpc_req_finished(req);
+		}
+release_lock:
+		size += i_size_read(inode);
+
+		/* NOTE(review): slaves subtract 2 from nlink, presumably to
+		 * skip their own "." and ".." entries — confirm */
+		if (i != 0)
+			nlink += inode->i_nlink - 2;
+		else
+			nlink += inode->i_nlink;
+
+		atime = LTIME_S(inode->i_atime) > atime ?
+				LTIME_S(inode->i_atime) : atime;
+		ctime = LTIME_S(inode->i_ctime) > ctime ?
+				LTIME_S(inode->i_ctime) : ctime;
+		mtime = LTIME_S(inode->i_mtime) > mtime ?
+				LTIME_S(inode->i_mtime) : mtime;
+
+		if (it.d.lustre.it_lock_mode != 0 && lockh != NULL) {
+			ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
+			it.d.lustre.it_lock_mode = 0;
+		}
+
+		CDEBUG(D_INODE, "i %d "DFID" size %llu, nlink %u, atime "
+		       "%lu, mtime %lu, ctime %lu.\n", i, PFID(&fid),
+		       i_size_read(inode), inode->i_nlink,
+		       LTIME_S(inode->i_atime), LTIME_S(inode->i_mtime),
+		       LTIME_S(inode->i_ctime));
+	}
+
+	/*
+	 * update attr of master request.
+	 */
+	CDEBUG(D_INODE, "Return refreshed attrs: size = %lu nlink %lu atime "
+	       LPU64 "ctime "LPU64" mtime "LPU64" for " DFID"\n", size, nlink,
+	       atime, ctime, mtime, PFID(&lsm->lsm_md_oinfo[0].lmo_fid));
+
+	if (mbody != NULL) {
+		mbody->atime = atime;
+		mbody->ctime = ctime;
+		mbody->mtime = mtime;
+	}
+cleanup:
+	OBD_FREE_PTR(op_data);
+	RETURN(rc);
+}
+
+#else
+
+/* Non-kernel (userspace) build: there is no inode cache to revalidate,
+ * so this is a no-op that always succeeds. */
+int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
+			  struct lmv_stripe_md *lsm,
+			  ldlm_blocking_callback cb_blocking,
+			  int extra_lock_flags)
+{
+	return 0;
+}
+
+#endif
+
/*
* IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
* may be split dir.
/* Note: client might open with some random flags(sanity 33b), so we can
* not make sure op_fid2 is being initialized with BY_FID flag */
- if (it->it_flags & MDS_OPEN_BY_FID && fid_is_sane(&op_data->op_fid2))
- tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
- else
+ if (it->it_flags & MDS_OPEN_BY_FID && fid_is_sane(&op_data->op_fid2)) {
+ if (op_data->op_mea1 != NULL) {
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ const struct lmv_oinfo *oinfo;
+
+ oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
+ op_data->op_namelen);
+ op_data->op_fid1 = oinfo->lmo_fid;
+ }
+
+ tgt = lmv_find_target(lmv, &op_data->op_fid2);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ op_data->op_mds = tgt->ltd_idx;
+ } else {
tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
-
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+ }
/* If it is ready to open the file by FID, do not need
* allocate FID at all, otherwise it will confuse MDT */
body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
if (body == NULL)
RETURN(-EPROTO);
- /*
- * Not cross-ref case, just get out of here.
- */
- if (likely(!(body->valid & OBD_MD_MDS)))
- RETURN(0);
- /*
- * Okay, MDS has returned success. Probably name has been resolved in
- * remote inode.
- */
- rc = lmv_intent_remote(exp, lmm, lmmsize, it, &op_data->op_fid1, flags,
- reqp, cb_blocking, extra_lock_flags);
- if (rc != 0) {
- LASSERT(rc < 0);
- /*
- * This is possible, that some userspace application will try to
- * open file as directory and we will have -ENOTDIR here. As
- * this is normal situation, we should not print error here,
- * only debug info.
- */
- CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
- "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
- PFID(&op_data->op_fid1), op_data->op_namelen,
- op_data->op_name, rc);
- RETURN(rc);
+ /* Not cross-ref case, just get out of here. */
+ if (unlikely((body->valid & OBD_MD_MDS))) {
+ rc = lmv_intent_remote(exp, lmm, lmmsize, it, &op_data->op_fid1,
+ flags, reqp, cb_blocking,
+ extra_lock_flags);
+ if (rc != 0)
+ RETURN(rc);
+
+ body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ RETURN(-EPROTO);
}
RETURN(rc);
rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
flags, reqp, cb_blocking, extra_lock_flags);
- if (rc < 0 || *reqp == NULL)
+ if (rc < 0)
RETURN(rc);
+ if (*reqp == NULL) {
+ /* If RPC happens, lsm information will be revalidated
+ * during update_inode process (see ll_update_lsm_md) */
+ if (op_data->op_mea2 != NULL) {
+ rc = lmv_revalidate_slaves(exp, NULL, op_data->op_mea2,
+ cb_blocking, extra_lock_flags);
+ if (rc != 0)
+ RETURN(rc);
+ }
+ RETURN(rc);
+ }
+
/*
* MDS has returned success. Probably name has been resolved in
* remote inode. Let's check this.
body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
if (body == NULL)
RETURN(-EPROTO);
- /* Not cross-ref case, just get out of here. */
- if (likely(!(body->valid & OBD_MD_MDS)))
- RETURN(0);
- rc = lmv_intent_remote(exp, lmm, lmmsize, it, NULL, flags, reqp,
- cb_blocking, extra_lock_flags);
+ /* Not cross-ref case, just get out of here. */
+ if (unlikely((body->valid & OBD_MD_MDS))) {
+ rc = lmv_intent_remote(exp, lmm, lmmsize, it, NULL, flags,
+ reqp, cb_blocking, extra_lock_flags);
+ if (rc != 0)
+ RETURN(rc);
+ body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+ if (body == NULL)
+ RETURN(-EPROTO);
+ }
RETURN(rc);
}
int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
struct md_op_data *op_data);
+int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp,
+ const union lmv_mds_md *lmm, int stripe_count);
+
+int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
+ struct lmv_stripe_md *lsm,
+ ldlm_blocking_callback cb_blocking,
+ int extra_lock_flags);
+
static inline struct lmv_tgt_desc *
lmv_get_target(struct lmv_obd *lmv, mdsno_t mds)
{
return lmv_get_target(lmv, mds);
}
-static inline unsigned int
-mea_last_char_hash(unsigned int count, const char *name, int namelen)
+static inline int lmv_stripe_md_size(int stripe_count)
{
- unsigned int c;
-
- c = name[namelen - 1];
- if (c == 0)
- CWARN("invalid name %.*s\n", namelen, name);
-
- c = c % count;
+ struct lmv_stripe_md *lsm;
- return c;
+ return sizeof(*lsm) + stripe_count * sizeof(lsm->lsm_md_oinfo[0]);
}
-static inline unsigned int
-mea_all_chars_hash(unsigned int count, const char *name, int namelen)
-{
- unsigned int c = 0;
-
- while (--namelen >= 0)
- c += name[namelen];
-
- c = c % count;
-
- return c;
-}
+int lmv_name_to_stripe_index(enum lmv_hash_type hashtype,
+ unsigned int max_mdt_index,
+ const char *name, int namelen);
-static inline int lmv_stripe_md_size(int stripe_count)
+static inline const struct lmv_oinfo *
+lsm_name_to_stripe_info(const struct lmv_stripe_md *lsm, const char *name,
+ int namelen)
{
- struct lmv_stripe_md *lsm;
+ int stripe_index;
- return sizeof(*lsm) + stripe_count * sizeof(lsm->lsm_md_oinfo[0]);
+ stripe_index = lmv_name_to_stripe_index(lsm->lsm_md_hash_type,
+ lsm->lsm_md_stripe_count,
+ name, namelen);
+ LASSERT(stripe_index < lsm->lsm_md_stripe_count);
+
+ return &lsm->lsm_md_oinfo[stripe_index];
}
-int raw_name2idx(int hashtype, int count, const char *name, int namelen);
struct lmv_tgt_desc
*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
#include <lustre_fid.h>
#include "lmv_internal.h"
-int raw_name2idx(int hashtype, int count, const char *name, int namelen)
+/* This hash is only for testing purpose */
+static inline unsigned int
+lmv_hash_all_chars(unsigned int count, const char *name, int namelen)
{
- unsigned int c = 0;
- int idx;
+ unsigned int c = 0;
+ const unsigned char *p = (const unsigned char *)name;
- LASSERT(namelen > 0);
+ while (--namelen >= 0)
+ c += p[namelen];
- if (filename_is_volatile(name, namelen, &idx)) {
- if (idx >= 0 && idx < count)
- return idx;
- goto choose_hash;
- }
+ c = c % count;
+
+ return c;
+}
+
+static inline unsigned int
+lmv_hash_fnv1a(unsigned int count, const char *name, int namelen)
+{
+ __u64 hash;
+
+ hash = lustre_hash_fnv_1a_64(name, namelen);
+
+ hash = hash % count;
+
+ return hash;
+}
- if (count <= 1)
+int lmv_name_to_stripe_index(enum lmv_hash_type hashtype,
+ unsigned int max_mdt_index,
+ const char *name, int namelen)
+{
+ int idx;
+
+ LASSERT(namelen > 0);
+ if (max_mdt_index <= 1)
return 0;
-choose_hash:
switch (hashtype) {
- case MEA_MAGIC_LAST_CHAR:
- c = mea_last_char_hash(count, name, namelen);
+ case LMV_HASH_TYPE_ALL_CHARS:
+ idx = lmv_hash_all_chars(max_mdt_index, name, namelen);
break;
- case MEA_MAGIC_ALL_CHARS:
- c = mea_all_chars_hash(count, name, namelen);
- break;
- case MEA_MAGIC_HASH_SEGMENT:
- CERROR("Unsupported hash type MEA_MAGIC_HASH_SEGMENT\n");
+ case LMV_HASH_TYPE_FNV_1A_64:
+ idx = lmv_hash_fnv1a(max_mdt_index, name, namelen);
break;
default:
CERROR("Unknown hash type 0x%x\n", hashtype);
+ return -EINVAL;
}
- LASSERT(c < count);
- return c;
+ LASSERT(idx < max_mdt_index);
+ return idx;
}
static void lmv_activate_target(struct lmv_obd *lmv,
* If stripe_offset is provided during setdirstripe
* (setdirstripe -i xx), xx MDS will be choosen.
*/
- if (op_data->op_cli_flags & CLI_SET_MEA) {
+ if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data != NULL) {
struct lmv_user_md *lum;
- lum = (struct lmv_user_md *)op_data->op_data;
- if (lum->lum_type == LMV_STRIPE_TYPE &&
- lum->lum_stripe_offset != -1) {
- if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
- CERROR("%s: Stripe_offset %d > MDT count %d:"
- " rc = %d\n", obd->obd_name,
- lum->lum_stripe_offset,
- lmv->desc.ld_tgt_count, -ERANGE);
- RETURN(-ERANGE);
- }
- *mds = lum->lum_stripe_offset;
- RETURN(0);
- }
+ lum = op_data->op_data;
+ *mds = lum->lum_stripe_offset;
+ } else {
+ /* Allocate new fid on target according to operation type and
+ * parent home mds. */
+ *mds = op_data->op_mds;
}
- /* Allocate new fid on target according to operation type and parent
- * home mds. */
- *mds = op_data->op_mds;
RETURN(0);
}
RETURN(rc);
}
+/**
+ * Locate the MDT by name or FID in @op_data.
+ * For a non-striped directory, it locates the MDT by FID.
+ * For a striped directory, it locates the MDT by name, and also
+ * resets op_fid1 to the FID of the chosen stripe.
+ **/
struct lmv_tgt_desc
*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
struct lu_fid *fid)
{
- struct lmv_tgt_desc *tgt;
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ struct lmv_tgt_desc *tgt;
+ const struct lmv_oinfo *oinfo;
- tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
+ if (lsm == NULL || lsm->lsm_md_stripe_count <= 1 ||
+ op_data->op_namelen == 0) {
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ return tgt;
+
+ op_data->op_mds = tgt->ltd_idx;
return tgt;
+ }
- op_data->op_mds = tgt->ltd_idx;
+ oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
+ op_data->op_namelen);
+ *fid = oinfo->lmo_fid;
+ op_data->op_mds = oinfo->lmo_mds;
+ tgt = lmv_get_target(lmv, op_data->op_mds);
+
+ CDEBUG(D_INFO, "locate on mds %u\n", op_data->op_mds);
return tgt;
}
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
+ CDEBUG(D_INODE, "CREATE name '%.*s' on "DFID" -> mds #%x\n",
+ op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+ op_data->op_mds);
+
rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
if (rc)
RETURN(rc);
- CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
- op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
- op_data->op_mds);
+ /* Send the create request to the MDT where the object
+ * will be located */
+ tgt = lmv_find_target(lmv, &op_data->op_fid2);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ op_data->op_mds = tgt->ltd_idx;
+
+ CDEBUG(D_INODE, "CREATE obj "DFID" -> mds #%x\n",
+ PFID(&op_data->op_fid2), op_data->op_mds);
op_data->op_flags |= MF_MDC_CANCEL_FID1;
rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
cap_effective, rdev, request);
-
if (rc == 0) {
if (*request == NULL)
RETURN(rc);
op_data->op_fsuid = current_fsuid();
op_data->op_fsgid = current_fsgid();
op_data->op_cap = cfs_curproc_cap_pack();
+ if (op_data->op_mea2 != NULL) {
+ struct lmv_stripe_md *lsm = op_data->op_mea2;
+ const struct lmv_oinfo *oinfo;
+
+ oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
+ op_data->op_namelen);
+ op_data->op_fid2 = oinfo->lmo_fid;
+ }
+
tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
const char *old, int oldlen, const char *new, int newlen,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *src_tgt;
- struct lmv_tgt_desc *tgt_tgt;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *src_tgt;
int rc;
ENTRY;
- LASSERT(oldlen != 0);
+ LASSERT(oldlen != 0);
- CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
- oldlen, old, PFID(&op_data->op_fid1),
- newlen, new, PFID(&op_data->op_fid2));
+ CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
+ oldlen, old, PFID(&op_data->op_fid1),
+ op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
+ newlen, new, PFID(&op_data->op_fid2),
+ op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
op_data->op_fsuid = current_fsuid();
op_data->op_fsgid = current_fsgid();
op_data->op_cap = cfs_curproc_cap_pack();
- src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
- if (IS_ERR(src_tgt))
- RETURN(PTR_ERR(src_tgt));
- tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
- if (IS_ERR(tgt_tgt))
- RETURN(PTR_ERR(tgt_tgt));
+ if (op_data->op_mea1 != NULL) {
+ struct lmv_stripe_md *lsm = op_data->op_mea1;
+ const struct lmv_oinfo *oinfo;
+
+ oinfo = lsm_name_to_stripe_info(lsm, old, oldlen);
+ op_data->op_fid1 = oinfo->lmo_fid;
+ op_data->op_mds = oinfo->lmo_mds;
+ src_tgt = lmv_get_target(lmv, op_data->op_mds);
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
+ } else {
+ src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
+
+ op_data->op_mds = src_tgt->ltd_idx;
+ }
+
+ if (op_data->op_mea2) {
+ struct lmv_stripe_md *lsm = op_data->op_mea2;
+ const struct lmv_oinfo *oinfo;
+
+ oinfo = lsm_name_to_stripe_info(lsm, new, newlen);
+ op_data->op_fid2 = oinfo->lmo_fid;
+ }
+
/*
* LOOKUP lock on src child (fid3) should also be cancelled for
* src_tgt in mdc_rename.
if (rc)
RETURN(rc);
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
if (IS_ERR(tgt))
RETURN(PTR_ERR(tgt));
RETURN(rc);
}
+int lmv_update_lsm_md(struct obd_export *exp, struct lmv_stripe_md *lsm,
+ struct mdt_body *body, ldlm_blocking_callback cb_blocking)
+{
+ if (lsm->lsm_md_stripe_count <= 1)
+ return 0;
+
+ return lmv_revalidate_slaves(exp, body, lsm, cb_blocking, 0);
+}
+
+int lmv_merge_attr(struct obd_export *exp, const struct lmv_stripe_md *lsm,
+ struct cl_attr *attr)
+{
+#ifdef __KERNEL__
+ int i;
+
+ for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
+ struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
+
+ CDEBUG(D_INFO, ""DFID" size %llu, nlink %u, atime %lu ctime"
+ "%lu, mtime %lu.\n", PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
+ i_size_read(inode), inode->i_nlink,
+ LTIME_S(inode->i_atime), LTIME_S(inode->i_ctime),
+ LTIME_S(inode->i_mtime));
+
+ /* for slave stripe, it needs to subtract nlink for . and .. */
+ if (i != 0)
+ attr->cat_nlink += inode->i_nlink - 2;
+ else
+ attr->cat_nlink = inode->i_nlink;
+
+ attr->cat_size += i_size_read(inode);
+
+ if (attr->cat_atime < LTIME_S(inode->i_atime))
+ attr->cat_atime = LTIME_S(inode->i_atime);
+
+ if (attr->cat_ctime < LTIME_S(inode->i_ctime))
+ attr->cat_ctime = LTIME_S(inode->i_ctime);
+
+ if (attr->cat_mtime < LTIME_S(inode->i_mtime))
+ attr->cat_mtime = LTIME_S(inode->i_mtime);
+ }
+#endif
+ return 0;
+}
+
struct obd_ops lmv_obd_ops = {
.o_owner = THIS_MODULE,
.o_setup = lmv_setup,
.m_cancel_unused = lmv_cancel_unused,
.m_set_lock_data = lmv_set_lock_data,
.m_lock_match = lmv_lock_match,
- .m_get_lustre_md = lmv_get_lustre_md,
- .m_free_lustre_md = lmv_free_lustre_md,
+ .m_get_lustre_md = lmv_get_lustre_md,
+ .m_free_lustre_md = lmv_free_lustre_md,
+ .m_update_lsm_md = lmv_update_lsm_md,
+ .m_merge_attr = lmv_merge_attr,
.m_set_open_replay_data = lmv_set_open_replay_data,
.m_clear_open_replay_data = lmv_clear_open_replay_data,
.m_renew_capa = lmv_renew_capa,
if (IS_ERR(th))
return th;
- CFS_INIT_LIST_HEAD(&th->th_remote_update_list);
return th;
}
-static int lod_remote_sync(const struct lu_env *env, struct dt_device *dev,
- struct thandle *th)
-{
- struct update_request *update;
- int rc = 0;
- ENTRY;
-
- if (cfs_list_empty(&th->th_remote_update_list))
- RETURN(0);
-
- cfs_list_for_each_entry(update, &th->th_remote_update_list,
- ur_list) {
- /* In DNE phase I, there should be only one OSP
- * here, so we will do send/receive one by one,
- * instead of sending them parallel, will fix this
- * in Phase II */
- th->th_current_request = update;
- rc = dt_trans_start(env, update->ur_dt, th);
- if (rc != 0) {
- /* FIXME how to revert the partial results
- * once error happened? Resolved by 2 Phase commit */
- update->ur_rc = rc;
- break;
- }
- }
-
- RETURN(rc);
-}
-
static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
struct thandle *th)
{
struct lod_device *lod = dt2lod_dev((struct dt_device *) dev);
- int rc;
+ int rc = 0;
- rc = lod_remote_sync(env, dev, th);
- if (rc)
- return rc;
+ if (unlikely(th->th_update != NULL)) {
+ struct thandle_update *tu = th->th_update;
+ struct update_request *update;
+ list_for_each_entry(update, &tu->tu_remote_update_list,
+ ur_list) {
+ LASSERT(update->ur_dt != NULL);
+ rc = dt_trans_start(env, update->ur_dt, th);
+ if (rc != 0)
+ return rc;
+ }
+ }
return dt_trans_start(env, lod->lod_child, th);
}
-static int lod_trans_stop(const struct lu_env *env, struct thandle *th)
+static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
+ struct thandle *th)
{
- struct update_request *update;
- struct update_request *tmp;
- int rc = 0;
- int rc2 = 0;
+ struct thandle_update *tu = th->th_update;
+ struct update_request *update;
+ struct update_request *tmp;
+ int rc2 = 0;
+ int rc;
+ ENTRY;
+
+ rc = dt_trans_stop(env, th->th_dev, th);
+ if (likely(tu == NULL))
+ RETURN(rc);
- cfs_list_for_each_entry_safe(update, tmp,
- &th->th_remote_update_list,
- ur_list) {
- th->th_current_request = update;
+ list_for_each_entry_safe(update, tmp, &tu->tu_remote_update_list,
+ ur_list) {
+ /* update will be freed inside dt_trans_stop */
rc2 = dt_trans_stop(env, update->ur_dt, th);
if (unlikely(rc2 != 0 && rc == 0))
rc = rc2;
}
- rc2 = dt_trans_stop(env, th->th_dev, th);
-
- return rc2 != 0 ? rc2 : rc;
+ RETURN(rc);
}
static void lod_conf_get(const struct lu_env *env,
TGT_PTRS_PER_BLOCK]->ldi_tgt[(index) % TGT_PTRS_PER_BLOCK])
#define OST_TGT(lod, index) LTD_TGT(&lod->lod_ost_descs, index)
+#define MDT_TGT(lod, index) LTD_TGT(&lod->lod_mdt_descs, index)
struct lod_tgt_descs {
/* list of known TGTs */
struct lod_tgt_desc_idx *ltd_tgt_idx[TGT_PTRS];
#define ltd_ost ltd_tgt
#define lod_ost_desc lod_tgt_desc
+#define lod_mdts lod_mdt_descs.ltd_tgts
+#define lod_mdt_bitmap lod_mdt_descs.ltd_tgt_bitmap
+#define lod_remote_mdt_count lod_mdt_descs.ltd_tgtnr
+#define lod_mdts_size lod_mdt_descs.ltd_tgts_size
+#define ltd_mdt ltd_tgt
+#define lod_mdt_desc lod_tgt_desc
+
+struct lod_dir_stripe_info {
+ __u32 ldsi_stripe_offset;
+ __u32 ldsi_def_stripenr;
+ __u32 ldsi_def_stripe_offset;
+ __u32 ldsi_def_hash_type;
+ __u32 ldsi_hash_type;
+
+ unsigned int ldsi_striping_cached:1,
+ ldsi_def_striping_set:1,
+ ldsi_striped:1;
+};
+
/*
* XXX: shrink this structure, currently it's 72bytes on 32bit arch,
* so, slab will be allocating 128bytes
struct dt_object ldo_obj;
/* if object is striped, then the next fields describe stripes */
+ /* For striped directory, ldo_stripenr == slave stripe count */
__u16 ldo_stripenr;
__u16 ldo_layout_gen;
__u32 ldo_stripe_size;
* is cached in stripenr/stripe_size */
unsigned int ldo_stripes_allocated:16,
ldo_striping_cached:1,
- ldo_def_striping_set:1;
+ ldo_def_striping_set:1,
+ /* ldo_dir_slave_stripe indicates this is a slave stripe of
+ * a striped dir */
+ ldo_dir_slave_stripe:1;
__u32 ldo_def_stripe_size;
__u16 ldo_def_stripenr;
__u16 ldo_def_stripe_offset;
+ struct lod_dir_stripe_info *ldo_dir_stripe;
mdsno_t ldo_mds_num;
};
+#define ldo_dir_stripe_offset ldo_dir_stripe->ldsi_stripe_offset
+#define ldo_dir_def_stripenr ldo_dir_stripe->ldsi_def_stripenr
+#define ldo_dir_hash_type ldo_dir_stripe->ldsi_hash_type
+#define ldo_dir_def_hash_type ldo_dir_stripe->ldsi_def_hash_type
+#define ldo_dir_striping_cached ldo_dir_stripe->ldsi_striping_cached
+#define ldo_dir_striped ldo_dir_stripe->ldsi_striped
+#define ldo_dir_def_striping_set ldo_dir_stripe->ldsi_def_striping_set
+#define ldo_dir_def_stripe_offset ldo_dir_stripe->ldsi_def_stripe_offset
struct lod_it {
struct dt_object *lit_obj; /* object from the layer below */
/* per-thread buffer for LOV EA */
void *lti_ea_store;
int lti_ea_store_size;
+ /* per-thread buffer for LMV EA */
struct lu_buf lti_buf;
struct ost_id lti_ostid;
struct lu_fid lti_fid;
int lod_fini_tgt(const struct lu_env *env, struct lod_device *lod,
struct lod_tgt_descs *ltd, bool for_ost);
int lod_load_striping(const struct lu_env *env, struct lod_object *mo);
-int lod_get_lov_ea(const struct lu_env *env, struct lod_object *mo);
+
+int lod_get_ea(const struct lu_env *env, struct lod_object *lo,
+ const char *name);
+static inline int
+lod_get_lov_ea(const struct lu_env *env, struct lod_object *lo)
+{
+ return lod_get_ea(env, lo, XATTR_NAME_LOV);
+}
+
+static inline int
+lod_get_lmv_ea(const struct lu_env *env, struct lod_object *lo)
+{
+ return lod_get_ea(env, lo, XATTR_NAME_LMV);
+}
+
+static inline int
+lod_get_default_lmv_ea(const struct lu_env *env, struct lod_object *lo)
+{
+ return lod_get_ea(env, lo, XATTR_NAME_DEFALT_LMV);
+}
+
void lod_fix_desc(struct lov_desc *desc);
void lod_fix_desc_qos_maxage(__u32 *val);
void lod_fix_desc_pattern(__u32 *val);
int lod_pools_fini(struct lod_device *m);
int lod_parse_striping(const struct lu_env *env, struct lod_object *mo,
const struct lu_buf *buf);
+int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
+ const struct lu_buf *buf);
int lod_initialize_objects(const struct lu_env *env, struct lod_object *mo,
struct lov_ost_data_v1 *objs);
int lod_store_def_striping(const struct lu_env *env, struct dt_object *dt,
int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, int specific);
int lod_generate_and_set_lovea(const struct lu_env *env,
struct lod_object *mo, struct thandle *th);
-
+int lod_ea_store_resize(struct lod_thread_info *info, int size);
/* lod_pool.c */
int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count);
int lod_ost_pool_remove(struct ost_pool *op, __u32 idx);
{
int round = size_roundup_power2(size);
- LASSERT(round <= lov_mds_md_size(LOV_MAX_STRIPE_COUNT, LOV_MAGIC_V3));
+ LASSERT(round <=
+ lov_mds_md_size(LOV_MAX_STRIPE_COUNT, LOV_MAGIC_V3));
if (info->lti_ea_store) {
LASSERT(info->lti_ea_store_size);
LASSERT(info->lti_ea_store_size < round);
objs[i].l_ost_gen = cpu_to_le32(0);
rc = lod_fld_lookup(env, lod, fid, &index, LU_SEQ_RANGE_OST);
if (rc < 0) {
- lod_object_free_striping(env, lo);
CERROR("%s: Can not locate "DFID": rc = %d\n",
lod2obd(lod)->obd_name, PFID(fid), rc);
+ lod_object_free_striping(env, lo);
RETURN(rc);
}
objs[i].l_ost_idx = cpu_to_le32(index);
RETURN(rc);
}
-int lod_get_lov_ea(const struct lu_env *env, struct lod_object *lo)
+int lod_get_ea(const struct lu_env *env, struct lod_object *lo,
+ const char *name)
{
- struct lod_thread_info *info = lod_env_info(env);
- struct dt_object *next = dt_object_child(&lo->ldo_obj);
+ struct lod_thread_info *info = lod_env_info(env);
+ struct dt_object *next = dt_object_child(&lo->ldo_obj);
int rc;
ENTRY;
LASSERT(info);
- if (unlikely(info->lti_ea_store_size == 0)) {
+ if (unlikely(info->lti_ea_store == NULL)) {
/* just to enter in allocation block below */
rc = -ERANGE;
} else {
repeat:
info->lti_buf.lb_buf = info->lti_ea_store;
info->lti_buf.lb_len = info->lti_ea_store_size;
- rc = dt_xattr_get(env, next, &info->lti_buf, XATTR_NAME_LOV,
- BYPASS_CAPA);
+ rc = dt_xattr_get(env, next, &info->lti_buf, name, BYPASS_CAPA);
}
/* if object is not striped or inaccessible */
- if (rc == -ENODATA)
+ if (rc == -ENODATA || rc == -ENOENT)
RETURN(0);
if (rc == -ERANGE) {
/* EA doesn't fit, reallocate new buffer */
- rc = dt_xattr_get(env, next, &LU_BUF_NULL, XATTR_NAME_LOV,
+ rc = dt_xattr_get(env, next, &LU_BUF_NULL, name,
BYPASS_CAPA);
- if (rc == -ENODATA)
+ if (rc == -ENODATA || rc == -ENOENT)
RETURN(0);
else if (rc < 0)
RETURN(rc);
struct dt_object *next = dt_object_child(dt);
struct lov_user_md_v3 *v3;
int rc;
- int cplen = 0;
ENTRY;
- LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
-
+ if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ RETURN(-ENOTDIR);
/*
* store striping defaults into new directory
* used to implement defaults inheritance
lo->ldo_def_stripe_offset))
RETURN(0);
- /* XXX: use thread info */
- OBD_ALLOC_PTR(v3);
- if (v3 == NULL)
- RETURN(-ENOMEM);
-
- v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
- v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
- v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+ v3 = info->lti_ea_store;
+ if (info->lti_ea_store_size < sizeof(*v3)) {
+ rc = lod_ea_store_resize(info, sizeof(*v3));
+ if (rc != 0)
+ RETURN(rc);
+ v3 = info->lti_ea_store;
+ }
+ memset(v3, 0, sizeof(*v3));
+ v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
- if (lo->ldo_pool) {
- cplen = strlcpy(v3->lmm_pool_name, lo->ldo_pool,
- sizeof(v3->lmm_pool_name));
- if (cplen >= sizeof(v3->lmm_pool_name)) {
- OBD_FREE_PTR(v3);
- RETURN(-E2BIG);
- }
- }
-
+ v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+ if (lo->ldo_pool)
+ strncpy(v3->lmm_pool_name, lo->ldo_pool,
+ sizeof(v3->lmm_pool_name));
info->lti_buf.lb_buf = v3;
info->lti_buf.lb_len = sizeof(*v3);
rc = dt_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0, th,
BYPASS_CAPA);
- OBD_FREE_PTR(v3);
-
RETURN(rc);
}
{
struct lod_thread_info *info = lod_env_info(env);
struct dt_object *next = dt_object_child(&lo->ldo_obj);
- int rc;
+ int rc = 0;
ENTRY;
/*
if (lo->ldo_stripe != NULL)
GOTO(out, rc = 0);
- if (!dt_object_exists(next))
+ /* Do not load stripe for slaves of striped dir */
+ if (!dt_object_exists(next) || lo->ldo_dir_slave_stripe)
GOTO(out, rc = 0);
/* only regular files can be striped */
- if (!(lu_object_attr(lod2lu_obj(lo)) & S_IFREG))
- GOTO(out, rc = 0);
-
- rc = lod_get_lov_ea(env, lo);
- if (rc <= 0)
- GOTO(out, rc);
-
- /*
- * there is LOV EA (striping information) in this object
- * let's parse it and create in-core objects for the stripes
- */
- info->lti_buf.lb_buf = info->lti_ea_store;
- info->lti_buf.lb_len = info->lti_ea_store_size;
- rc = lod_parse_striping(env, lo, &info->lti_buf);
+ if (lu_object_attr(lod2lu_obj(lo)) & S_IFREG) {
+ rc = lod_get_lov_ea(env, lo);
+ if (rc <= 0)
+ GOTO(out, rc);
+ /*
+ * there is LOV EA (striping information) in this object
+ * let's parse it and create in-core objects for the stripes
+ */
+ info->lti_buf.lb_buf = info->lti_ea_store;
+ info->lti_buf.lb_len = info->lti_ea_store_size;
+ rc = lod_parse_striping(env, lo, &info->lti_buf);
+ } else if (lu_object_attr(lod2lu_obj(lo)) & S_IFDIR) {
+ rc = lod_get_lmv_ea(env, lo);
+ if (rc <= 0)
+ GOTO(out, rc);
+ /*
+ * there is LMV EA (directory striping information) in this
+ * object; let's parse it and create in-core objects for the
+ * stripes
+ */
+ info->lti_buf.lb_buf = info->lti_ea_store;
+ info->lti_buf.lb_len = info->lti_ea_store_size;
+ rc = lod_parse_dir_striping(env, lo, &info->lti_buf);
+ }
out:
dt_write_unlock(env, next);
RETURN(rc);
#include <lustre_fid.h>
#include <lustre_param.h>
#include <lustre_fid.h>
+#include <lustre_lmv.h>
#include <obd_lov.h>
#include "lod_internal.h"
+static const char dot[] = ".";
+static const char dotdot[] = "..";
+
extern struct kmem_cache *lod_object_kmem;
static const struct dt_body_operations lod_body_lnk_ops;
* Therefore we need not load striping unless ownership is
* changing. This should save memory and (we hope) speed up
* rename(). */
- if (!(attr->la_valid & (LA_UID | LA_GID)))
- RETURN(rc);
-
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
+ if (!(attr->la_valid & (LA_UID | LA_GID)))
+ RETURN(rc);
+ } else {
+ if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
+ LA_ATIME | LA_MTIME | LA_CTIME)))
+ RETURN(rc);
+ }
/*
* load striping information, notice we don't do this when object
* is being initialized as we don't need this information till
if (rc)
RETURN(rc);
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
+ struct lu_attr *la = &lod_env_info(env)->lti_attr;
+ bool setattr_time = false;
+
+ rc = dt_attr_get(env, dt_object_child(dt), la,
+ BYPASS_CAPA);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* If it will only setattr time, it will only set
+ * time < current_time */
+ if ((attr->la_valid & LA_ATIME &&
+ attr->la_atime < la->la_atime) ||
+ (attr->la_valid & LA_CTIME &&
+ attr->la_ctime < la->la_ctime) ||
+ (attr->la_valid & LA_MTIME &&
+ attr->la_mtime < la->la_mtime))
+ setattr_time = true;
+
+ if (!setattr_time)
+ RETURN(0);
+ }
/*
* if object is striped declare changes on the stripes
*/
- LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
+ LASSERT(lo->ldo_stripe);
for (i = 0; i < lo->ldo_stripenr; i++) {
LASSERT(lo->ldo_stripe[i]);
+
rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
if (rc) {
CERROR("failed declaration: %d\n", rc);
if (rc)
RETURN(rc);
- if (!(attr->la_valid & (LA_UID | LA_GID)))
- RETURN(rc);
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
+ if (!(attr->la_valid & (LA_UID | LA_GID)))
+ RETURN(rc);
+ } else {
+ if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
+ LA_ATIME | LA_MTIME | LA_CTIME)))
+ RETURN(rc);
+ }
+
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
+ struct lu_attr *la = &lod_env_info(env)->lti_attr;
+ bool setattr_time = false;
+
+ rc = dt_attr_get(env, dt_object_child(dt), la,
+ BYPASS_CAPA);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* If it will only setattr time, it will only set
+ * time < current_time */
+ if ((attr->la_valid & LA_ATIME &&
+ attr->la_atime < la->la_atime) ||
+ (attr->la_valid & LA_CTIME &&
+ attr->la_ctime < la->la_ctime) ||
+ (attr->la_valid & LA_MTIME &&
+ attr->la_mtime < la->la_mtime))
+ setattr_time = true;
+
+ if (!setattr_time)
+ RETURN(0);
+ }
/*
* if object is striped, apply changes to all the stripes
*/
- LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
+ LASSERT(lo->ldo_stripe);
for (i = 0; i < lo->ldo_stripenr; i++) {
LASSERT(lo->ldo_stripe[i]);
rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
RETURN(rc);
}
+static int lod_verify_md_striping(struct lod_device *lod,
+ const struct lmv_user_md_v1 *lum)
+{
+ int rc = 0;
+ ENTRY;
+
+ if (unlikely(le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC))
+ GOTO(out, rc = -EINVAL);
+
+ if (unlikely(le32_to_cpu(lum->lum_stripe_count) == 0))
+ GOTO(out, rc = -EINVAL);
+
+ if (unlikely(le32_to_cpu(lum->lum_stripe_count) >
+ lod->lod_remote_mdt_count + 1))
+ GOTO(out, rc = -EINVAL);
+out:
+ if (rc != 0)
+ CERROR("%s: invalid lmv_user_md: magic = %x, "
+ "stripe_offset = %d, stripe_count = %u: rc = %d\n",
+ lod2obd(lod)->obd_name, le32_to_cpu(lum->lum_magic),
+ (int)le32_to_cpu(lum->lum_stripe_offset),
+ le32_to_cpu(lum->lum_stripe_count), rc);
+ return rc;
+}
+
+int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *lmv_buf)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lmv_mds_md_v1 *lmm1;
+ int stripe_count;
+ int lmm_size;
+ int i;
+ int rc;
+ __u32 mdtidx;
+ ENTRY;
+
+ LASSERT(lo->ldo_dir_striped != 0);
+ LASSERT(lo->ldo_stripenr > 0);
+ stripe_count = lo->ldo_stripenr + 1;
+ lmm_size = lmv_mds_md_size(stripe_count, LMV_MAGIC);
+ if (info->lti_ea_store_size < lmm_size) {
+ rc = lod_ea_store_resize(info, lmm_size);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
+ lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
+ lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
+ lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
+ rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
+ &mdtidx, LU_SEQ_RANGE_MDT);
+ if (rc != 0)
+ RETURN(rc);
+
+ lmm1->lmv_master_mdt_index = cpu_to_le32(mdtidx);
+ fid_cpu_to_le(&lmm1->lmv_stripe_fids[0], lu_object_fid(&dt->do_lu));
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ struct dt_object *dto;
+
+ dto = lo->ldo_stripe[i];
+ LASSERT(dto != NULL);
+ fid_cpu_to_le(&lmm1->lmv_stripe_fids[i + 1],
+ lu_object_fid(&dto->do_lu));
+ }
+
+ lmv_buf->lb_buf = info->lti_ea_store;
+ lmv_buf->lb_len = lmm_size;
+ lo->ldo_dir_striping_cached = 1;
+
+ RETURN(rc);
+}
+
+int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
+ const struct lu_buf *buf)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct dt_object **stripe;
+ union lmv_mds_md *lmm = buf->lb_buf;
+ struct lmv_mds_md_v1 *lmv1 = &lmm->lmv_md_v1;
+ struct lu_fid *fid = &info->lti_fid;
+ int i;
+ int rc = 0;
+ ENTRY;
+
+ if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
+ RETURN(-EINVAL);
+
+ if (le32_to_cpu(lmv1->lmv_stripe_count) <= 1)
+ RETURN(0);
+
+ fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[0]);
+ /* Do not load striping information for slave inode */
+ if (!lu_fid_eq(fid, lu_object_fid(&lo->ldo_obj.do_lu))) {
+ lo->ldo_dir_slave_stripe = 1;
+ RETURN(0);
+ }
+
+ LASSERT(lo->ldo_stripe == NULL);
+ OBD_ALLOC(stripe, sizeof(stripe[0]) *
+ (le32_to_cpu(lmv1->lmv_stripe_count) - 1));
+ if (stripe == NULL)
+ RETURN(-ENOMEM);
+
+ /* skip master stripe */
+ for (i = 1; i < le32_to_cpu(lmv1->lmv_stripe_count); i++) {
+ struct lod_tgt_desc *tgt;
+ int idx;
+ struct dt_object *dto;
+
+ fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
+ rc = lod_fld_lookup(env, lod, fid,
+ &idx, LU_SEQ_RANGE_MDT);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ tgt = LTD_TGT(ltd, idx);
+ if (tgt == NULL)
+ GOTO(out, rc = -ESTALE);
+
+ dto = dt_locate_at(env, tgt->ltd_tgt, fid,
+ lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+ NULL);
+ if (IS_ERR(dto))
+ GOTO(out, rc = PTR_ERR(dto));
+
+ stripe[i - 1] = dto;
+ }
+out:
+ lo->ldo_stripe = stripe;
+ lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
+ lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count) - 1;
+ if (rc != 0)
+ lod_object_free_striping(env, lo);
+
+ RETURN(rc);
+}
+
+static int lod_prep_md_striped_create(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ const struct lmv_user_md_v1 *lum,
+ struct thandle *th)
+{
+ struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct dt_object **stripe;
+ struct lu_buf lmv_buf;
+ int stripe_count;
+ int *idx_array;
+ int rc = 0;
+ int i;
+ int j;
+ ENTRY;
+
+ /* The lum has been verified in lod_verify_md_striping */
+ LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
+ LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
+
+ /* No need to allocate the master stripe here */
+ stripe_count = le32_to_cpu(lum->lum_stripe_count);
+ OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1));
+ if (stripe == NULL)
+ RETURN(-ENOMEM);
+
+ OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
+ if (idx_array == NULL)
+ GOTO(out_free, rc = -ENOMEM);
+
+ idx_array[0] = le32_to_cpu(lum->lum_stripe_offset);
+ for (i = 1; i < stripe_count; i++) {
+ struct lod_tgt_desc *tgt;
+ struct dt_object *dto;
+ struct lu_fid fid;
+ int idx;
+ struct lu_object_conf conf = { 0 };
+
+ idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
+
+ for (j = 0; j < lod->lod_remote_mdt_count;
+ j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
+ bool already_allocated = false;
+ int k;
+
+ CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
+ " allocated %d, last allocated %d\n", idx,
+ lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+
+ /* Find next available target */
+ if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
+ continue;
+
+ /* check whether the idx already exists
+ * in current allocated array */
+ for (k = 0; k < i; k++) {
+ if (idx_array[k] == idx) {
+ already_allocated = true;
+ break;
+ }
+ }
+
+ if (already_allocated)
+ continue;
+
+ break;
+ }
+
+ /* Can not allocate more stripes */
+ if (j == lod->lod_remote_mdt_count) {
+ CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
+ lod2obd(lod)->obd_name, stripe_count, i - 1);
+ break;
+ }
+
+ CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
+ " allocated %d, last allocated %d\n", idx,
+ lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+
+ tgt = LTD_TGT(ltd, idx);
+ LASSERT(tgt != NULL);
+
+ rc = obd_fid_alloc(tgt->ltd_exp, &fid, NULL);
+ if (rc < 0)
+ GOTO(out_put, rc);
+ rc = 0;
+
+ conf.loc_flags = LOC_F_NEW;
+ dto = dt_locate_at(env, tgt->ltd_tgt, &fid,
+ dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf);
+ if (IS_ERR(dto))
+ GOTO(out_put, rc = PTR_ERR(dto));
+ stripe[i - 1] = dto;
+ idx_array[i] = idx;
+ }
+
+ lo->ldo_dir_striped = 1;
+ lo->ldo_stripe = stripe;
+ lo->ldo_stripenr = i - 1;
+ lo->ldo_stripes_allocated = stripe_count - 1;
+
+ if (lo->ldo_stripenr == 0)
+ GOTO(out_put, rc = -ENOSPC);
+
+ rc = lod_prep_lmv_md(env, dt, &lmv_buf);
+ if (rc != 0)
+ GOTO(out_put, rc);
+
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ struct dt_object *dto;
+
+ dto = stripe[i];
+ /* only create slave striped object */
+ rc = dt_declare_create(env, dto, attr, NULL, NULL, th);
+ if (rc != 0)
+ GOTO(out_put, rc);
+
+ if (!dt_try_as_dir(env, dto))
+ GOTO(out_put, rc = -EINVAL);
+
+ rc = dt_declare_insert(env, dto,
+ (const struct dt_rec *)lu_object_fid(&dto->do_lu),
+ (const struct dt_key *)dot, th);
+ if (rc != 0)
+ GOTO(out_put, rc);
+
+ /* master stripe FID will be put to .. */
+ rc = dt_declare_insert(env, dto,
+ (const struct dt_rec *)lu_object_fid(&dt->do_lu),
+ (const struct dt_key *)dotdot, th);
+ if (rc != 0)
+ GOTO(out_put, rc);
+
+ /* probably nothing to inherite */
+ if (lo->ldo_striping_cached &&
+ !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+ lo->ldo_def_stripenr,
+ lo->ldo_def_stripe_offset)) {
+ struct lod_thread_info *info;
+ struct lov_user_md_v3 *v3;
+
+ /* sigh, lti_ea_store has been used for lmv_buf,
+ * so we have to allocate buffer for default
+ * stripe EA */
+ OBD_ALLOC_PTR(v3);
+ if (v3 == NULL)
+ GOTO(out_put, rc = -ENOMEM);
+
+ memset(v3, 0, sizeof(*v3));
+ v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+ v3->lmm_stripe_count =
+ cpu_to_le32(lo->ldo_def_stripenr);
+ v3->lmm_stripe_offset =
+ cpu_to_le32(lo->ldo_def_stripe_offset);
+ v3->lmm_stripe_size =
+ cpu_to_le32(lo->ldo_def_stripe_size);
+ if (lo->ldo_pool)
+ strncpy(v3->lmm_pool_name, lo->ldo_pool,
+ LOV_MAXPOOLNAME);
+
+ info = lod_env_info(env);
+ info->lti_buf.lb_buf = v3;
+ info->lti_buf.lb_len = sizeof(*v3);
+ rc = dt_declare_xattr_set(env, dto,
+ &info->lti_buf,
+ XATTR_NAME_LOV,
+ 0, th);
+ OBD_FREE_PTR(v3);
+ if (rc != 0)
+ GOTO(out_put, rc);
+ }
+ rc = dt_declare_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, 0,
+ th);
+ if (rc != 0)
+ GOTO(out_put, rc);
+ }
+
+ rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th);
+ if (rc != 0)
+ GOTO(out_put, rc);
+
+out_put:
+ if (rc < 0) {
+ for (i = 0; i < stripe_count - 1; i++)
+ if (stripe[i] != NULL)
+ lu_object_put(env, &stripe[i]->do_lu);
+ OBD_FREE(stripe, sizeof(stripe[0]) * (stripe_count - 1));
+ }
+
+out_free:
+ if (idx_array != NULL)
+ OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
+
+ RETURN(rc);
+}
+
+/**
+ * Declare storing of an LMV (striped-directory) layout on a directory.
+ *
+ * Validates the client-supplied lmv_user_md and, when more than one
+ * stripe is requested, declares creation of the slave stripe objects
+ * via lod_prep_md_striped_create().  If stripe preparation fails, any
+ * partially built striping state on the object is torn down so later
+ * users do not see a half-initialized layout.
+ *
+ * \param env     execution environment
+ * \param dt      master directory object
+ * \param attr    attributes for the stripe objects to be created
+ * \param lum_buf buffer holding the lmv_user_md_v1 from the client
+ * \param th      transaction handle
+ *
+ * \retval 0 on success (including the single-stripe no-op case),
+ *         negative errno on failure
+ */
+static int lod_declare_xattr_set_lmv(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ const struct lu_buf *lum_buf,
+ struct thandle *th)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+ struct lmv_user_md_v1 *lum;
+ int rc;
+ ENTRY;
+
+ lum = lum_buf->lb_buf;
+ LASSERT(lum != NULL);
+
+ CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
+ le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
+ (int)le32_to_cpu(lum->lum_stripe_offset));
+
+ /* stripe_count <= 1 means a plain (non-striped) directory:
+ * nothing extra to declare */
+ if (le32_to_cpu(lum->lum_stripe_count) <= 1)
+ GOTO(out, rc = 0);
+
+ rc = lod_verify_md_striping(lod, lum);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ /* prepare dir striped objects */
+ rc = lod_prep_md_striped_create(env, dt, attr, lum, th);
+ if (rc != 0) {
+ /* failed to create striping, let's reset
+ * config so that others don't get confused */
+ lod_object_free_striping(env, lo);
+ GOTO(out, rc);
+ }
+out:
+ RETURN(rc);
+}
+
/*
* LOV xattr is a storage for striping, and LOD owns this xattr.
* but LOD allows others to control striping to some extent
* LU_XATTR_REPLACE is set to indicate a layout swap
*/
mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
- if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) &&
+ if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
!(fl & LU_XATTR_REPLACE)) {
/*
* this is a request to manipulate object's striping
rc = lod_declare_striped_object(env, dt, attr, buf, th);
if (rc)
RETURN(rc);
+ } else {
+ rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
}
- rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
-
RETURN(rc);
}
+/*
+ * Drop the cached default LOV striping (and pool name) from @lo so
+ * that the next consumer re-reads it from the on-disk default
+ * striping EA instead of trusting stale in-memory values.
+ */
+static void lod_lov_stripe_cache_clear(struct lod_object *lo)
+{
+ lo->ldo_striping_cached = 0;
+ lo->ldo_def_striping_set = 0;
+ lod_object_set_pool(lo, NULL);
+ lo->ldo_def_stripe_size = 0;
+ lo->ldo_def_stripenr = 0;
+}
+
static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
struct dt_object *dt,
const struct lu_buf *buf,
int rc;
ENTRY;
- LASSERT(l->ldo_stripe == NULL);
- l->ldo_striping_cached = 0;
- l->ldo_def_striping_set = 0;
- lod_object_set_pool(l, NULL);
- l->ldo_def_stripe_size = 0;
- l->ldo_def_stripenr = 0;
-
+ /* If it is striped dir, we should clear the stripe cache for
+ * slave stripe as well, but there are no effective way to
+ * notify the LOD on the slave MDT, so we do not cache stripe
+ * information for slave stripe for now. XXX*/
+ lod_lov_stripe_cache_clear(l);
LASSERT(buf != NULL && buf->lb_buf != NULL);
lum = buf->lb_buf;
RETURN(rc);
}
+/**
+ * Execute phase of striped-directory creation: instantiate the slave
+ * stripe objects declared earlier, populate their "." and ".." entries,
+ * propagate the parent's default LOV EA when one is cached, and store
+ * the LMV EA on each stripe and on the master directory.
+ *
+ * \param env  execution environment
+ * \param dt   master directory object
+ * \param buf  unused here (stripes come from lo->ldo_stripe)
+ * \param name xattr name (XATTR_NAME_LMV)
+ * \param fl   xattr set flags
+ * \param th   transaction handle
+ * \param capa capability, or BYPASS_CAPA
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int fl, struct thandle *th,
+ struct lustre_capa *capa)
+{
+ struct lod_object *lo = lod_dt_obj(dt);
+ struct lu_buf lmv_buf;
+ int i;
+ int rc;
+ ENTRY;
+
+ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+ RETURN(-ENOTDIR);
+
+ /* The stripes are supposed to be allocated in declare phase,
+ * if there are no stripes being allocated, it will skip */
+ if (lo->ldo_stripenr == 0)
+ RETURN(0);
+
+ rc = lod_prep_lmv_md(env, dt, &lmv_buf);
+ if (rc != 0)
+ RETURN(rc);
+
+ for (i = 0; i < lo->ldo_stripenr; i++) {
+ struct dt_object *dto;
+ struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+
+ dto = lo->ldo_stripe[i];
+ memset(attr, 0, sizeof(*attr));
+ attr->la_valid = LA_TYPE | LA_MODE;
+ attr->la_mode = S_IFDIR;
+ rc = dt_create(env, dto, attr, NULL, NULL, th);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = dt_insert(env, dto,
+ (const struct dt_rec *)lu_object_fid(&dto->do_lu),
+ (const struct dt_key *)dot, th, capa, 0);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = dt_insert(env, dto,
+ (struct dt_rec *)lu_object_fid(&dt->do_lu),
+ (const struct dt_key *)dotdot, th, capa, 0);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (lo->ldo_striping_cached &&
+ !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+ lo->ldo_def_stripenr,
+ lo->ldo_def_stripe_offset)) {
+ struct lod_thread_info *info;
+ struct lov_user_md_v3 *v3;
+
+ /* sigh, lti_ea_store has been used for lmv_buf,
+ * so we have to allocate buffer for default
+ * stripe EA */
+ OBD_ALLOC_PTR(v3);
+ if (v3 == NULL)
+ RETURN(-ENOMEM);
+
+ memset(v3, 0, sizeof(*v3));
+ v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+ /* lmm_stripe_count/offset are 16-bit fields in
+ * lov_user_md_v3, so convert with cpu_to_le16
+ * (matches lod_dir_striping_create_internal);
+ * cpu_to_le32 here would corrupt the EA on
+ * big-endian hosts */
+ v3->lmm_stripe_count =
+ cpu_to_le16(lo->ldo_def_stripenr);
+ v3->lmm_stripe_offset =
+ cpu_to_le16(lo->ldo_def_stripe_offset);
+ v3->lmm_stripe_size =
+ cpu_to_le32(lo->ldo_def_stripe_size);
+ if (lo->ldo_pool)
+ strncpy(v3->lmm_pool_name, lo->ldo_pool,
+ LOV_MAXPOOLNAME);
+
+ info = lod_env_info(env);
+ info->lti_buf.lb_buf = v3;
+ info->lti_buf.lb_len = sizeof(*v3);
+ rc = dt_xattr_set(env, dto, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th, capa);
+ OBD_FREE_PTR(v3);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ rc = dt_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, fl, th,
+ capa);
+ /* a failure on any stripe must abort the whole set;
+ * previously rc was silently overwritten below */
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa);
+
+ RETURN(rc);
+}
+
static int lod_xattr_set(const struct lu_env *env,
struct dt_object *dt, const struct lu_buf *buf,
const char *name, int fl, struct thandle *th,
ENTRY;
attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
- if (S_ISDIR(attr)) {
- if (strcmp(name, XATTR_NAME_LOV) == 0)
- rc = lod_xattr_set_lov_on_dir(env, dt, buf, name,
- fl, th, capa);
- else
- rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
-
+ if (S_ISDIR(attr) && strcmp(name, XATTR_NAME_LOV) == 0) {
+ rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
} else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
/* in case of lov EA swap, just set it
* if not, it is a replay so check striping match what we
return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
}
-static int lod_cache_parent_striping(const struct lu_env *env,
- struct lod_object *lp)
+
+static int lod_cache_parent_lov_striping(const struct lu_env *env,
+ struct lod_object *lp)
{
+ struct lod_thread_info *info = lod_env_info(env);
struct lov_user_md_v1 *v1 = NULL;
struct lov_user_md_v3 *v3 = NULL;
int rc;
ENTRY;
- /* dt_ah_init() is called from MDD without parent being write locked
+ /* called from MDD without parent being write locked,
* lock it here */
dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
- if (lp->ldo_striping_cached)
- GOTO(unlock, rc = 0);
-
rc = lod_get_lov_ea(env, lp);
if (rc < 0)
GOTO(unlock, rc);
GOTO(unlock, rc = 0);
}
- v1 = (struct lov_user_md_v1 *)lod_env_info(env)->lti_ea_store;
+ rc = 0;
+ v1 = info->lti_ea_store;
if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
lustre_swab_lov_user_md_v1(v1);
else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
lp->ldo_striping_cached = 1;
lp->ldo_def_striping_set = 1;
-
if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
/* XXX: sanity check here */
v3 = (struct lov_user_md_v3 *) v1;
if (v3->lmm_pool_name[0])
lod_object_set_pool(lp, v3->lmm_pool_name);
}
+ EXIT;
+unlock:
+ dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
+ return rc;
+}
+
+
+/**
+ * Read the parent directory's default LMV EA and cache its values
+ * (default stripe count/offset/hash) in the lod_object, so children
+ * created under it can inherit them.  A missing or short EA caches
+ * "no default striping" so the lookup is not repeated.
+ *
+ * \retval 0 on success (including "no default EA"),
+ *         negative errno on failure
+ */
+static int lod_cache_parent_lmv_striping(const struct lu_env *env,
+ struct lod_object *lp)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lmv_user_md_v1 *v1 = NULL;
+ int rc;
+ ENTRY;
+
+ /* called from MDD without parent being write locked,
+ * lock it here */
+ dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
+ rc = lod_get_default_lmv_ea(env, lp);
+ if (rc < 0)
+ GOTO(unlock, rc);
+
+ if (rc < sizeof(struct lmv_user_md)) {
+ /* don't lookup for non-existing or invalid striping */
+ lp->ldo_dir_def_striping_set = 0;
+ lp->ldo_dir_striping_cached = 1;
+ lp->ldo_dir_def_stripenr = 0;
+ lp->ldo_dir_def_stripe_offset =
+ (typeof(v1->lum_stripe_offset))(-1);
+ lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
+ GOTO(unlock, rc = 0);
+ }
+
+ rc = 0;
+ v1 = info->lti_ea_store;
- CDEBUG(D_OTHER, "def. striping: # %d, sz %d, off %d %s%s on "DFID"\n",
- lp->ldo_def_stripenr, lp->ldo_def_stripe_size,
- lp->ldo_def_stripe_offset, v3 ? "from " : "",
- v3 ? lp->ldo_pool : "", PFID(lu_object_fid(&lp->ldo_obj.do_lu)));
+ lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count) - 1;
+ lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
+ lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
+ lp->ldo_dir_def_striping_set = 1;
+ lp->ldo_dir_striping_cached = 1;
+ EXIT;
unlock:
+ /* release the write lock taken at entry; without this every
+ * GOTO(unlock, ...) path leaked the lock (the sibling
+ * lod_cache_parent_lov_striping() unlocks here as well) */
+ dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
return rc;
}
+/**
+ * Make sure the parent's striping information is loaded and its
+ * default LOV striping is cached; for directory children, also cache
+ * the parent's default LMV (striped-directory) settings.
+ *
+ * \param env        execution environment
+ * \param lp         parent directory object
+ * \param child_mode mode of the child being created (S_IFDIR triggers
+ *                   the LMV default lookup)
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+static int lod_cache_parent_striping(const struct lu_env *env,
+ struct lod_object *lp,
+ umode_t child_mode)
+{
+ int rc = 0;
+ ENTRY;
+
+ rc = lod_load_striping(env, lp);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (!lp->ldo_striping_cached) {
+ /* we haven't tried to get default striping for
+ * the directory yet, let's cache it in the object */
+ rc = lod_cache_parent_lov_striping(env, lp);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ /* LMV defaults only matter when the new child is a directory */
+ if (S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
+ rc = lod_cache_parent_lmv_striping(env, lp);
+
+ RETURN(rc);
+}
+
/**
* used to transfer default striping data to the object being created
*/
NULL : nextp, nextc, child_mode);
if (S_ISDIR(child_mode)) {
- if (lp->ldo_striping_cached == 0) {
- /* we haven't tried to get default striping for
- * the directory yet, let's cache it in the object */
- lod_cache_parent_striping(env, lp);
+ int rc;
+
+ if (lc->ldo_dir_stripe == NULL) {
+ OBD_ALLOC_PTR(lc->ldo_dir_stripe);
+ if (lc->ldo_dir_stripe == NULL)
+ return;
}
+
+ if (lp->ldo_dir_stripe == NULL) {
+ OBD_ALLOC_PTR(lp->ldo_dir_stripe);
+ if (lp->ldo_dir_stripe == NULL)
+ return;
+ }
+
+ rc = lod_cache_parent_striping(env, lp, child_mode);
+ if (rc != 0)
+ return;
+
/* transfer defaults to new directory */
if (lp->ldo_striping_cached) {
if (lp->ldo_pool)
lc->ldo_striping_cached = 1;
lc->ldo_def_striping_set = 1;
CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
- (int)lc->ldo_def_stripenr,
(int)lc->ldo_def_stripe_size,
- (int)lc->ldo_def_stripe_offset);
+ (int)lc->ldo_def_stripe_offset,
+ (int)lc->ldo_def_stripenr);
}
- return;
+
+ /* transfer dir defaults to new directory */
+ if (lp->ldo_dir_striping_cached) {
+ lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
+ lc->ldo_dir_def_stripe_offset =
+ lp->ldo_dir_def_stripe_offset;
+ lc->ldo_dir_def_hash_type =
+ lp->ldo_dir_def_hash_type;
+ lc->ldo_dir_striping_cached = 1;
+ lc->ldo_dir_def_striping_set = 1;
+ CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
+ (int)lc->ldo_dir_def_stripenr,
+ (int)lc->ldo_dir_def_stripe_offset,
+ lc->ldo_dir_def_hash_type);
+ }
+
+ /* If the directory is specified with certain stripes */
+ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
+ const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
+ int rc;
+
+ rc = lod_verify_md_striping(d, lum1);
+ if (rc == 0 &&
+ le32_to_cpu(lum1->lum_stripe_count) > 1) {
+ /* Directory will be striped only if
+ * stripe_count > 1 */
+ lc->ldo_stripenr =
+ le32_to_cpu(lum1->lum_stripe_count) - 1;
+ lc->ldo_dir_stripe_offset =
+ le32_to_cpu(lum1->lum_stripe_offset);
+ lc->ldo_dir_hash_type =
+ le32_to_cpu(lum1->lum_hash_type);
+ CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
+ lc->ldo_stripenr,
+ (int)lc->ldo_dir_stripe_offset);
+ }
+ } else if (lp->ldo_dir_def_striping_set) {
+ /* If there are default dir stripe from parent */
+ lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
+ lc->ldo_dir_stripe_offset =
+ lp->ldo_dir_def_stripe_offset;
+ lc->ldo_dir_hash_type =
+ lp->ldo_dir_def_hash_type;
+ CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
+ lc->ldo_stripenr,
+ (int)lc->ldo_dir_stripe_offset);
+ } else {
+ /* set default stripe for this directory */
+ lc->ldo_stripenr = 0;
+ lc->ldo_dir_stripe_offset = -1;
+ }
+
+ CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
+ lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
+
+ goto out;
}
/*
*/
if (!lod_object_will_be_striped(S_ISREG(child_mode),
lu_object_fid(&child->do_lu)))
- return;
-
+ goto out;
/*
* try from the parent
*/
if (likely(parent)) {
- if (lp->ldo_striping_cached == 0) {
- /* we haven't tried to get default striping for
- * the directory yet, let's cache it in the object */
- lod_cache_parent_striping(env, lp);
- }
+ lod_cache_parent_striping(env, lp, child_mode);
lc->ldo_def_stripe_offset = (__u16) -1;
lc->ldo_stripenr, lc->ldo_stripe_size,
lc->ldo_pool ? lc->ldo_pool : "");
+out:
+ /* we do not cache stripe information for slave stripe, see
+ * lod_xattr_set_lov_on_dir */
+ if (lp != NULL && lp->ldo_dir_slave_stripe)
+ lod_lov_stripe_cache_clear(lp);
+
EXIT;
}
RETURN(rc);
}
-
/**
* Create declaration of striped object
*/
RETURN(rc);
}
+/**
+ * Common declare/execute path for directory striping creation.
+ *
+ * Performs up to three steps on the new directory:
+ *  1. store the LMV EA describing its own stripes (via
+ *     lod_declare_xattr_set_lmv()/lod_xattr_set_lmv());
+ *  2. copy the parent's cached default LMV settings into the
+ *     default-LMV EA so they are inherited further down;
+ *  3. copy the parent's cached default LOV settings into the
+ *     default-LOV EA.
+ *
+ * \param declare true for the declaration phase, false for execution
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+int lod_dir_striping_create_internal(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ const struct dt_object_format *dof,
+ struct thandle *th,
+ bool declare)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct dt_object *next = dt_object_child(dt);
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc;
+ ENTRY;
+
+ /* NOTE(review): this gates the object's own stripe settings
+ * (ldo_stripenr/ldo_dir_stripe_offset) on the parent-inherited
+ * flag ldo_dir_def_striping_set; a directory striped via an
+ * explicit client EA under a parent without defaults may skip
+ * this branch -- confirm against lod_ah_init() flag usage */
+ if (lo->ldo_dir_def_striping_set &&
+ !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
+ lo->ldo_dir_stripe_offset)) {
+ struct lmv_user_md_v1 *v1 = info->lti_ea_store;
+ int stripe_count = lo->ldo_stripenr + 1;
+
+ /* lum_stripe_count counts the master stripe too, hence +1 */
+ if (info->lti_ea_store_size < sizeof(*v1)) {
+ rc = lod_ea_store_resize(info, sizeof(*v1));
+ if (rc != 0)
+ RETURN(rc);
+ v1 = info->lti_ea_store;
+ }
+
+ memset(v1, 0, sizeof(*v1));
+ v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+ v1->lum_stripe_count = cpu_to_le32(stripe_count);
+ v1->lum_stripe_offset =
+ cpu_to_le32(lo->ldo_dir_stripe_offset);
+
+ info->lti_buf.lb_buf = v1;
+ info->lti_buf.lb_len = sizeof(*v1);
+
+ if (declare)
+ rc = lod_declare_xattr_set_lmv(env, dt, attr,
+ &info->lti_buf, th);
+ else
+ rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
+ XATTR_NAME_LMV, 0, th,
+ BYPASS_CAPA);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ /* Transfer default LMV striping from the parent */
+ if (lo->ldo_dir_striping_cached &&
+ !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
+ lo->ldo_dir_def_stripe_offset)) {
+ struct lmv_user_md_v1 *v1 = info->lti_ea_store;
+ int def_stripe_count = lo->ldo_dir_def_stripenr + 1;
+
+ if (info->lti_ea_store_size < sizeof(*v1)) {
+ rc = lod_ea_store_resize(info, sizeof(*v1));
+ if (rc != 0)
+ RETURN(rc);
+ v1 = info->lti_ea_store;
+ }
+
+ memset(v1, 0, sizeof(*v1));
+ v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+ v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
+ v1->lum_stripe_offset =
+ cpu_to_le32(lo->ldo_dir_def_stripe_offset);
+ v1->lum_hash_type =
+ cpu_to_le32(lo->ldo_dir_def_hash_type);
+
+ info->lti_buf.lb_buf = v1;
+ info->lti_buf.lb_len = sizeof(*v1);
+ if (declare)
+ rc = dt_declare_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_DEFALT_LMV, 0,
+ th);
+ else
+ rc = dt_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_DEFALT_LMV, 0, th,
+ BYPASS_CAPA);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ /* Transfer default LOV striping from the parent */
+ if (lo->ldo_striping_cached &&
+ !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+ lo->ldo_def_stripenr,
+ lo->ldo_def_stripe_offset)) {
+ struct lov_user_md_v3 *v3 = info->lti_ea_store;
+
+ if (info->lti_ea_store_size < sizeof(*v3)) {
+ rc = lod_ea_store_resize(info, sizeof(*v3));
+ if (rc != 0)
+ RETURN(rc);
+ v3 = info->lti_ea_store;
+ }
+
+ memset(v3, 0, sizeof(*v3));
+ v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+ v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
+ v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
+ v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+ if (lo->ldo_pool)
+ strncpy(v3->lmm_pool_name, lo->ldo_pool,
+ LOV_MAXPOOLNAME);
+
+ info->lti_buf.lb_buf = v3;
+ info->lti_buf.lb_len = sizeof(*v3);
+
+ if (declare)
+ rc = dt_declare_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th);
+ else
+ rc = dt_xattr_set(env, next, &info->lti_buf,
+ XATTR_NAME_LOV, 0, th,
+ BYPASS_CAPA);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ RETURN(0);
+}
+
+/* Declaration-phase wrapper around lod_dir_striping_create_internal() */
+static int lod_declare_dir_striping_create(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_object_format *dof,
+ struct thandle *th)
+{
+ return lod_dir_striping_create_internal(env, dt, attr, dof, th, true);
+}
+
+/* Execution-phase wrapper around lod_dir_striping_create_internal() */
+static int lod_dir_striping_create(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_object_format *dof,
+ struct thandle *th)
+{
+ return lod_dir_striping_create_internal(env, dt, attr, dof, th, false);
+}
+
static int lod_declare_object_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
if (lo->ldo_stripenr > 0)
rc = lod_declare_striped_object(env, dt, attr,
NULL, th);
- } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
- struct lod_thread_info *info = lod_env_info(env);
-
- struct lov_user_md_v3 *v3;
-
- if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
- lo->ldo_def_stripenr,
- lo->ldo_def_stripe_offset))
- RETURN(0);
-
- OBD_ALLOC_PTR(v3);
- if (v3 == NULL)
- RETURN(-ENOMEM);
-
- v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
- v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
- fid_to_lmm_oi(lu_object_fid(&dt->do_lu), &v3->lmm_oi);
- lmm_oi_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi);
- v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
- v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr);
- v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
- if (lo->ldo_pool)
- strncpy(v3->lmm_pool_name, lo->ldo_pool,
- LOV_MAXPOOLNAME);
-
- info->lti_buf.lb_buf = v3;
- info->lti_buf.lb_len = sizeof(*v3);
-
- /* to transfer default striping from the parent */
- rc = dt_declare_xattr_set(env, next, &info->lti_buf,
- XATTR_NAME_LOV, 0, th);
- OBD_FREE_PTR(v3);
+ } else if (dof->dof_type == DFT_DIR) {
+ rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
}
-
out:
RETURN(rc);
}
if (rc == 0) {
if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
- rc = lod_store_def_striping(env, dt, th);
- else if (lo->ldo_stripe)
+ rc = lod_dir_striping_create(env, dt, attr, dof, th);
+ else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
rc = lod_striping_create(env, dt, attr, dof, th);
}
{
int i;
+ if (lo->ldo_dir_stripe != NULL) {
+ OBD_FREE_PTR(lo->ldo_dir_stripe);
+ lo->ldo_dir_stripe = NULL;
+ }
+
if (lo->ldo_stripe) {
LASSERT(lo->ldo_stripes_allocated > 0);
return (msfs->os_bavail < used);
}
-int lod_ea_store_resize(struct lod_thread_info *info, int size);
-
static inline int lod_qos_ost_in_use_clear(const struct lu_env *env, int stripes)
{
struct lod_thread_info *info = lod_env_info(env);
mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
lmmsize);
+ req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+ obddev->u.cli.cl_max_mds_easize);
+
/* for remote client, fetch remote perm for current user */
if (client_is_remote(exp))
req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
RETURN(rc);
/* calling ->ah_make_hint() is used to transfer information from parent */
- mdd_object_make_hint(env, mdd_pobj, son, attr);
+ mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
spec->sp_cr_flags, spec->no_create);
- if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
- /* replay case or lfs setstripe */
+ if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
spec->u.sp_ea.eadatalen);
} else {
GOTO(out, rc);
/* replay case, create LOV EA from client data */
- if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
+ if (spec->no_create ||
+ (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
const struct lu_buf *buf;
buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
spec->u.sp_ea.eadatalen);
- rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
- 0, handle);
+ rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
+ handle);
if (rc)
GOTO(out, rc);
}
if (rc)
return rc;
+ /* XXX: For remote create, it should indicate the remote RPC
+ * will be sent after local transaction is finished, which
+ * is not very nice, but it will be removed once we fully support
+ * async update */
+ if (mdd_object_remote(p) && handle->th_update != NULL)
+ handle->th_update->tu_sent_after_local_trans = 1;
out:
- return rc;
+ return rc;
}
static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
if (rc < 0)
GOTO(out_free, rc);
- mdd_object_make_hint(env, mdd_pobj, son, attr);
+ mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
handle = mdd_trans_create(env, mdd);
if (IS_ERR(handle))
* probably this way we code can be made better.
*/
if (rc == 0 && (spec->no_create ||
- (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
+ (spec->sp_cr_flags & MDS_OPEN_HAS_EA &&
+ S_ISREG(attr->la_mode)))) {
const struct lu_buf *buf;
buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
spec->u.sp_ea.eadatalen);
rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
- BYPASS_CAPA);
+ BYPASS_CAPA);
}
if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
struct lu_attr mti_tattr;
/** used to set c/mtime */
struct lu_attr mti_la_for_fix;
+ /* Only used in mdd_object_start */
+ struct lu_attr mti_la_for_start;
struct md_attr mti_ma;
struct obd_info mti_oi;
/* mti_ent and mti_key must be conjoint,
struct mdd_object *obj, struct lu_attr *la);
void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
- struct mdd_object *child, struct lu_attr *attr);
+ struct mdd_object *child, const struct lu_attr *attr,
+ const struct md_op_spec *spec);
static inline void mdd_object_get(struct mdd_object *o)
{
if (lu_object_exists(o)) {
struct mdd_object *mdd_obj = lu2mdd_obj(o);
- struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
+ struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
if (rc == 0)
}
void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
- struct mdd_object *child, struct lu_attr *attr)
+ struct mdd_object *child, const struct lu_attr *attr,
+ const struct md_op_spec *spec)
{
struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
- struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
+ struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
struct dt_object *nc = mdd_object_child(child);
+ memset(hint, 0, sizeof(*hint));
+
+ /* For striped directory, give striping EA to lod_ah_init, which will
+ * decide the stripe_offset and stripe count by it. */
+ if (S_ISDIR(attr->la_mode) &&
+ unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
+ hint->dah_eadata = spec->u.sp_ea.eadata;
+ hint->dah_eadata_len = spec->u.sp_ea.eadatalen;
+ } else {
+ hint->dah_eadata = NULL;
+ hint->dah_eadata_len = 0;
+ }
+
+ CDEBUG(D_INFO, DFID" eadata %p, len %d\n", PFID(mdd_object_fid(child)),
+ hint->dah_eadata, hint->dah_eadata_len);
/* @hint will be initialized by underlying device. */
nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
}
void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
int result, struct thandle *handle)
{
- handle->th_result = result;
- mdd_child_ops(mdd)->dt_trans_stop(env, handle);
+ handle->th_result = result;
+ mdd_child_ops(mdd)->dt_trans_stop(env, mdd->mdd_child, handle);
}
buf->lb_buf = ma->ma_lmm;
buf->lb_len = ma->ma_lmm_size;
rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LOV);
+
if (rc > 0) {
ma->ma_lmm_size = rc;
ma->ma_valid |= MA_LOV;
ma->ma_lmv = buffer->lb_buf;
ma->ma_lmv_size = buffer->lb_len;
ma->ma_need = MA_INODE;
- if (ma->ma_lmm_size > 0)
+ if (ma->ma_lmv_size > 0)
ma->ma_need |= MA_LMV;
} else {
ma->ma_lmm = buffer->lb_buf;
info->mti_opdata = 0;
info->mti_big_lmm_used = 0;
- /* To not check for split by default. */
info->mti_spec.no_create = 0;
info->mti_spec.sp_rm_entry = 0;
}
/* don't return transno along with error */
lustre_msg_set_transno(pill->rc_req->rq_repmsg, 0);
} else {
- /* now we need to pack right LOV EA */
- lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
- LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD,
- RCL_SERVER) ==
- info->mti_attr.ma_lmm_size);
- memcpy(lmm, info->mti_attr.ma_lmm,
- info->mti_attr.ma_lmm_size);
+ /* now we need to pack right LOV/LMV EA */
+ lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
+ if (info->mti_attr.ma_valid & MA_LOV) {
+ LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD,
+ RCL_SERVER) ==
+ info->mti_attr.ma_lmm_size);
+ memcpy(lmm, info->mti_attr.ma_lmm,
+ info->mti_attr.ma_lmm_size);
+ } else if (info->mti_attr.ma_valid & MA_LMV) {
+ LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD,
+ RCL_SERVER) ==
+ info->mti_attr.ma_lmv_size);
+ memcpy(lmm, info->mti_attr.ma_lmv,
+ info->mti_attr.ma_lmv_size);
+ }
}
/* update mdt_max_mdsize so clients will be aware about that */
if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size)
RETURN(-EFAULT);
} else {
req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
- }
+ if (S_ISDIR(attr->la_mode) &&
+ req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) > 0) {
+ sp->u.sp_ea.eadata =
+ req_capsule_client_get(pill, &RMF_EADATA);
+ sp->u.sp_ea.eadatalen =
+ req_capsule_get_size(pill, &RMF_EADATA,
+ RCL_CLIENT);
+ sp->sp_cr_flags |= MDS_OPEN_HAS_EA;
+ }
+ }
- rc = mdt_dlmreq_unpack(info);
- RETURN(rc);
+ rc = mdt_dlmreq_unpack(info);
+ RETURN(rc);
}
static int mdt_link_unpack(struct mdt_thread_info *info)
mfd->mfd_mode = mode;
}
+/**
+ * Point md_attr's EA buffer at the RMF_MDT_MD field of the reply.
+ *
+ * Directories get the buffer as ma_lmv (and MA_LMV is requested so
+ * the striped-directory layout is fetched); everything else gets it
+ * as ma_lmm (MA_LOV, the file layout).  Callers must pass an ma whose
+ * lmm/lmv pointers are still unset, enforced by the LASSERT.
+ *
+ * \param info server-side request thread context
+ * \param obj  object the reply describes
+ * \param ma   md_attr to prime; ma_need gains MA_LMV or MA_LOV when
+ *             the reply buffer has non-zero size
+ */
+void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct md_attr *ma)
+{
+ LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
+ if (S_ISDIR(obj->mot_header.loh_attr)) {
+ ma->ma_lmv = req_capsule_server_get(info->mti_pill,
+ &RMF_MDT_MD);
+ ma->ma_lmv_size = req_capsule_get_size(info->mti_pill,
+ &RMF_MDT_MD,
+ RCL_SERVER);
+ if (ma->ma_lmv_size > 0)
+ ma->ma_need |= MA_LMV;
+ } else {
+ ma->ma_lmm = req_capsule_server_get(info->mti_pill,
+ &RMF_MDT_MD);
+ ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
+ &RMF_MDT_MD,
+ RCL_SERVER);
+ if (ma->ma_lmm_size > 0)
+ ma->ma_need |= MA_LOV;
+ }
+}
+
static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
struct mdt_object *o, __u64 flags, int created,
struct ldlm_reply *rep)
repbody->valid |= OBD_MD_FLEASIZE;
}
+ if (ma->ma_valid & MA_LMV) {
+ LASSERT(ma->ma_lmv_size != 0);
+ repbody->eadatasize = ma->ma_lmv_size;
+ LASSERT(isdir);
+ repbody->valid |= OBD_MD_FLDIREA | OBD_MD_MEA;
+ }
+
if (flags & FMODE_WRITE) {
rc = mdt_write_get(o);
if (rc == 0) {
ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
- ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
- ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
- RCL_SERVER);
ma->ma_need = MA_INODE | MA_HSM;
- if (ma->ma_lmm_size > 0)
- ma->ma_need |= MA_LOV;
-
ma->ma_valid = 0;
mdt_req_from_lcd(req, lcd);
if (mdt_object_exists(child)) {
mdt_set_capainfo(info, 1, rr->rr_fid2,
BYPASS_CAPA);
+ mdt_prep_ma_buf_from_rep(info, child, ma);
rc = mdt_attr_get_complex(info, child, ma);
if (rc == 0)
rc = mdt_finish_open(info, parent,
mdt_set_disposition(info, rep, (DISP_IT_EXECD |
DISP_LOOKUP_EXECD |
DISP_LOOKUP_POS));
-
+ mdt_prep_ma_buf_from_rep(info, o, ma);
rc = mdt_attr_get_complex(info, o, ma);
if (rc == 0)
rc = mdt_finish_open(info, NULL, o, flags, 0,
mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
+ mdt_prep_ma_buf_from_rep(info, o, ma);
if (flags & MDS_OPEN_RELEASE)
ma->ma_need |= MA_HSM;
rc = mdt_attr_get_complex(info, o, ma);
if (rc)
goto out;
+ mdt_prep_ma_buf_from_rep(info, o, ma);
mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
rc = mdt_attr_get_complex(info, o, ma);
if (rc != 0)
mdt_counter_incr(req, LPROC_MDT_OPEN);
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
- ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
- ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
- RCL_SERVER);
ma->ma_need = MA_INODE;
- if (ma->ma_lmm_size > 0)
- ma->ma_need |= MA_LOV;
-
ma->ma_valid = 0;
LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
GOTO(out_child, result);
} else {
-
+ mdt_prep_ma_buf_from_rep(info, child, ma);
/* XXX: we should call this once, see few lines below */
if (result == 0)
result = mdt_attr_get_complex(info, child, ma);
} else if (mdt_object_exists(child)) {
/* We have to get attr & LOV EA & HSM for this
* object. */
+ mdt_prep_ma_buf_from_rep(info, child, ma);
ma->ma_need |= MA_HSM;
result = mdt_attr_get_complex(info, child, ma);
} else {
/* Object does not exist. Likely FS corruption. */
CERROR("%s: name '"DNAME"' present, but FID "
- DFID" is invalid\n",
- mdt_obd_name(info->mti_mdt),
+ DFID" is invalid\n", mdt_obd_name(info->mti_mdt),
PNAME(&rr->rr_name), PFID(child_fid));
GOTO(out_child, result = -EIO);
}
lh = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
- parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
- MDS_INODELOCK_UPDATE);
- if (IS_ERR(parent))
- RETURN(PTR_ERR(parent));
+ parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+ if (IS_ERR(parent))
+ RETURN(PTR_ERR(parent));
- rc = mdt_version_get_check_save(info, parent, 0);
- if (rc)
- GOTO(out_put_parent, rc);
+ if (!mdt_object_exists(parent))
+ GOTO(put_parent, rc = -ENOENT);
- /*
- * Check child name version during replay.
- * During create replay a file may exist with same name.
- */
+ lh = &info->mti_lh[MDT_LH_PARENT];
+ if (mdt_object_remote(parent)) {
+ mdt_lock_reg_init(lh, LCK_EX);
+ rc = mdt_remote_object_lock(info, parent, &lh->mlh_rreg_lh,
+ lh->mlh_rreg_mode,
+ MDS_INODELOCK_UPDATE);
+ if (rc != ELDLM_OK)
+ GOTO(put_parent, rc);
+
+ } else {
+ mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
+ rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE,
+ MDT_LOCAL_LOCK);
+ if (rc)
+ GOTO(put_parent, rc);
+
+ rc = mdt_version_get_check_save(info, parent, 0);
+ if (rc)
+ GOTO(unlock_parent, rc);
+ }
+
+ /*
+ * Check child name version during replay.
+ * During create replay a file may exist with same name.
+ */
rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
&info->mti_tmp_fid1, 1);
if (rc == 0)
- GOTO(out_put_parent, rc = -EEXIST);
+ GOTO(unlock_parent, rc = -EEXIST);
/* -ENOENT is expected here */
if (rc != -ENOENT)
- GOTO(out_put_parent, rc);
+ GOTO(unlock_parent, rc);
/* save version of file name for replay, it must be ENOENT here */
mdt_enoent_version_save(info, 1);
rc = PTR_ERR(child);
}
mdt_create_pack_capa(info, rc, child, repbody);
-out_put_parent:
- mdt_object_unlock_put(info, parent, lh, rc);
- RETURN(rc);
+unlock_parent:
+ mdt_object_unlock(info, parent, lh, rc);
+put_parent:
+ mdt_object_put(info->mti_env, parent);
+ RETURN(rc);
}
int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
LPROCFS_MD_OP_INIT(num_private_stats, stats, init_ea_size);
LPROCFS_MD_OP_INIT(num_private_stats, stats, get_lustre_md);
LPROCFS_MD_OP_INIT(num_private_stats, stats, free_lustre_md);
+ LPROCFS_MD_OP_INIT(num_private_stats, stats, update_lsm_md);
+ LPROCFS_MD_OP_INIT(num_private_stats, stats, merge_attr);
LPROCFS_MD_OP_INIT(num_private_stats, stats, set_open_replay_data);
LPROCFS_MD_OP_INIT(num_private_stats, stats, clear_open_replay_data);
LPROCFS_MD_OP_INIT(num_private_stats, stats, set_lock_data);
lu_context_exit(&th->th_ctx);
lu_context_fini(&th->th_ctx);
- OBD_FREE_PTR(oh);
+ thandle_put(th);
}
static struct thandle *osd_trans_create(const struct lu_env *env,
th->th_result = 0;
th->th_tags = LCT_TX_HANDLE;
oh->ot_credits = 0;
- oti->oti_dev = osd_dt_dev(d);
+ atomic_set(&th->th_refc, 1);
+ th->th_alloc_size = sizeof(*oh);
+ oti->oti_dev = osd_dt_dev(d);
CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
osd_th_alloced(oh);
/*
* Concurrency: shouldn't matter.
*/
-static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
+ struct thandle *th)
{
int rc = 0;
struct osd_thandle *oh;
if (rc != 0)
CERROR("Failure to stop transaction: %d\n", rc);
} else {
- OBD_FREE_PTR(oh);
+ thandle_put(&oh->ot_super);
}
/* as we want IO to journal and data IO be concurrent, we don't block
{
LASSERT(ah);
- memset(ah, 0, sizeof(*ah));
ah->dah_parent = parent;
ah->dah_mode = child_mode;
}
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
+ CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
+ PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
+
osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
if (fl & LU_XATTR_REPLACE)
fs_flags |= XATTR_REPLACE;
else
up_read(&obj->oo_ext_idx_sem);
- if (it->oie_rd_dirent == 0) {
- result = -EIO;
- } else {
- it->oie_dirent = it->oie_buf;
- it->oie_it_dirent = 1;
- }
+ if (it->oie_rd_dirent == 0) {
+ /* If it does not get any dirent, it means it has reached
+ * the end of the directory */
+ it->oie_file.f_pos = ldiskfs_get_htree_eof(&it->oie_file);
+ } else {
+ it->oie_dirent = it->oie_buf;
+ it->oie_it_dirent = 1;
+ }
- RETURN(result);
+ RETURN(result);
}
/**
th->th_dev = NULL;
lu_context_exit(&th->th_ctx);
lu_context_fini(&th->th_ctx);
- OBD_FREE_PTR(oh);
+ thandle_put(&oh->ot_super);
EXIT;
}
/*
* Concurrency: shouldn't matter.
*/
-static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
+ struct thandle *th)
{
struct osd_device *osd = osd_dt_dev(th->th_dev);
struct osd_thandle *oh;
/* there won't be any commit, release reserved quota space now,
* if any */
qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
- OBD_FREE_PTR(oh);
+ thandle_put(&oh->ot_super);
RETURN(0);
}
th->th_dev = dt;
th->th_result = 0;
th->th_tags = LCT_TX_HANDLE;
+ atomic_set(&th->th_refc, 1);
+ th->th_alloc_size = sizeof(*oh);
RETURN(th);
}
if (o != NULL) {
l = &o->opo_obj.do_lu;
- /* For data object, OSP obj would always be the top
- * object, i.e. hdr is always NULL, see lu_object_alloc.
- * But for metadata object, we always build the object
- * stack from MDT. i.e. mdt_object will be the top object
- * i.e. hdr != NULL */
+ /* If hdr is NULL, it means the object is not built
+ * from the top dev(MDT/OST), usually it happens when
+ * building striped object, like data object on MDT or
+ * striped object for directory */
if (hdr == NULL) {
- /* object for OST */
h = &o->opo_header;
lu_object_header_init(h);
dt_object_init(&o->opo_obj, h, d);
RETURN(rc);
}
+int osp_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct md_op_data *op_data)
+{
+ struct client_obd *cli = &exp->exp_obd->u.cli;
+ struct lu_client_seq *seq = cli->cl_seq;
+
+ ENTRY;
+ RETURN(seq_client_alloc_fid(NULL, seq, fid));
+}
+
/* context key constructor/destructor: mdt_key_init, mdt_key_fini */
LU_KEY_INIT_FINI(osp, struct osp_thread_info);
static void osp_key_exit(const struct lu_context *ctx,
LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
struct lu_context_key osp_txn_key = {
- .lct_tags = LCT_OSP_THREAD,
+ .lct_tags = LCT_OSP_THREAD | LCT_TX_HANDLE,
.lct_init = osp_txn_key_init,
.lct_fini = osp_txn_key_fini
};
.o_statfs = osp_obd_statfs,
.o_fid_init = client_fid_init,
.o_fid_fini = client_fid_fini,
+ .o_fid_alloc = osp_fid_alloc,
};
struct llog_operations osp_mds_ost_orig_logops;
struct obdo osi_obdo;
};
-static inline bool is_remote_trans(struct thandle *th)
+/* The transaction only includes updates on the remote node, and
+ * no local updates at all */
+static inline bool is_only_remote_trans(struct thandle *th)
{
return th->th_dev->dd_ops == &osp_dt_ops;
}
struct dt_device *d);
int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
struct thandle *th);
-int osp_trans_stop(const struct lu_env *env, struct thandle *th);
/* osp_object.c */
int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
struct thandle *th);
+int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
+ struct thandle *th);
+
/* osp_precreate.c */
int osp_init_precreate(struct osp_device *d);
int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
bufs[0] = (char *)&osi->osi_obdo;
buf_count = 1;
fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
- if (hint->dah_parent) {
+ if (hint != NULL && hint->dah_parent) {
struct lu_fid *fid2;
struct lu_fid *tmp_fid = &osi->osi_fid;
}
osi->osi_obdo.o_valid = 0;
- LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
attr->la_valid);
lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
po->opo_non_exist = 1;
rc = 0;
}
+ init_rwsem(&po->opo_sem);
}
RETURN(rc);
}
* start thread handling precreation and statfs updates
*/
task = kthread_run(osp_precreate_thread, d,
- "osp-pre-%u", d->opd_index);
+ "osp-pre-%u-%u", d->opd_index, d->opd_group);
if (IS_ERR(task)) {
CERROR("can't start precreate thread %ld\n", PTR_ERR(task));
RETURN(PTR_ERR(task));
CFS_INIT_LIST_HEAD(&d->opd_syn_committed_there);
rc = PTR_ERR(kthread_run(osp_sync_thread, d,
- "osp-syn-%u", d->opd_index));
+ "osp-syn-%u-%u", d->opd_index, d->opd_group));
if (IS_ERR_VALUE(rc)) {
CERROR("%s: can't start sync thread: rc = %d\n",
d->opd_obd->obd_name, rc);
return rc;
}
-struct thandle *osp_trans_create(const struct lu_env *env,
- struct dt_device *d)
+/**
+ * If the transaction creation goes to OSP, it means the updates
+ * in this transaction only include remote updates. It is only
+ * used by LFSCK right now.
+ **/
+struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
{
- struct thandle *th;
+ struct thandle *th = NULL;
+ struct thandle_update *tu = NULL;
+ /* rc must start at 0: the success path falls through to the
+ * out: label where rc is tested, so leaving it uninitialized
+ * would be a read of indeterminate memory */
+ int rc = 0;
OBD_ALLOC_PTR(th);
if (unlikely(th == NULL))
- return ERR_PTR(-ENOMEM);
+ GOTO(out, rc = -ENOMEM);
th->th_dev = d;
th->th_tags = LCT_TX_HANDLE;
- INIT_LIST_HEAD(&th->th_remote_update_list);
+ atomic_set(&th->th_refc, 1);
+ th->th_alloc_size = sizeof(*th);
+
+ OBD_ALLOC_PTR(tu);
+ if (tu == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ INIT_LIST_HEAD(&tu->tu_remote_update_list);
+ tu->tu_only_remote_trans = 1;
+ /* Attach the update descriptor to the handle; osp_trans_stop()
+ * does LASSERT(th->th_update != NULL), and without this the
+ * allocation above would simply leak. */
+ th->th_update = tu;
+out:
+ if (rc != 0) {
+ if (tu != NULL)
+ OBD_FREE_PTR(tu);
+ if (th != NULL)
+ OBD_FREE_PTR(th);
+ th = ERR_PTR(rc);
+ }
return th;
}
static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
- struct thandle *th)
+ struct update_request *update, struct thandle *th)
{
- struct update_request *update = th->th_current_request;
- int rc = 0;
+ struct thandle_update *tu = th->th_update;
+ int rc = 0;
- if (unlikely(update == NULL || update->ur_buf == NULL ||
- update->ur_buf->ub_count == 0))
- return 0;
+ LASSERT(tu != NULL);
- if (is_remote_trans(th)) {
+ /* If the transaction only includes remote update, it should
+ * still be asynchronous */
+ if (tu->tu_only_remote_trans) {
struct osp_async_update_args *args;
struct ptlrpc_request *req;
list_del_init(&update->ur_list);
- th->th_current_request = NULL;
rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
update->ur_buf,
UPDATE_BUFFER_SIZE, &req);
out_destroy_update_req(update);
}
} else {
+ /* Before we support async update, the cross MDT transaction
+ * has to be synchronized */
th->th_sync = 1;
rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
update, NULL);
int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
struct thandle *th)
{
+ struct thandle_update *tu = th->th_update;
+ struct update_request *update;
int rc = 0;
- if (!is_remote_trans(th))
- rc = osp_trans_trigger(env, dt2osp_dev(dt), th);
+ if (tu == NULL)
+ return rc;
+
+ /* Check whether there are updates related with this OSP */
+ update = out_find_update(tu, dt);
+ if (update == NULL)
+ return rc;
+
+ /* Note: some updates need to be sent before the local
+ * transaction, and some after it.
+ *
+ * If the transaction only includes remote updates, it will
+ * send updates to remote MDT in osp_trans_stop.
+ *
+ * If it is remote create, it will send the remote req after
+ * local transaction. i.e. create the object locally first,
+ * then insert the name entry.
+ *
+ * If it is remote unlink, it will send the remote req before
+ * the local transaction, i.e. delete the name entry remote
+ * first, then destroy the local object. */
+ if (!tu->tu_only_remote_trans && !tu->tu_sent_after_local_trans)
+ rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th);
return rc;
}
-int osp_trans_stop(const struct lu_env *env, struct thandle *th)
+int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
+ struct thandle *th)
{
- struct update_request *update = th->th_current_request;
- int rc = 0;
+ struct thandle_update *tu = th->th_update;
+ struct update_request *update;
+ int rc = 0;
+
+ LASSERT(tu != NULL);
+ /* Check whether there are updates related with this OSP */
+ update = out_find_update(tu, dt);
+ if (update == NULL)
+ return rc;
- if (is_remote_trans(th)) {
- LASSERT(update == NULL);
+ if (update->ur_buf->ub_count == 0)
+ GOTO(free, rc);
- update = out_find_update(th, th->th_dev);
- th->th_current_request = update;
+ if (tu->tu_only_remote_trans) {
if (th->th_result == 0)
- rc = osp_trans_trigger(env, dt2osp_dev(th->th_dev), th);
+ rc = osp_trans_trigger(env, dt2osp_dev(dt),
+ update, th);
else
rc = th->th_result;
-
- if (th->th_current_request != NULL)
- out_destroy_update_req(update);
-
- OBD_FREE_PTR(th);
} else {
- LASSERT(update != NULL);
-
+ if (tu->tu_sent_after_local_trans)
+ rc = osp_trans_trigger(env, dt2osp_dev(dt),
+ update, th);
rc = update->ur_rc;
- out_destroy_update_req(update);
- th->th_current_request = NULL;
}
-
+free:
+ out_destroy_update_req(update);
+ thandle_put(th);
return rc;
}
__swab32s(&lum->lum_hash_type);
__swab32s(&lum->lum_type);
CLASSERT(offsetof(typeof(*lum), lum_padding1) != 0);
- CLASSERT(offsetof(typeof(*lum), lum_padding2) != 0);
- CLASSERT(offsetof(typeof(*lum), lum_padding3) != 0);
-
for (i = 0; i < lum->lum_stripe_count; i++) {
__swab32s(&lum->lum_objects[i].lum_mds);
lustre_swab_lu_fid(&lum->lum_objects[i].lum_fid);
lu_object_get(&obj->do_lu);
arg->object = obj;
arg->u.create.attr = *attr;
- if (parent_fid)
+ if (parent_fid != NULL)
arg->u.create.fid = *parent_fid;
memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
arg->u.create.dof = *dof;
la_from_obdo(attr, lobdo, lobdo->o_valid);
dof->dof_type = dt_mode_to_dft(attr->la_mode);
- if (S_ISDIR(attr->la_mode)) {
+ if (update->u_lens[1] > 0) {
int size;
fid = update_param_buf(update, 1, &size);
#include <lustre_update.h>
#include <obd.h>
-struct update_request *out_find_update(struct thandle *th,
+struct update_request *out_find_update(struct thandle_update *tu,
struct dt_device *dt_dev)
{
struct update_request *update;
- list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
+ LASSERT(tu != NULL);
+ list_for_each_entry(update, &tu->tu_remote_update_list, ur_list) {
if (update->ur_dt == dt_dev)
return update;
}
EXPORT_SYMBOL(out_create_update_req);
/**
- * Find one loc in th_dev/dev_obj_update for the update,
+ * Find or create one loc in th_dev/dev_obj_update for the update,
* Because only one thread can access this thandle, no need
* lock now.
*/
struct dt_object *dt)
{
struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+ struct thandle_update *tu = th->th_update;
struct update_request *update;
ENTRY;
- update = out_find_update(th, dt_dev);
+ if (tu == NULL) {
+ OBD_ALLOC_PTR(tu);
+ if (tu == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ INIT_LIST_HEAD(&tu->tu_remote_update_list);
+ tu->tu_sent_after_local_trans = 0;
+ th->th_update = tu;
+ }
+
+ update = out_find_update(tu, dt_dev);
if (update != NULL)
RETURN(update);
if (IS_ERR(update))
RETURN(update);
- list_add_tail(&update->ur_list, &th->th_remote_update_list);
+ list_add_tail(&update->ur_list, &tu->tu_remote_update_list);
+
+ thandle_get(th);
RETURN(update);
}
local MDTIDX=1
mkdir -p $DIR/$tdir
- drop_update_reply $((MDTIDX + 1)) "$LFS mkdir -i $MDTIDX $remote_dir" ||
+ drop_update_reply $MDTIDX "$LFS mkdir -i $MDTIDX $remote_dir" ||
error "lfs mkdir failed"
diridx=$($GETSTRIPE -M $remote_dir)
do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
# OBD_FAIL_MDS_REINT_NET_REP 0x119
- do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+ do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
CLIENT_PID=$!
- fail mds${MDTIDX}
+ fail mds$((MDTIDX + 1))
wait $CLIENT_PID || error "lfs mkdir failed"
- replay_barrier mds${MDTIDX}
+ replay_barrier mds$MDTIDX
create_remote_dir_files_22 || error "Remote creation failed $?"
- fail mds${MDTIDX}
+ fail mds$MDTIDX
checkstat_22 || error "check stat failed $?"
rm -rf $MOUNT1/$tdir || error "rmdir remote_dir failed"
return 0
}
-run_test 22a "c1 lfs mkdir -i 1 dir1, M0 drop reply & fail, c2 mkdir dir1/dir"
+run_test 22a "c1 lfs mkdir -i 1 dir1, M1 drop reply & fail, c2 mkdir dir1/dir"
test_22b () {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
# OBD_FAIL_MDS_REINT_NET_REP 0x119
do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
- do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+ do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
CLIENT_PID=$!
rm -rf $MOUNT1/$tdir || error "rmdir remote_dir failed"
return 0
}
-run_test 22b "c1 lfs mkdir -i 1 d1, M0 drop reply & fail M0/M1, c2 mkdir d1/dir"
+run_test 22b "c1 lfs mkdir -i 1 d1, M1 drop reply & fail M0/M1, c2 mkdir d1/dir"
test_22c () {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
# OBD_FAIL_UPDATE_OBJ_NET_REP 0x1701
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+ do_facet mds$MDTIDX lctl set_param fail_loc=0x1701
do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
CLIENT_PID=$!
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
+ do_facet mds$MDTIDX lctl set_param fail_loc=0
- fail mds$((MDTIDX+1))
+ fail mds$MDTIDX
wait $CLIENT_PID || error "lfs mkdir failed"
replay_barrier mds$MDTIDX
do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
# OBD_FAIL_UPDATE_OBJ_NET_REP 0x1701
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+ do_facet mds$MDTIDX lctl set_param fail_loc=0x1701
do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
CLIENT_PID=$!
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
+ do_facet mds$MDTIDX lctl set_param fail_loc=0
fail mds${MDTIDX},mds$((MDTIDX + 1))
wait $CLIENT_PID || error "lfs mkdir failed"
mkdir -p $DIR/$tdir
#define OBD_FAIL_UPDATE_OBJ_NET_REP 0x1701
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+ do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
- fail mds$((MDTIDX + 1))
+ fail mds${MDTIDX}
wait $CLIENT_PID || error "remote creation failed"
return 0
}
-run_test 80a "DNE: create remote dir, drop update rep from MDT1, fail MDT1"
+run_test 80a "DNE: create remote dir, drop update rep from MDT0, fail MDT0"
test_80b() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
mkdir -p $DIR/$tdir
#define OBD_FAIL_UPDATE_OBJ_NET_REP 0x1701
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+ do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
- fail mds${MDTIDX}
+ fail mds$((MDTIDX + 1))
wait $CLIENT_PID || error "remote creation failed"
return 0
}
-run_test 80b "DNE: create remote dir, drop update rep from MDT1, fail MDT0"
+run_test 80b "DNE: create remote dir, drop update rep from MDT0, fail MDT1"
test_80c() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
mkdir -p $DIR/$tdir
#define OBD_FAIL_UPDATE_OBJ_NET_REP 0x1701
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+ do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
mkdir -p $DIR/$tdir
#define OBD_FAIL_UPDATE_OBJ_NET_REP 0x1701
- do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+ do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
+ # sleep 3 seconds to make sure MDTs are failed after
+ # lfs mkdir -i has finished on all of MDTs.
+ sleep 3
fail mds${MDTIDX},mds$((MDTIDX + 1))
wait $CLIENT_PID || error "remote creation failed"
mkdir -p $DIR/$tdir
# OBD_FAIL_MDS_REINT_NET_REP 0x119
- do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+ do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
+ # sleep 3 seconds to make sure MDTs are failed after
+ # lfs mkdir -i has finished on all of MDTs.
+ sleep 3
+
fail mds${MDTIDX}
wait $CLIENT_PID || error "remote creation failed"
return 0
}
-run_test 80e "DNE: create remote dir, drop MDT0 rep, fail MDT0"
+run_test 80e "DNE: create remote dir, drop MDT1 rep, fail MDT0"
test_80f() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
mkdir -p $DIR/$tdir
# OBD_FAIL_MDS_REINT_NET_REP 0x119
- do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+ do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
return 0
}
-run_test 80f "DNE: create remote dir, drop MDT0 rep, fail MDT1"
+run_test 80f "DNE: create remote dir, drop MDT1 rep, fail MDT1"
test_80g() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
mkdir -p $DIR/$tdir
# OBD_FAIL_MDS_REINT_NET_REP 0x119
- do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+ do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
+ # sleep 3 seconds to make sure MDTs are failed after
+ # lfs mkdir -i has finished on all of MDTs.
+ sleep 3
+
fail mds${MDTIDX}
fail mds$((MDTIDX + 1))
return 0
}
-run_test 80g "DNE: create remote dir, drop MDT0 rep, fail MDT0, then MDT1"
+run_test 80g "DNE: create remote dir, drop MDT1 rep, fail MDT0, then MDT1"
test_80h() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
mkdir -p $DIR/$tdir
# OBD_FAIL_MDS_REINT_NET_REP 0x119
- do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+ do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
$LFS mkdir -i $MDTIDX $remote_dir &
local CLIENT_PID=$!
+ # sleep 3 seconds to make sure MDTs are failed after
+ # lfs mkdir -i has finished on all of MDTs.
+ sleep 3
+
fail mds${MDTIDX},mds$((MDTIDX + 1))
wait $CLIENT_PID || return 1
return 0
}
-run_test 80h "DNE: create remote dir, drop MDT0 rep, fail 2 MDTs"
+run_test 80h "DNE: create remote dir, drop MDT1 rep, fail 2 MDTs"
test_81a() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
local cmd
local rc=0
- for mdt_index in $(seq 1 $MDSCOUNT); do
+ # create/unlink in 17n only change 2 MDTs(MDT1/MDT2),
+ # so it only check MDT1/MDT2 instead of all of MDTs.
+ for mdt_index in $(seq 1 2); do
devname=$(mdsdevname $mdt_index)
cmd="$E2FSCK -fnvd $devname"
}
run_test 230a "Create remote directory and files under the remote directory"
-test_230b() {
- [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
- [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
- local MDTIDX=1
- local remote_dir=$DIR/$tdir/remote_dir
- local rc=0
-
- mkdir -p $DIR/$tdir
- $LFS mkdir -i $MDTIDX $remote_dir ||
- error "create remote directory failed"
-
- $LFS mkdir -i 0 $remote_dir/new_dir &&
- error "nested remote directory create succeed!"
-
- do_facet mds$((MDTIDX + 1)) lctl set_param mdt.*.enable_remote_dir=1
- $LFS mkdir -i 0 $remote_dir/new_dir || rc=$?
- do_facet mds$((MDTIDX + 1)) lctl set_param mdt.*.enable_remote_dir=0
-
- [ $rc -ne 0 ] &&
- error "create remote directory failed after set enable_remote_dir"
-
- rm -rf $remote_dir || error "first unlink remote directory failed"
-
- $RUNAS -G$RUNAS_GID $LFS mkdir -i $MDTIDX $DIR/$tfile &&
- error "chown worked"
-
- do_facet mds$MDTIDX lctl set_param \
- mdt.*.enable_remote_dir_gid=$RUNAS_GID
- $LFS mkdir -i $MDTIDX $remote_dir || rc=$?
- do_facet mds$MDTIDX lctl set_param mdt.*.enable_remote_dir_gid=0
-
- [ $rc -ne 0 ] &&
- error "create remote dir failed after set enable_remote_dir_gid"
-
- rm -r $DIR/$tdir || error "second unlink remote directory failed"
-}
-run_test 230b "nested remote directory should be failed"
-
test_231a()
{
# For simplicity this test assumes that max_pages_per_rpc
rmdir $DIR2/$tfile-3
check_pdo_conflict $PID1 || error "unlink is blocked"
- if [ $MDSCOUNT -ge 2 ]; then
- $LFS mkdir -i 1 $DIR2/$tfile-6
- check_pdo_conflict $PID1 || error "remote mkdir is blocked"
- fi
-
# all operations above shouldn't wait the first one
check_pdo_conflict $PID1 || error "parallel operation is blocked"
wait $PID1
check_pdo_conflict $PID1 || error "unlink is blocked"
# all operations above shouldn't wait the first one
- if [ $MDSCOUNT -ge 2 ]; then
- $LFS mkdir -i 1 $DIR2/$tfile-6
- check_pdo_conflict $PID1 || error "remote mkdir is blocked"
- fi
-
check_pdo_conflict $PID1 || error "parallel operation is blocked"
wait $PID1
rm -r $DIR1/*
rmdir $DIR2/$tfile-3
check_pdo_conflict $PID1 || error "unlink is blocked"
- if [ $MDSCOUNT -ge 2 ]; then
- $LFS mkdir -i 1 $DIR2/$tfile-6
- check_pdo_conflict $PID1 || error "remote mkdir is blocked"
- fi
-
# all operations above shouldn't wait the first one
check_pdo_conflict $PID1 || error "parallel operation is blocked"
wait $PID1
rmdir $DIR2/$tfile-3
check_pdo_conflict $PID1 || error "unlink is blocked"
- if [ $MDSCOUNT -ge 2 ]; then
- $LFS mkdir -i 1 $DIR2/$tfile-6
- check_pdo_conflict $PID1 || error "remote mkdir is blocked"
- fi
-
# all operations above shouldn't wait the first one
check_pdo_conflict $PID1 || error "parallel operation is blocked"
wait $PID1
rmdir $DIR2/$tfile-3
check_pdo_conflict $PID1 || error "unlink is blocked"
- if [ $MDSCOUNT -ge 2 ]; then
- $LFS mkdir -i 1 $DIR2/$tfile-6
- check_pdo_conflict $PID1 || error "remote mkdir is blocked"
- fi
-
# all operations above shouldn't wait the first one
check_pdo_conflict $PID1 || error "parallel operation is blocked"
wait $PID1
" <directory|filename> ..."},
{"setdirstripe", lfs_setdirstripe, 0,
"To create a remote directory on a specified MDT.\n"
- "usage: setdirstripe <--index|-i mdt_index> <dir>\n"
- "\tmdt_index: MDT index of first stripe\n"},
+ "usage: setdirstripe <--count|-c stripe_count>\n"
+ "[--index|-i mdt_index] [--hash-type|-t hash_type] <dir>\n"
+ "\tstripe_count: stripe count of the striped directory\n"
+ "\tmdt_index: MDT index of first stripe\n"
+ "\thash_type: hash type of the striped directory\n"},
{"getdirstripe", lfs_getdirstripe, 0,
"To list the striping info for a given directory\n"
"or recursively for all directories in a directory tree.\n"
/* functions */
static int lfs_setdirstripe(int argc, char **argv)
{
- char *dname;
- int result;
- int st_offset, st_count;
- char *end;
- int c;
- char *stripe_off_arg = NULL;
- int flags = 0;
+ char *dname;
+ int result;
+ unsigned int stripe_offset = -1;
+ unsigned int stripe_count = 1;
+ enum lmv_hash_type hash_type;
+ char *end;
+ int c;
+ char *stripe_offset_opt = NULL;
+ char *stripe_count_opt = NULL;
+ char *stripe_hash_opt = NULL;
+ int flags = 0;
struct option long_opts[] = {
- {"index", required_argument, 0, 'i'},
+ {"count", required_argument, 0, 'c'},
+ {"index", required_argument, 0, 'i'},
+ {"hash-type", required_argument, 0, 't'},
{0, 0, 0, 0}
};
- st_offset = -1;
- st_count = 1;
optind = 0;
- while ((c = getopt_long(argc, argv, "i:o",
- long_opts, NULL)) >= 0) {
+
+ while ((c = getopt_long(argc, argv, "c:i:t:", long_opts, NULL)) >= 0) {
switch (c) {
case 0:
/* Long options. */
break;
+ case 'c':
+ stripe_count_opt = optarg;
+ break;
case 'i':
- stripe_off_arg = optarg;
+ stripe_offset_opt = optarg;
+ break;
+ case 't':
+ stripe_hash_opt = optarg;
break;
default:
fprintf(stderr, "error: %s: option '%s' "
return CMD_HELP;
}
- dname = argv[optind];
- if (stripe_off_arg == NULL) {
- fprintf(stderr, "error: %s: missing stripe_off.\n",
+ if (stripe_offset_opt == NULL && stripe_count_opt == NULL) {
+ fprintf(stderr, "error: %s: missing stripe offset and count.\n",
argv[0]);
return CMD_HELP;
}
- /* get the stripe offset */
- st_offset = strtoul(stripe_off_arg, &end, 0);
- if (*end != '\0') {
- fprintf(stderr, "error: %s: bad stripe offset '%s'\n",
- argv[0], stripe_off_arg);
+
+ if (stripe_offset_opt != NULL) {
+ /* get the stripe offset */
+ stripe_offset = strtoul(stripe_offset_opt, &end, 0);
+ if (*end != '\0') {
+ fprintf(stderr, "error: %s: bad stripe offset '%s'\n",
+ argv[0], stripe_offset_opt);
+ return CMD_HELP;
+ }
+ }
+
+ if (stripe_hash_opt == NULL ||
+ strcmp(stripe_hash_opt, LMV_HASH_NAME_FNV_1A_64) == 0) {
+ hash_type = LMV_HASH_TYPE_FNV_1A_64;
+ } else if (strcmp(stripe_hash_opt, LMV_HASH_NAME_ALL_CHARS) == 0) {
+ hash_type = LMV_HASH_TYPE_ALL_CHARS;
+ } else {
+ fprintf(stderr, "error: %s: bad stripe hash type '%s'\n",
+ argv[0], stripe_hash_opt);
return CMD_HELP;
}
+
+ /* get the stripe count */
+ if (stripe_count_opt != NULL) {
+ stripe_count = strtoul(stripe_count_opt, &end, 0);
+ if (*end != '\0') {
+ fprintf(stderr, "error: %s: bad stripe count '%s'\n",
+ argv[0], stripe_count_opt);
+ return CMD_HELP;
+ }
+ }
+
+ dname = argv[optind];
do {
- result = llapi_dir_create_pool(dname, flags, st_offset,
- st_count, 0, NULL);
+ result = llapi_dir_create_pool(dname, flags, stripe_offset,
+ stripe_count, hash_type, NULL);
if (result) {
fprintf(stderr, "error: %s: create stripe dir '%s' "
"failed\n", argv[0], dname);
return 0;
}
-/**
- * In DNE phase I, only stripe_offset will be used in this function.
- * stripe_count, stripe_pattern and pool_name will be supported later.
- */
int llapi_dir_create_pool(const char *name, int flags, int stripe_offset,
- int stripe_count, int stripe_pattern, char *pool_name)
+ int stripe_count, int stripe_pattern,
+ const char *pool_name)
{
struct lmv_user_md lmu = { 0 };
struct obd_ioctl_data data = { 0 };
}
filename = basename(namepath);
- lmu.lum_type = LMV_STRIPE_TYPE;
dir = dirname(dirpath);
data.ioc_inlbuf1 = (char *)filename;