removed cwd "./" (refer to Bugzilla 14399).
* File join has been disabled in this release, refer to Bugzilla 16929.
+
+Severity : enhancement
+Bugzilla : 11826
+Description: Interoperability at server side (Disk interoperability)
+
Severity : enhancement
Bugzilla : 17458
Description: Update to SLES10 SP2 kernel-2.6.16.60-0.31.
RETURN(rc);
}
+static int cmm_prepare(const struct lu_env *env,
+ struct lu_device *pdev,
+ struct lu_device *dev)
+{
+ struct cmm_device *cmm = lu2cmm_dev(dev);
+ struct lu_device *next = md2lu_dev(cmm->cmm_child);
+ int rc;
+
+ ENTRY;
+ rc = next->ld_ops->ldo_prepare(env, dev, next);
+ RETURN(rc);
+}
+
static const struct lu_device_operations cmm_lu_ops = {
.ldo_object_alloc = cmm_object_alloc,
.ldo_process_config = cmm_process_config,
- .ldo_recovery_complete = cmm_recovery_complete
+ .ldo_recovery_complete = cmm_recovery_complete,
+ .ldo_prepare = cmm_prepare,
};
/* --- lu_device_type operations --- */
cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
+static struct lu_local_obj_desc llod_seq_srv = {
+ .llod_name = LUSTRE_SEQ_SRV_NAME,
+ .llod_oid = FID_SEQ_SRV_OID,
+ .llod_is_index = 0,
+};
+
+static struct lu_local_obj_desc llod_seq_ctl = {
+ .llod_name = LUSTRE_SEQ_CTL_NAME,
+ .llod_oid = FID_SEQ_CTL_OID,
+ .llod_is_index = 0,
+};
+
static int __init fid_mod_init(void)
{
seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME,
if (IS_ERR(seq_type_proc_dir))
return PTR_ERR(seq_type_proc_dir);
+ llo_local_obj_register(&llod_seq_srv);
+ llo_local_obj_register(&llod_seq_ctl);
+
LU_CONTEXT_KEY_INIT(&seq_thread_key);
lu_context_key_register(&seq_thread_key);
return 0;
* </pre>
*
* The first 0x400 sequences of normal FID are reserved for special purpose.
+ * FID_SEQ_START + 1 is for local file id generation.
*/
const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
FID_SEQ_START + 0x400ULL,
name = seq->lss_type == LUSTRE_SEQ_SERVER ?
LUSTRE_SEQ_SRV_NAME : LUSTRE_SEQ_CTL_NAME;
- dt_obj = dt_store_open(env, dt, name, &fid);
+ dt_obj = dt_store_open(env, dt, "", name, &fid);
if (!IS_ERR(dt_obj)) {
seq->lss_obj = dt_obj;
rc = 0;
#include <lprocfs_status.h>
#include <md_object.h>
+#include <lustre_fid.h>
#include <lustre_req_layout.h>
#include "fld_internal.h"
cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
+static struct lu_local_obj_desc llod_fld_index = {
+ .llod_name = fld_index_name,
+ .llod_oid = FLD_INDEX_OID,
+ .llod_is_index = 1,
+ .llod_feat = &fld_index_features,
+};
+
static int __init fld_mod_init(void)
{
fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
if (IS_ERR(fld_type_proc_dir))
return PTR_ERR(fld_type_proc_dir);
+ llo_local_obj_register(&llod_fld_index);
+
LU_CONTEXT_KEY_INIT(&fld_thread_key);
lu_context_key_register(&fld_thread_key);
return 0;
#include "fld_internal.h"
const char fld_index_name[] = "fld";
+EXPORT_SYMBOL(fld_index_name);
-static const struct dt_index_features fld_index_features = {
+const struct dt_index_features fld_index_features = {
.dif_flags = DT_IND_UPDATE,
.dif_keysize_min = sizeof(seqno_t),
.dif_keysize_max = sizeof(seqno_t),
.dif_recsize_min = sizeof(mdsno_t),
- .dif_recsize_max = sizeof(mdsno_t)
+ .dif_recsize_max = sizeof(mdsno_t),
+ .dif_ptrsize = 4
};
+EXPORT_SYMBOL(fld_index_features);
+
/*
* number of blocks to reserve for particular operations. Should be function of
* ... something. Stub for now.
rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
fld_key(env, seq), BYPASS_CAPA);
- if (rc == 0)
+ if (rc > 0) {
*mds = be64_to_cpu(*(__u64 *)rec);
+ rc = 0;
+ } else
+ rc = -ENOENT;
RETURN(rc);
}
int rc;
ENTRY;
- dt_obj = dt_store_open(env, dt, fld_index_name, &fid);
+ dt_obj = dt_store_open(env, dt, "", fld_index_name, &fid);
if (!IS_ERR(dt_obj)) {
fld->lsf_obj = dt_obj;
rc = dt_obj->do_ops->do_index_try(env, dt_obj,
size_t dif_recsize_min;
/** maximal required record size, 0 if no limit */
size_t dif_recsize_max;
+ /** pointer size for record */
+ size_t dif_ptrsize;
};
enum dt_index_flags {
* It can contain any allocation hint in the future.
*/
struct dt_allocation_hint {
- struct dt_object *dah_parent;
- __u32 dah_mode;
+ struct dt_object *dah_parent;
+ __u32 dah_mode;
};
/**
+ * object type specifier.
+ */
+
+enum dt_format_type {
+ DFT_REGULAR,
+ DFT_DIR,
+ /** for mknod */
+ DFT_NODE,
+ /** for special index */
+ DFT_INDEX,
+ /** for symbolic link */
+ DFT_SYM,
+};
+
+/**
+ * object format specifier.
+ */
+struct dt_object_format {
+ /** type for dt object */
+ enum dt_format_type dof_type;
+ union {
+ struct dof_regular {
+ } dof_reg;
+ struct dof_dir {
+ } dof_dir;
+ struct dof_node {
+ } dof_node;
+ /**
+ * special index need feature as parameter to create
+ * special idx
+ */
+ struct dof_index {
+ const struct dt_index_features *di_feat;
+ } dof_idx;
+ } u;
+};
+
+enum dt_format_type dt_mode_to_dft(__u32 mode);
+
+/**
* Per-dt-object operations.
*/
struct dt_object_operations {
int (*do_create)(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *th);
/**
* precondition: dt_object_exists(dt);
*/
struct dt_it *(*init)(const struct lu_env *env,
- struct dt_object *dt, int writable,
+ struct dt_object *dt,
struct lustre_capa *capa);
void (*fini)(const struct lu_env *env,
struct dt_it *di);
const struct dt_key *key);
void (*put)(const struct lu_env *env,
struct dt_it *di);
- int (*del)(const struct lu_env *env,
- struct dt_it *di, struct thandle *th);
int (*next)(const struct lu_env *env,
struct dt_it *di);
struct dt_key *(*key)(const struct lu_env *env,
int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn);
int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj);
+
+/**
+ * Callback function used for parsing path.
+ * \see llo_store_resolve
+ */
+typedef int (*dt_entry_func_t)(const struct lu_env *env,
+ const char *name,
+ void *pvt);
+
+#define DT_MAX_PATH 1024
+
+int dt_path_parser(const struct lu_env *env,
+ char *local, dt_entry_func_t entry_func,
+ void *data);
+
struct dt_object *dt_store_open(const struct lu_env *env,
- struct dt_device *dt, const char *name,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *filename,
struct lu_fid *fid);
-/** @} dt */
+struct dt_object *dt_locate(const struct lu_env *env,
+ struct dt_device *dev,
+ const struct lu_fid *fid);
+/** @} dt */
#endif /* __LUSTRE_DT_OBJECT_H */
#ifdef __KERNEL__
struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt,
- char *name, int mode, int fix);
+ const char *name, int mode, int fix);
struct dentry *simple_mknod(struct dentry *dir, char *name, int mode, int fix);
int lustre_rename(struct dentry *dir, struct vfsmount *mnt, char *oldname,
char *newname);
int (*ldo_recovery_complete)(const struct lu_env *,
struct lu_device *);
+ /**
+ * initialize local objects for device. this method called after layer has
+ * been initialized (after LCFG_SETUP stage) and before it starts serving
+ * user requests.
+ */
+
+ int (*ldo_prepare)(const struct lu_env *,
+ struct lu_device *parent,
+ struct lu_device *dev);
+
};
/**
* Common name structure to be passed around for various name related methods.
*/
struct lu_name {
- char *ln_name;
- int ln_namelen;
+ const char *ln_name;
+ int ln_namelen;
};
/**
void lu_kmem_fini(struct lu_kmem_descr *caches);
/** @} lu */
-
#endif /* __LUSTRE_LU_OBJECT_H */
};
/**
+ * Following struct for MDT attributes, that will be kept inode's EA.
+ * Introduced in 2.0 release (please see b15993, for details)
+ */
+struct lustre_mdt_attrs {
+ /** FID of this inode */
+ struct lu_fid lma_self_fid;
+ /** SOM state, mdt/ost type, others */
+ __u64 lma_flags;
+ /** total sectors in objects */
+ __u64 lma_som_sectors;
+};
+
+
+/**
* fid constants
*/
enum {
- /* initial fid id value */
+ /** initial fid id value */
LUSTRE_FID_INIT_OID = 1UL
};
#define MDT_LOGS_DIR "LOGS" /* COMPAT_146 */
#define MOUNT_CONFIGS_DIR "CONFIGS"
-/* Persistent mount data are stored on the disk in this file. */
-#define MOUNT_DATA_FILE MOUNT_CONFIGS_DIR"/mountdata"
-#define LAST_RCVD "last_received"
+#define CONFIGS_FILE "mountdata"
+/** Persistent mount data are stored on the disk in this file. */
+#define MOUNT_DATA_FILE MOUNT_CONFIGS_DIR"/"CONFIGS_FILE
+#define LAST_RCVD "last_rcvd"
#define LOV_OBJID "lov_objid"
#define HEALTH_CHECK "health_check"
#define CAPA_KEYS "capa_keys"
#define LDD_F_SV_TYPE_MDT 0x0001
#define LDD_F_SV_TYPE_OST 0x0002
#define LDD_F_SV_TYPE_MGS 0x0004
-#define LDD_F_NEED_INDEX 0x0010 /* need an index assignment */
-#define LDD_F_VIRGIN 0x0020 /* never registered */
-#define LDD_F_UPDATE 0x0040 /* update the config logs for this server*/
-#define LDD_F_REWRITE_LDD 0x0080 /* rewrite the LDD */
-#define LDD_F_WRITECONF 0x0100 /* regenerate all logs for this fs */
-#define LDD_F_UPGRADE14 0x0200 /* COMPAT_14 */
-#define LDD_F_PARAM 0x0400 /* process as lctl conf_param */
+/** need an index assignment */
+#define LDD_F_NEED_INDEX 0x0010
+/** never registered */
+#define LDD_F_VIRGIN 0x0020
+/** update the config logs for this server*/
+#define LDD_F_UPDATE 0x0040
+/** rewrite the LDD */
+#define LDD_F_REWRITE_LDD 0x0080
+/** regenerate all logs for this fs */
+#define LDD_F_WRITECONF 0x0100
+/** COMPAT_14 */
+#define LDD_F_UPGRADE14 0x0200
+/** process as lctl conf_param */
+#define LDD_F_PARAM 0x0400
+/** backend fs make use of IAM directory format. */
+#define LDD_F_IAM_DIR 0x0800
enum ldd_mount_type {
LDD_MT_EXT3 = 0,
#define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
#endif
-/* COMPAT_146 */
-#define OBD_COMPAT_OST 0x00000002 /* this is an OST (temporary) */
-#define OBD_COMPAT_MDT 0x00000004 /* this is an MDT (temporary) */
-/* end COMPAT_146 */
-
-#define OBD_ROCOMPAT_LOVOBJID 0x00000001 /* MDS handles LOV_OBJID file */
-
-#define OBD_INCOMPAT_GROUPS 0x00000001 /* OST handles group subdirs */
-#define OBD_INCOMPAT_OST 0x00000002 /* this is an OST */
-#define OBD_INCOMPAT_MDT 0x00000004 /* this is an MDT */
-#define OBD_INCOMPAT_COMMON_LR 0x00000008 /* common last_rvcd format */
+/** COMPAT_146: this is an OST (temporary) */
+#define OBD_COMPAT_OST 0x00000002
+/** COMPAT_146: this is an MDT (temporary) */
+#define OBD_COMPAT_MDT 0x00000004
+
+/** MDS handles LOV_OBJID file */
+#define OBD_ROCOMPAT_LOVOBJID 0x00000001
+
+/** OST handles group subdirs */
+#define OBD_INCOMPAT_GROUPS 0x00000001
+/** this is an OST */
+#define OBD_INCOMPAT_OST 0x00000002
+/** this is an MDT */
+#define OBD_INCOMPAT_MDT 0x00000004
+/** common last_rvcd format */
+#define OBD_INCOMPAT_COMMON_LR 0x00000008
+/** FID is enabled */
+#define OBD_INCOMPAT_FID 0x00000010
+/**
+ * lustre disk using iam format to store directory entries
+ */
+#define OBD_INCOMPAT_IAM_DIR 0x00000020
/* Data stored per server at the head of the last_rcvd file. In le32 order.
LUSTRE_SEQ_SUPER_WIDTH = (LUSTRE_SEQ_META_WIDTH * LUSTRE_SEQ_META_WIDTH)
};
+/** special fid seq: used for local object create. */
+#define FID_SEQ_LOCAL_FILE (FID_SEQ_START + 1)
+
+/** special OID for local objects */
+enum {
+ /** \see osd_oi_index_create */
+ OSD_OI_FID_SMALL_OID = 1UL,
+ OSD_OI_FID_OTHER_OID = 2UL,
+ /** \see fld_mod_init */
+ FLD_INDEX_OID = 3UL,
+ /** \see fid_mod_init */
+ FID_SEQ_CTL_OID = 4UL,
+ FID_SEQ_SRV_OID = 5UL,
+ /** \see mdd_mod_init */
+ MDD_ROOT_INDEX_OID = 6UL,
+ MDD_ORPHAN_OID = 7UL,
+ MDD_LOV_OBJ_OID = 8UL,
+ MDD_CAPA_KEYS_OID = 9UL,
+ MDD_OBJECTS_OID = 10UL,
+ /** \see mdt_mod_init */
+ MDT_LAST_RECV_OID = 11UL,
+ /** \see osd_mod_init */
+ OSD_REM_OBJ_DIR_OID = 12UL,
+};
+
+static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
+{
+ fid->f_seq = FID_SEQ_LOCAL_FILE;
+ fid->f_oid = oid;
+ fid->f_ver = 0;
+}
+
enum lu_mgr_type {
LUSTRE_SEQ_SERVER,
LUSTRE_SEQ_CONTROLLER
struct lu_client_fld;
struct lu_server_fld;
+extern const struct dt_index_features fld_index_features;
+extern const char fld_index_name[];
+
+
struct fld_stats {
__u64 fst_count;
__u64 fst_cache;
/** Check for split */
int sp_ck_split;
+
+ /** to create directory */
+ const struct dt_index_features *sp_feat;
};
/**
}
}
-/** @} md */
+struct dt_device;
+/**
+ * Structure to hold object information. This is used to create object
+ */
+struct lu_local_obj_desc {
+ const char *llod_name;
+ __u32 llod_oid;
+ int llod_is_index;
+ const struct dt_index_features * llod_feat;
+ struct list_head llod_linkage;
+};
+
+struct md_object *llo_store_resolve(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *path,
+ struct lu_fid *fid);
+
+struct md_object *llo_store_open(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *objname,
+ struct lu_fid *fid);
+
+struct md_object *llo_store_create_index(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *objname,
+ const struct lu_fid *fid,
+ const struct dt_index_features *feat);
+
+struct md_object *llo_store_create(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *objname,
+ const struct lu_fid *fid);
+
+int llo_local_obj_register(struct lu_local_obj_desc *);
+
+int llo_local_objects_setup(const struct lu_env *env,
+ struct md_device * md,
+ struct dt_device * dt);
+/** @} md */
#endif /* _LINUX_MD_OBJECT_H */
};
enum filter_groups {
+ FILTER_GROUP_MDS0 = 0,
FILTER_GROUP_LLOG = 1,
- FILTER_GROUP_ECHO,
- FILTER_GROUP_MDS0
+ FILTER_GROUP_ECHO = 2 ,
+ FILTER_GROUP_MDS1_N_BASE = 3
};
+static inline __u64 obdo_mdsno(struct obdo *oa)
+{
+ if (oa->o_gr)
+ return oa->o_gr - FILTER_GROUP_MDS1_N_BASE;
+ return 0;
+}
+
+static inline int mdt_to_obd_objgrp(int mdtid)
+{
+ if (mdtid)
+ return FILTER_GROUP_MDS1_N_BASE + mdtid;
+ return 0;
+}
+
+/**
+ * In HEAD for CMD, the object is created in group number which is 3>=
+ * or indexing starts from 3. To test this assertions are added to disallow
+ * group 0. But to run 2.0 mds server on 1.8.x disk format (i.e. interop_mode)
+ * object in group 0 needs to be allowed.
+ * So for interop mode following changes needs to be done:
+ * 1. No need to assert on group 0 or allow group 0
+ * 2. The group number indexing starts from 0 instead of 3
+ */
+
+#define CHECK_MDS_GROUP(group) (group == FILTER_GROUP_MDS0 || \
+ group > FILTER_GROUP_MDS1_N_BASE)
+#define LASSERT_MDS_GROUP(group) LASSERT(CHECK_MDS_GROUP(group))
+
struct obd_llog_group {
struct list_head olg_list;
int olg_group;
static inline __u64 oinfo_mdsno(struct obd_info *oinfo)
{
- return oinfo->oi_oa->o_gr - FILTER_GROUP_MDS0;
+ return obdo_mdsno(oinfo->oi_oa);
}
static inline struct lustre_capa *oinfo_capa(struct obd_info *oinfo)
ldt = obd->obd_type->typ_lu;
if (ldt != NULL) {
+ struct lu_context session_ctx;
struct lu_env env;
+ lu_context_init(&session_ctx, LCT_SESSION);
+ session_ctx.lc_thread = NULL;
+ lu_context_enter(&session_ctx);
rc = lu_env_init(&env, ldt->ldt_ctx_tags);
if (rc == 0) {
+ env.le_ses = &session_ctx;
d = ldt->ldt_ops->ldto_device_alloc(&env, ldt, cfg);
lu_env_fini(&env);
if (!IS_ERR(d)) {
} else
rc = PTR_ERR(d);
}
+ lu_context_exit(&session_ctx);
+ lu_context_fini(&session_ctx);
+
} else {
OBD_CHECK_DT_OP(obd, setup, -EOPNOTSUPP);
OBD_COUNTER_INCREMENT(obd, setup);
stripes * sizeof(struct lov_ost_data_v1);
}
-
#define IOC_LOV_TYPE 'g'
#define IOC_LOV_MIN_NR 50
#define IOC_LOV_SET_OSC_ACTIVE _IOWR('g', 50, long)
ino = ino | 0x80000000;
RETURN(ino);
}
+
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi,
+ struct lu_fid *fid)
+{
+ __u32 gen = 0;
+ ENTRY;
+
+ if (fid_is_igif(fid)) {
+ gen = lu_igif_gen(fid);
+ }
+ RETURN(gen);
+}
/* llite/llite_fid.c */
ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid);
+__u32 ll_fid_build_gen(struct ll_sb_info *sbi, struct lu_fid *fid);
/* llite/llite_capa.c */
extern cfs_timer_t ll_capa_timer;
}
#endif
inode->i_ino = ll_fid_build_ino(sbi, &body->fid1);
+ inode->i_generation = ll_fid_build_gen(sbi, &body->fid1);
if (body->valid & OBD_MD_FLATIME &&
body->atime > LTIME_S(inode->i_atime))
}
return rc;
- }
+
+ } else if (strcmp(name, "trusted.lma") == 0) /* b17288: ignore common_ea */
+ return 0;
return ll_setxattr_common(inode, name, value, size, flags,
OBD_MD_FLXATTR);
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
- LASSERT(lsm->lsm_object_gr > 0);
+ LASSERT_MDS_GROUP(lsm->lsm_object_gr);
lov = &exp->exp_obd->u.lov;
for (i = 0; i < lsm->lsm_stripe_count; i++) {
if (!exp || !exp->exp_obd)
RETURN(-ENODEV);
- LASSERT(lsm->lsm_object_gr > 0);
+ LASSERT_MDS_GROUP(lsm->lsm_object_gr);
LASSERT(lockh);
lov = &exp->exp_obd->u.lov;
rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set);
ASSERT_LSM_MAGIC(lsm);
- LASSERT(lsm->lsm_object_gr > 0);
+ LASSERT_MDS_GROUP(lsm->lsm_object_gr);
for (i = 0; i < lsm->lsm_stripe_count; i++) {
struct lov_stripe_md submd;
struct lov_oinfo *loi = lsm->lsm_oinfo[i];
memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
sizeof(*req->rq_oi.oi_oa));
req->rq_oi.oi_oa->o_id = loi->loi_id;
- LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP)
- || req->rq_oi.oi_oa->o_gr>0);
+ LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) ||
+ CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr),
+ "req->rq_oi.oi_oa->o_valid="LPX64" "
+ "req->rq_oi.oi_oa->o_gr="LPU64"\n",
+ req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr);
req->rq_oi.oi_oa->o_stripe_idx = i;
req->rq_oi.oi_cb_up = cb_setattr_update;
req->rq_oi.oi_capa = oinfo->oi_capa;
EXPORT_SYMBOL(simple_mknod);
/* utility to make a directory */
-struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt,
- char *name, int mode, int fix)
+struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt,
+ const char *name, int mode, int fix)
{
struct dentry *dchild;
int err = 0;
#include <obd_support.h>
#include <lprocfs_status.h>
+#include <lustre_disk.h>
+#include <lustre_fid.h>
#include <linux/ldiskfs_fs.h>
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>
const struct md_device_operations mdd_ops;
-static const char *mdd_root_dir_name = "root";
+static const char mdd_root_dir_name[] = "ROOT";
+
static int mdd_device_init(const struct lu_env *env, struct lu_device *d,
const char *name, struct lu_device *next)
{
return next;
}
-static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd)
-{
- int rc;
- struct dt_object *root;
- ENTRY;
-
- dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
- root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name,
- &mdd->mdd_root_fid);
- if (!IS_ERR(root)) {
- LASSERT(root != NULL);
- lu_object_put(env, &root->do_lu);
- rc = orph_index_init(env, mdd);
- } else
- rc = PTR_ERR(root);
-
- RETURN(rc);
-}
-
static void mdd_device_shutdown(const struct lu_env *env,
struct mdd_device *m, struct lustre_cfg *cfg)
{
CERROR("lov init error %d \n", rc);
GOTO(out, rc);
}
- rc = mdd_mount(env, m);
- if (rc)
- GOTO(out, rc);
rc = mdd_txn_init_credits(env, m);
break;
case LCFG_CLEANUP:
RETURN(rc);
}
+static int mdd_prepare(const struct lu_env *env,
+ struct lu_device *pdev,
+ struct lu_device *cdev)
+{
+ struct mdd_device *mdd = lu2mdd_dev(cdev);
+ struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
+ struct dt_object *root;
+ int rc;
+
+ ENTRY;
+ rc = next->ld_ops->ldo_prepare(env, cdev, next);
+ if (rc)
+ GOTO(out, rc);
+
+ dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb);
+ root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name,
+ &mdd->mdd_root_fid);
+ if (!IS_ERR(root)) {
+ LASSERT(root != NULL);
+ lu_object_put(env, &root->do_lu);
+ rc = orph_index_init(env, mdd);
+ } else
+ rc = PTR_ERR(root);
+
+out:
+ RETURN(rc);
+}
+
const struct lu_device_operations mdd_lu_ops = {
.ldo_object_alloc = mdd_object_alloc,
.ldo_process_config = mdd_process_config,
- .ldo_recovery_complete = mdd_recovery_complete
+ .ldo_recovery_complete = mdd_recovery_complete,
+ .ldo_prepare = mdd_prepare,
};
/*
/* context key: mdd_thread_key */
LU_CONTEXT_KEY_DEFINE(mdd, LCT_MD_THREAD);
+static struct lu_local_obj_desc llod_capa_key = {
+ .llod_name = CAPA_KEYS,
+ .llod_oid = MDD_CAPA_KEYS_OID,
+ .llod_is_index = 0,
+};
+
+static struct lu_local_obj_desc llod_mdd_orphan = {
+ .llod_name = orph_index_name,
+ .llod_oid = MDD_ORPHAN_OID,
+ .llod_is_index = 1,
+ .llod_feat = &dt_directory_features,
+};
+
+static struct lu_local_obj_desc llod_mdd_root = {
+ .llod_name = mdd_root_dir_name,
+ .llod_oid = MDD_ROOT_INDEX_OID,
+ .llod_is_index = 1,
+ .llod_feat = &dt_directory_features,
+};
+
static int __init mdd_mod_init(void)
{
struct lprocfs_static_vars lvars;
lprocfs_mdd_init_vars(&lvars);
+
+ llo_local_obj_register(&llod_capa_key);
+ llo_local_obj_register(&llod_mdd_orphan);
+ llo_local_obj_register(&llod_mdd_root);
+
return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
LUSTRE_MDD_NAME, &mdd_device_type);
}
__mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
const struct lu_name *lname, struct lu_fid* fid, int mask)
{
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct dynlock_handle *dlh;
int rc;
RETURN(-ENOTDIR);
iops = &obj->do_index_ops->dio_it;
- it = iops->init(env, obj, 0, BYPASS_CAPA);
+ it = iops->init(env, obj, BYPASS_CAPA);
if (it != NULL) {
result = iops->get(env, it, (const void *)"");
if (result > 0) {
RETURN(rc);
}
-const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
- const struct lu_fid *fid)
-{
- struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
-
- fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
- return (const struct dt_rec *)pack;
-}
-
/**
* If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
* assign i_nlink to 1 which means the i_nlink for subdir count is incredible
struct md_object *src_obj, const struct lu_name *lname,
struct md_attr *ma)
{
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
struct thandle *th)
{
int rc;
+ int reset = 1;
ENTRY;
rc = mdd_iattr_get(env, obj, ma);
if (rc == 0 && ma->ma_attr.la_nlink == 0) {
/* add new orphan and the object
- * will be deleted during the object_put() */
- if (__mdd_orphan_add(env, obj, th) == 0)
- obj->mod_flags |= ORPHAN_OBJ;
+ * will be deleted during mdd_close() */
+ if (obj->mod_count) {
+ rc = __mdd_orphan_add(env, obj, th);
+ if (rc == 0)
+ obj->mod_flags |= ORPHAN_OBJ;
+ }
obj->mod_flags |= DEAD_OBJ;
- if (obj->mod_count == 0)
+ if (!(obj->mod_flags & ORPHAN_OBJ)) {
rc = mdd_object_kill(env, obj, ma);
- else
- /* clear MA_LOV | MA_COOKIE, if we do not
- * unlink it in case we get it somewhere */
- ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
- } else
+ if (rc == 0)
+ reset = 0;
+ }
+
+ }
+ if (reset)
ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
RETURN(rc);
struct md_object *cobj, const struct lu_name *lname,
struct md_attr *ma)
{
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
const struct lu_fid *fid,
const struct md_attr *ma)
{
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct mdd_device *mdd = mdo2mdd(pobj);
const struct lu_name *lname,
const struct md_attr *ma)
{
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct mdd_device *mdd = mdo2mdd(pobj);
const struct lu_fid *lf, const struct lu_name *lname,
struct md_attr *ma)
{
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
__mdd_lookup(const struct lu_env *env, struct md_object *pobj,
const struct lu_name *lname, struct lu_fid* fid, int mask)
{
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
const struct dt_key *key = (const struct dt_key *)name;
struct mdd_object *mdd_obj = md2mdd_obj(pobj);
struct mdd_device *m = mdo2mdd(pobj);
rc = dir->do_index_ops->dio_lookup(env, dir,
(struct dt_rec *)pack, key,
mdd_object_capa(env, mdd_obj));
- if (rc == 0)
+ if (rc > 0)
rc = fid_unpack(pack, fid);
+ else if (rc == 0)
+ rc = -ENOENT;
} else
rc = -ENOTDIR;
int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
struct mdd_object *child, struct md_attr *ma,
- struct thandle *handle)
+ struct thandle *handle, const struct md_op_spec *spec)
{
int rc;
ENTRY;
struct lov_mds_md *lmm = NULL;
struct thandle *handle;
struct dynlock_handle *dlh;
- char *name = lname->ln_name;
+ const char *name = lname->ln_name;
int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
int got_def_acl = 0;
#ifdef HAVE_QUOTA_SUPPORT
GOTO(out_trans, rc = -ENOMEM);
mdd_write_lock(env, son, MOR_TGT_CHILD);
- rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle);
+ rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec);
if (rc) {
mdd_write_unlock(env, son);
GOTO(cleanup, rc);
#endif
rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
- son, ma, handle);
+ son, ma, handle, spec);
mdd_write_unlock(env, son);
if (rc)
/*
struct md_object *tobj, const struct lu_name *ltname,
struct md_attr *ma)
{
- char *sname = lsname->ln_name;
- char *tname = ltname->ln_name;
+ const char *sname = lsname->ln_name;
+ const char *tname = ltname->ln_name;
struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
struct mdd_object *mdd_tobj = NULL;
struct dynlock_handle *sdlh, *tdlh;
struct thandle *handle;
+ const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
+ int is_dir;
+ int rc;
+
#ifdef HAVE_QUOTA_SUPPORT
struct obd_device *obd = mdd->mdd_obd_dev;
struct mds_obd *mds = &obd->u.mds;
unsigned int qtpids[MAXQUOTAS] = { 0, 0 };
int quota_opc = 0, rec_pending = 0;
#endif
- int rc, is_dir;
ENTRY;
LASSERT(ma->ma_attr.la_mode & S_IFMT);
if (rc)
GOTO(cleanup, rc);
+ /* "mv dir1 dir2" needs "dir1/.." link update */
+ if (is_dir) {
+ rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle,
+ mdd_object_capa(env, mdd_spobj));
+ if (rc)
+ GOTO(cleanup, rc);
+
+ rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot,
+ is_dir, handle,
+ mdd_object_capa(env, mdd_tpobj));
+ if (rc)
+ GOTO(cleanup, rc);
+ }
+
/*
* Here tobj can be remote one, so we do index_delete unconditionally
* and -ENOENT is allowed.
#endif
};
-struct orph_key {
- /* fid of the object*/
- struct lu_fid ok_fid;
- /* type of operation: unlink, truncate */
- __u32 ok_op;
-} __attribute__((packed));
-
struct mdd_thread_info {
struct txn_param mti_param;
struct lu_fid mti_fid;
struct md_attr mti_ma;
struct lu_attr mti_la_for_fix;
struct obd_info mti_oi;
- struct orph_key mti_orph_key;
+ char mti_orph_key[NAME_MAX + 1];
struct obd_trans_info mti_oti;
struct lu_buf mti_buf;
struct obdo mti_oa;
int mti_max_lmm_size;
struct llog_cookie *mti_max_cookie;
int mti_max_cookie_size;
+ struct dt_object_format mti_dof;
struct obd_quotactl mti_oqctl;
};
+extern const char orph_index_name[];
+
+extern const struct dt_index_features orph_index_features;
+
struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env,
struct mdd_device *mdd);
struct md_attr *ma);
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
struct mdd_object *c, struct md_attr *ma,
- struct thandle *handle);
+ struct thandle *handle,
+ const struct md_op_spec *spec);
int mdd_attr_check_set_internal_locked(const struct lu_env *env,
struct mdd_object *obj,
struct lu_attr *attr,
struct md_attr *ma, struct thandle *th);
int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
struct mdd_object *child, struct md_attr *ma,
- struct thandle *handle);
+ struct thandle *handle, const struct md_op_spec *spec);
int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
const struct lu_name *lname, struct mdd_object *src_obj);
/* mdd_lov.c */
int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj,
struct md_attr *ma, enum mdd_txn_op);
+int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
+ struct mdd_object *obj, struct lu_attr *la);
+
static inline void mdd_object_put(const struct lu_env *env,
struct mdd_object *o)
{
return lu_object_fid(&obj->mod_obj.mo_lu);
}
+static inline const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
+ const struct lu_fid *fid)
+{
+ struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack;
+
+ fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2);
+ return (const struct dt_rec *)pack;
+}
+
static inline umode_t mdd_object_type(const struct mdd_object *obj)
{
return lu_object_attr(&obj->mod_obj.mo_lu);
int mdo_create_obj(const struct lu_env *env, struct mdd_object *o,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *handle)
{
struct dt_object *next = mdd_object_child(o);
- return next->do_ops->do_create(env, next, attr, hint, handle);
+ return next->do_ops->do_create(env, next, attr, hint, dof, handle);
}
static inline struct obd_capa *mdo_capa_get(const struct lu_env *env,
{
struct obd_device *obd = mdd2obd_dev(mdd);
struct obd_export *lov_exp = obd->u.mds.mds_osc_exp;
+ struct lu_site *site = mdd2lu_dev(mdd)->ld_site;
struct obdo *oa;
struct lov_stripe_md *lsm = NULL;
const void *eadata = spec->u.sp_ea.eadata;
oa->o_uid = 0; /* must have 0 uid / gid on OST */
oa->o_gid = 0;
- oa->o_gr = FILTER_GROUP_MDS0 +
- lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
+ oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
oa->o_mode = S_IFREG | 0600;
oa->o_id = mdd_lov_create_id(mdd_object_fid(child));
oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
}
GOTO(out_oti, rc);
}
- LASSERT(lsm->lsm_object_gr >= FILTER_GROUP_MDS0);
+ LASSERT_MDS_GROUP(lsm->lsm_object_gr);
} else {
LASSERT(eadata != NULL);
rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
return rc;
}
+/*
+ * used when destroying orphans and from mds_reint_unlink() when MDS wants to
+ * destroy objects on OSS.
+ */
+static
+int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
+ struct mdd_object *obj, struct lu_attr *la,
+ struct lov_mds_md *lmm, int lmm_size,
+ struct llog_cookie *logcookies,
+ int log_unlink)
+{
+ struct obd_device *obd = mdd2obd_dev(mdd);
+ struct obd_export *lov_exp = obd->u.mds.mds_osc_exp;
+ struct lov_stripe_md *lsm = NULL;
+ struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
+ struct obdo *oa = &mdd_env_info(env)->mti_oa;
+ struct lu_site *site = mdd2lu_dev(mdd)->ld_site;
+ int rc;
+ ENTRY;
+
+ if (lmm_size == 0)
+ RETURN(0);
+
+ rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size);
+ if (rc < 0) {
+ CERROR("Error unpack md %p\n", lmm);
+ RETURN(rc);
+ } else {
+ LASSERT(rc >= sizeof(*lsm));
+ rc = 0;
+ }
+
+ oa->o_id = lsm->lsm_object_id;
+ oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id);
+ oa->o_mode = la->la_mode & S_IFMT;
+ oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
+
+ oti_init(oti, NULL);
+ if (log_unlink && logcookies) {
+ oa->o_valid |= OBD_MD_FLCOOKIE;
+ oti->oti_logcookies = logcookies;
+ }
+
+ CDEBUG(D_INFO, "destroying OSS object %d/%d\n",
+ (int)oa->o_id, (int)oa->o_gr);
+
+ rc = obd_destroy(lov_exp, oa, lsm, oti, NULL, BYPASS_CAPA);
+
+ obd_free_memmd(lov_exp, &lsm);
+ RETURN(rc);
+}
+
+/*
+ * called with obj not locked.
+ */
+
+int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
+ struct mdd_object *obj, struct lu_attr *la)
+{
+ struct md_attr *ma = &mdd_env_info(env)->mti_ma;
+ int rc;
+ ENTRY;
+
+ if (unlikely(la->la_nlink != 0)) {
+ CWARN("Attempt to destroy OSS object when nlink == %d\n",
+ la->la_nlink);
+ RETURN(0);
+ }
+
+ ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
+ ma->ma_lmm = mdd_max_lmm_get(env, mdd);
+ ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
+ ma->ma_cookie = mdd_max_cookie_get(env, mdd);
+ if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
+ RETURN(rc = -ENOMEM);
+
+ /* get lov ea */
+
+ rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
+ MDS_LOV_MD_NAME);
+
+ if (rc <= 0) {
+ CWARN("Get lov ea failed for "DFID" rc = %d\n",
+ PFID(mdo2fid(obj)), rc);
+ if (rc == 0)
+ rc = -ENOENT;
+ RETURN(rc);
+ }
+
+ ma->ma_valid = MA_LOV;
+
+ rc = mdd_unlink_log(env, mdd, obj, ma);
+ if (rc) {
+ CWARN("mds unlink log for "DFID" failed: %d\n",
+ PFID(mdo2fid(obj)), rc);
+ RETURN(rc);
+ }
+
+ if (ma->ma_valid & MA_COOKIE)
+ rc = mdd_lovobj_unlink(env, mdd, obj, la,
+ ma->ma_lmm, ma->ma_lmm_size,
+ ma->ma_cookie, 1);
+ RETURN(rc);
+}
+
int mdd_log_op_unlink(struct obd_device *obd,
struct lov_mds_md *lmm, int lmm_size,
struct llog_cookie *logcookies, int cookies_size)
OBD_FREE_PTR(mdd);
}
-/* orphan handling is here */
-static void mdd_object_delete(const struct lu_env *env, struct lu_object *o)
+static int mdd_object_print(const struct lu_env *env, void *cookie,
+ lu_printer_t p, const struct lu_object *o)
{
- struct mdd_object *mdd_obj = lu2mdd_obj(o);
- struct thandle *handle = NULL;
- ENTRY;
-
- if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL)
- return;
-
- if (mdd_obj->mod_flags & ORPHAN_OBJ) {
- mdd_txn_param_build(env, lu2mdd_dev(o->lo_dev),
- MDD_TXN_INDEX_DELETE_OP);
- handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev));
- if (IS_ERR(handle))
- CERROR("Cannot get thandle\n");
- else {
- mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
- /* let's remove obj from the orphan list */
- __mdd_orphan_del(env, mdd_obj, handle);
- mdd_write_unlock(env, mdd_obj);
- mdd_trans_stop(env, lu2mdd_dev(o->lo_dev),
- 0, handle);
- }
- }
+ return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o);
}
static const struct lu_object_operations mdd_lu_obj_ops = {
- .loo_object_init = mdd_object_init,
- .loo_object_start = mdd_object_start,
- .loo_object_free = mdd_object_free,
- .loo_object_delete = mdd_object_delete
+ .loo_object_init = mdd_object_init,
+ .loo_object_start = mdd_object_start,
+ .loo_object_free = mdd_object_free,
+ .loo_object_print = mdd_object_print,
};
struct mdd_object *mdd_object_find(const struct lu_env *env,
int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
struct mdd_object *c, struct md_attr *ma,
- struct thandle *handle)
+ struct thandle *handle,
+ const struct md_op_spec *spec)
{
struct lu_attr *attr = &ma->ma_attr;
struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
+ struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
+ const struct dt_index_features *feat = spec->sp_feat;
int rc;
ENTRY;
struct dt_object *next = mdd_object_child(c);
LASSERT(next);
+ if (feat != &dt_directory_features && feat != NULL)
+ dof->dof_type = DFT_INDEX;
+ else
+ dof->dof_type = dt_mode_to_dft(attr->la_mode);
+
+ dof->u.dof_idx.di_feat = feat;
+
/* @hint will be initialized by underlying device. */
next->do_ops->do_ah_init(env, hint,
p ? mdd_object_child(p) : NULL,
attr->la_mode & S_IFMT);
- rc = mdo_create_obj(env, c, attr, hint, handle);
+
+ rc = mdo_create_obj(env, c, attr, hint, dof, handle);
LASSERT(ergo(rc == 0, mdd_object_exists(c)));
} else
rc = -EEXIST;
if (rc)
GOTO(unlock, rc);
- rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle);
+ rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec);
if (rc)
GOTO(unlock, rc);
pfid = spec->u.sp_ea.fid;
}
#endif
- rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle);
+ rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle, spec);
}
EXIT;
unlock:
if (S_ISREG(mdd_object_type(obj))) {
/* Return LOV & COOKIES unconditionally here. We clean evth up.
* Caller must be ready for that. */
+
rc = __mdd_lmm_get(env, obj, ma);
if ((ma->ma_valid & MA_LOV))
rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj),
static int mdd_close(const struct lu_env *env, struct md_object *obj,
struct md_attr *ma)
{
- int rc;
struct mdd_object *mdd_obj = md2mdd_obj(obj);
struct thandle *handle;
+ int rc;
+ int reset = 1;
+
#ifdef HAVE_QUOTA_SUPPORT
struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
struct mds_obd *mds = &obd->u.mds;
/* release open count */
mdd_obj->mod_count --;
+ if (mdd_obj->mod_count == 0) {
+ /* remove link to object from orphan index */
+ if (mdd_obj->mod_flags & ORPHAN_OBJ)
+ __mdd_orphan_del(env, mdd_obj, handle);
+ }
+
rc = mdd_iattr_get(env, mdd_obj, ma);
- if (rc == 0 && mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
- rc = mdd_object_kill(env, mdd_obj, ma);
+ if (rc == 0) {
+ if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) {
+ rc = mdd_object_kill(env, mdd_obj, ma);
#ifdef HAVE_QUOTA_SUPPORT
- if (mds->mds_quota) {
- quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
- mdd_quota_wrapper(&ma->ma_attr, qids);
- }
+ if (mds->mds_quota) {
+ quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
+ mdd_quota_wrapper(&ma->ma_attr, qids);
+ }
#endif
- } else {
- ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+ if (rc == 0)
+ reset = 0;
+ }
}
+ if (reset)
+ ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
+
mdd_write_unlock(env, mdd_obj);
mdd_trans_stop(env, mdo2mdd(obj), rc, handle);
#ifdef HAVE_QUOTA_SUPPORT
* iterate through directory and fill pages from @rdpg
*/
iops = &next->do_index_ops->dio_it;
- it = iops->init(env, next, 0, mdd_object_capa(env, obj));
+ it = iops->init(env, next, mdd_object_capa(env, obj));
if (IS_ERR(it))
return PTR_ERR(it);
* Orphan handling code
*
* Author: Mike Pershin <tappro@clusterfs.com>
+ * Pravin B Shelar <pravin.shelar@sun.com>
*/
#ifndef EXPORT_SYMTAB
#include <lustre_fid.h>
#include "mdd_internal.h"
-const char orph_index_name[] = "orphans";
-
-static const struct dt_index_features orph_index_features = {
- .dif_flags = DT_IND_UPDATE,
- .dif_keysize_min = sizeof(struct orph_key),
- .dif_keysize_max = sizeof(struct orph_key),
- .dif_recsize_min = sizeof(loff_t),
- .dif_recsize_max = sizeof(loff_t)
-};
+const char orph_index_name[] = "PENDING";
enum {
ORPH_OP_UNLINK,
ORPH_OP_TRUNCATE
};
-static struct orph_key *orph_key_fill(const struct lu_env *env,
- const struct lu_fid *lf, __u32 op)
+#define ORPHAN_FILE_NAME_FORMAT "%016llx:%08x:%08x:%2x"
+#define ORPHAN_FILE_NAME_FORMAT_18 "%llx:%08x"
+
+static struct dt_key* orph_key_fill(const struct lu_env *env,
+ const struct lu_fid *lf, __u32 op)
{
- struct orph_key *key = &mdd_env_info(env)->mti_orph_key;
+ char *key = mdd_env_info(env)->mti_orph_key;
+ int rc;
+
LASSERT(key);
- fid_cpu_to_be(&key->ok_fid, lf);
- key->ok_op = cpu_to_be32(op);
- return key;
+ rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT, fid_seq(lf),
+ fid_oid(lf), fid_ver(lf), op);
+ if (rc > 0)
+ return (struct dt_key*) key;
+ else
+ return ERR_PTR(rc);
+}
+
+static struct dt_key* orph_key_fill_18(const struct lu_env *env,
+ const struct lu_fid *lf)
+{
+ char *key = mdd_env_info(env)->mti_orph_key;
+ int rc;
+
+ LASSERT(key);
+ rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18, fid_seq(lf),
+ fid_oid(lf));
+ if (rc > 0)
+ return (struct dt_key*) key;
+ else
+ return ERR_PTR(rc);
+}
+
+static int orphan_key_to_fid(char *key, struct lu_fid *lf)
+{
+ int rc = 0;
+ unsigned int op;
+
+ rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT, &lf->f_seq, &lf->f_oid,
+ &lf->f_ver, &op);
+ if (rc == 4)
+ return 0;
+
+ /* build igif */
+ rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18,
+ &lf->f_seq, &lf->f_oid);
+ if (rc == 2) {
+ lf->f_ver = 0;
+ return 0;
+ }
+
+ CERROR("can not parse orphan file name %s\n",key);
+ return -EINVAL;
+}
+
+static inline void mdd_orphan_write_lock(const struct lu_env *env,
+ struct mdd_device *mdd)
+{
+
+ struct dt_object *dor = mdd->mdd_orphans;
+ dor->do_ops->do_write_lock(env, dor, MOR_TGT_CHILD);
+}
+
+static inline void mdd_orphan_write_unlock(const struct lu_env *env,
+ struct mdd_device *mdd)
+{
+
+ struct dt_object *dor = mdd->mdd_orphans;
+ dor->do_ops->do_write_unlock(env, dor);
+}
+
+static inline int mdd_orphan_insert_obj(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *obj,
+ __u32 op,
+ struct thandle *th)
+{
+ struct dt_object *dor = mdd->mdd_orphans;
+ const struct lu_fid *lf = mdo2fid(obj);
+ struct dt_key *key = orph_key_fill(env, lf, op);
+ ENTRY;
+
+ return dor->do_index_ops->dio_insert(env, dor,
+ __mdd_fid_rec(env, lf),
+ key, th,
+ BYPASS_CAPA, 1);
+}
+
+static inline int mdd_orphan_delete_obj(const struct lu_env *env,
+ struct mdd_device *mdd ,
+ struct dt_key *key,
+ struct thandle *th)
+{
+ struct dt_object *dor = mdd->mdd_orphans;
+
+ return dor->do_index_ops->dio_delete(env, dor,
+ key, th,
+ BYPASS_CAPA);
}
+static inline void mdd_orphan_ref_add(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct thandle *th)
+{
+ struct dt_object *dor = mdd->mdd_orphans;
+ dor->do_ops->do_ref_add(env, dor, th);
+}
+
+static inline void mdd_orphan_ref_del(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct thandle *th)
+{
+ struct dt_object *dor = mdd->mdd_orphans;
+ dor->do_ops->do_ref_del(env, dor, th);
+}
+
+
static int orph_index_insert(const struct lu_env *env,
- struct mdd_object *obj, __u32 op,
- loff_t *offset, struct thandle *th)
+ struct mdd_object *obj,
+ __u32 op,
+ struct thandle *th)
{
- struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
- struct dt_object *dor = mdd->mdd_orphans;
- struct orph_key *key = orph_key_fill(env, mdo2fid(obj), op);
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ struct dt_object *dor = mdd->mdd_orphans;
+ const struct lu_fid *lf_dor = lu_object_fid(&dor->do_lu);
+ struct dt_object *next = mdd_object_child(obj);
+ const struct dt_key *dotdot = (const struct dt_key *) "..";
int rc;
ENTRY;
- rc = dor->do_index_ops->dio_insert(env, dor, (struct dt_rec *)offset,
- (struct dt_key *)key, th,
- BYPASS_CAPA, 1);
+ mdd_orphan_write_lock(env, mdd);
+
+ rc = mdd_orphan_insert_obj(env, mdd, obj, op, th);
+ if (rc)
+ GOTO(out, rc);
+
+ mdo_ref_add(env, obj, th);
+ if (!S_ISDIR(mdd_object_type(obj)))
+ goto out;
+
+ mdo_ref_add(env, obj, th);
+ mdd_orphan_ref_add(env, mdd, th);
+
+ /* try best to fixup directory, dont return errors
+ * from here */
+ if (!dt_try_as_dir(env, next))
+ goto out;
+ next->do_index_ops->dio_delete(env, next,
+ dotdot, th, BYPASS_CAPA);
+
+ next->do_index_ops->dio_insert(env, next,
+ __mdd_fid_rec(env, lf_dor),
+ dotdot, th, BYPASS_CAPA, 1);
+
+out:
+ mdd_orphan_write_unlock(env, mdd);
+
RETURN(rc);
}
+/**
+ * destroy osd object on mdd and associated ost objects.
+ *
+ * \param obj orphan object
+ * \param mdd used for sending llog msg to osts
+ *
+ * \retval 0 success
+ * \retval -ve error
+ */
+static int orphan_object_kill(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct mdd_device *mdd,
+ struct thandle *th)
+{
+ struct lu_attr *la = &mdd_env_info(env)->mti_la;
+ int rc;
+
+ /* No need to lock this object as its recovery phase, and
+ * no other thread can access it. But we need to lock it
+ * as its precondition for osd api we using. */
+
+ mdd_write_lock(env, obj, MOR_TGT_CHILD);
+ mdo_ref_del(env, obj, th);
+ if (S_ISDIR(mdd_object_type(obj))) {
+ mdo_ref_del(env, obj, th);
+ mdd_orphan_ref_del(env, mdd, th);
+ mdd_write_unlock(env, obj);
+ } else {
+ /* regular file , cleanup linked ost objects */
+ rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
+ mdd_write_unlock(env, obj);
+ if (rc)
+ RETURN(rc);
+
+ mdd_lov_destroy(env, mdd, obj, la);
+ }
+ return 0;
+}
+
static int orph_index_delete(const struct lu_env *env,
- struct mdd_object *obj, __u32 op,
+ struct mdd_object *obj,
+ __u32 op,
struct thandle *th)
{
struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
struct dt_object *dor = mdd->mdd_orphans;
- struct orph_key *key = orph_key_fill(env, mdo2fid(obj), op);
+ struct dt_key *key;
int rc;
+
ENTRY;
+
LASSERT(dor);
- rc = dor->do_index_ops->dio_delete(env, dor,
- (struct dt_key *)key, th,
- BYPASS_CAPA);
- RETURN(rc);
+ key = orph_key_fill(env, mdo2fid(obj), op);
+ mdd_orphan_write_lock(env, mdd);
+
+ rc = mdd_orphan_delete_obj(env, mdd, key, th);
+
+ if (rc == -ENOENT) {
+ key = orph_key_fill_18(env, mdo2fid(obj));
+ rc = mdd_orphan_delete_obj(env, mdd, key, th);
+ }
+
+ if (!rc) {
+ /* lov objects will be destroyed by caller */
+ mdo_ref_del(env, obj, th);
+ if (S_ISDIR(mdd_object_type(obj))) {
+ mdo_ref_del(env, obj, th);
+ mdd_orphan_ref_del(env, mdd, th);
+ }
+ } else
+ CERROR("could not delete object: rc = %d\n",rc);
+
+ obj->mod_flags &= ~ORPHAN_OBJ;
+ mdd_orphan_write_unlock(env, mdd);
+ RETURN(rc);
}
-static inline struct orph_key *orph_key_empty(const struct lu_env *env,
- __u32 op)
+
+static int orphan_object_destroy(const struct lu_env *env,
+ struct mdd_object *obj,
+ struct dt_key *key)
{
- struct orph_key *key = &mdd_env_info(env)->mti_orph_key;
- LASSERT(key);
- fid_zero(&key->ok_fid);
- key->ok_op = cpu_to_be32(op);
- return key;
+ struct thandle *th = NULL;
+ struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+ int rc;
+ ENTRY;
+
+ mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP);
+ th = mdd_trans_start(env, mdd);
+ if (IS_ERR(th)) {
+ CERROR("Cannot get thandle\n");
+ RETURN(-ENOMEM);
+ }
+
+ mdd_orphan_write_lock(env, mdd);
+ rc = mdd_orphan_delete_obj(env, mdd, key, th);
+ if (!rc)
+ orphan_object_kill(env, obj, mdd, th);
+ else
+ CERROR("could not delete object: rc = %d\n",rc);
+
+ mdd_orphan_write_unlock(env, mdd);
+ mdd_trans_stop(env, mdd, 0, th);
+
+ RETURN(rc);
}
-static void orph_key_test_and_del(const struct lu_env *env,
- struct mdd_device *mdd,
- const struct orph_key *key)
+static int orph_key_test_and_del(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct lu_fid *lf,
+ struct dt_key *key)
{
struct mdd_object *mdo;
+ int rc;
+
+ mdo = mdd_object_find(env, mdd, lf);
- mdo = mdd_object_find(env, mdd, &key->ok_fid);
if (IS_ERR(mdo))
- CERROR("Invalid orphan!\n");
- else {
- mdd_write_lock(env, mdo, MOR_TGT_CHILD);
- if (mdo->mod_count == 0) {
- /* non-opened orphan, let's delete it */
- struct md_attr *ma = &mdd_env_info(env)->mti_ma;
- CWARN("Found orphan!\n");
- mdd_object_kill(env, mdo, ma);
- /* TODO: now handle OST objects */
- //mdd_ost_objects_destroy(env, ma);
- /* TODO: destroy index entry */
- }
- mdd_write_unlock(env, mdo);
- mdd_object_put(env, mdo);
+ return PTR_ERR(mdo);
+
+ rc = -EBUSY;
+ if (mdo->mod_count == 0) {
+ CWARN("Found orphan!\n");
+ rc = orphan_object_destroy(env, mdo, key);
+ } else {
+ mdo->mod_flags |= ORPHAN_OBJ;
}
+
+ mdd_object_put(env, mdo);
+ return rc;
}
static int orph_index_iterate(const struct lu_env *env,
struct mdd_device *mdd)
{
- struct dt_object *dt_obj = mdd->mdd_orphans;
- struct dt_it *it;
+ struct dt_object *dor = mdd->mdd_orphans;
+ char *mti_key = mdd_env_info(env)->mti_orph_key;
const struct dt_it_ops *iops;
- struct orph_key *key = orph_key_empty(env, 0);
- int result;
+ struct dt_it *it;
+ char *key;
+ struct lu_fid fid;
+ int result = 0;
+ int key_sz = 0;
+ int rc;
+ __u64 cookie;
ENTRY;
- iops = &dt_obj->do_index_ops->dio_it;
- it = iops->init(env, dt_obj, 1, BYPASS_CAPA);
+ /* In recovery phase, do not need for any lock here */
+
+ iops = &dor->do_index_ops->dio_it;
+ it = iops->init(env, dor, BYPASS_CAPA);
if (it != NULL) {
- result = iops->get(env, it, (const void *)key);
+ result = iops->get(env, it, (const void *)"");
if (result > 0) {
- int i;
/* main cycle */
- for (result = 0, i = 0; result == +1; ++i) {
+ do {
+
key = (void *)iops->key(env, it);
- fid_be_to_cpu(&key->ok_fid, &key->ok_fid);
- orph_key_test_and_del(env, mdd, key);
+ if (IS_ERR(key))
+ goto next;
+ key_sz = iops->key_size(env, it);
+
+ /* filter out "." and ".." entries from
+ * PENDING dir. */
+ if (key_sz < 8)
+ goto next;
+
+ memcpy(mti_key, key, key_sz);
+ mti_key[key_sz] = 0;
+
+ if (orphan_key_to_fid(mti_key, &fid))
+ goto next;
+ if (!fid_is_sane(&fid))
+ goto next;
+
+ /* kill orphan object */
+ cookie = iops->store(env, it);
+ iops->put(env, it);
+ rc = orph_key_test_and_del(env, mdd, &fid,
+ (struct dt_key *)mti_key);
+
+ /* after index delete reset iterator */
+ if (!rc)
+ result = iops->get(env, it,
+ (const void *)"");
+ else
+ result = iops->load(env, it, cookie);
+next:
result = iops->next(env, it);
- }
+ } while (result == 0);
+ result = 0;
} else if (result == 0)
/* Index contains no zero key? */
result = -EIO;
-
iops->put(env, it);
iops->fini(env, it);
} else
{
struct lu_fid fid;
struct dt_object *d;
- int rc;
+ int rc = 0;
ENTRY;
- d = dt_store_open(env, mdd->mdd_child, orph_index_name, &fid);
+ d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid);
if (!IS_ERR(d)) {
mdd->mdd_orphans = d;
- rc = d->do_ops->do_index_try(env, d, &orph_index_features);
- if (rc == 0)
- LASSERT(d->do_index_ops != NULL);
- else
- CERROR("\"%s\" is not an index!\n", orph_index_name);
+ if (!dt_try_as_dir(env, d)) {
+ rc = -ENOTDIR;
+ CERROR("\"%s\" is not an index! : rc = %d\n",
+ orph_index_name, rc);
+ }
} else {
CERROR("cannot find \"%s\" obj %d\n",
orph_index_name, (int)PTR_ERR(d));
EXIT;
}
+/**
+ * Iterate orphan index to cleanup orphan objects in case of recovery.
+ * \param d mdd device in recovery.
+ *
+ */
+
int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d)
{
return orph_index_iterate(env, d);
}
+/**
+ * delete an orphan \a obj from orphan index.
+ * \param obj file or directory.
+ * \param th transaction for index insert.
+ *
+ * \pre obj nlink == 0 && obj->mod_count != 0
+ *
+ * \retval 0 success
+ * \retva -ve index operation error.
+ */
+
int __mdd_orphan_add(const struct lu_env *env,
struct mdd_object *obj, struct thandle *th)
{
- loff_t offset = 0;
- return orph_index_insert(env, obj, ORPH_OP_UNLINK, &offset, th);
+ return orph_index_insert(env, obj, ORPH_OP_UNLINK, th);
}
+/**
+ * delete an orphan \a obj from orphan index.
+ * \param obj file or directory.
+ * \param th transaction for index deletion and object destruction.
+ *
+ * \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj.
+ *
+ * \retval 0 success
+ * \retva -ve index operation error.
+ */
+
int __mdd_orphan_del(const struct lu_env *env,
struct mdd_object *obj, struct thandle *th)
{
*c = dt[DTO_INDEX_INSERT];
break;
case MDD_TXN_UNLINK_OP:
- /* delete index + Unlink log */
- *c = dt[DTO_INDEX_DELETE];
+ /* delete index + Unlink log +
+ * mdd orphan handling */
+ *c = dt[DTO_INDEX_DELETE] +
+ dt[DTO_INDEX_DELETE] +
+ dt[DTO_INDEX_INSERT] * 2 +
+ dt[DTO_XATTR_SET] * 3;
break;
case MDD_TXN_RENAME_OP:
/* 2 delete index + 1 insert + Unlink log */
*c = 2 * dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_INSERT];
+ dt[DTO_INDEX_INSERT] +
+ dt[DTO_INDEX_DELETE] +
+ dt[DTO_INDEX_INSERT] * 2 +
+ dt[DTO_XATTR_SET] * 3;
break;
case MDD_TXN_RENAME_TGT_OP:
/* index insert + index delete */
*c = dt[DTO_INDEX_DELETE] +
- dt[DTO_INDEX_INSERT];
+ dt[DTO_INDEX_INSERT] +
+ dt[DTO_INDEX_DELETE] +
+ dt[DTO_INDEX_INSERT] * 2 +
+ dt[DTO_XATTR_SET] * 3;
break;
case MDD_TXN_CREATE_DATA_OP:
/* same as set xattr(lsm) */
LCONSOLE_WARN("%s: shutting down for failover; client state "
"will be preserved.\n", obd->obd_name);
+ if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
+ RETURN(0);
+
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
mds_lov_destroy_objids(obd);
err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
handle, 0);
if (!err) {
- oa->o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
+ oa->o_gr = mdt_to_obd_objgrp(mds->mds_id);
oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP;
} else if (!rc)
rc = err;
* objects above this ID, they will be removed. */
memset(&oa, 0, sizeof(oa));
oa.o_flags = OBD_FL_DELORPHAN;
- oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
+ oa.o_gr = mdt_to_obd_objgrp(mds->mds_id);
oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
if (ost_uuid != NULL)
oti.oti_ost_uuid = ost_uuid;
data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
#endif
data->ocd_version = LUSTRE_VERSION_CODE;
- data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0;
+ data->ocd_group = mdt_to_obd_objgrp(mds->mds_id);
/* NB: lov_connect() needs to fill in .ocd_index for each OST */
rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
OBD_FREE(data, sizeof(*data));
CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
GOTO(out, rc);
}
- mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
+ mgi.group = mdt_to_obd_objgrp(mds->mds_id);
mgi.uuid = uuid;
rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN),
ll_get_random_bytes(key->lk_key, sizeof(key->lk_key));
}
-enum {
- MDT_TXN_CAPA_KEYS_WRITE_CREDITS = 1
-};
-
static inline void lck_cpu_to_le(struct lustre_capa_key *tgt,
struct lustre_capa_key *src)
{
int i, rc;
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-
- th = mdt_trans_start(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_CREDITS);
+ mdt_trans_credit_init(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_OP);
+ th = mdt_trans_start(env, mdt);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
}
static int mdt_stack_init(struct lu_env *env,
- struct mdt_device *m, struct lustre_cfg *cfg)
+ struct mdt_device *m,
+ struct lustre_cfg *cfg,
+ struct lustre_mount_info *lmi)
{
struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
struct lu_device *tmp;
struct md_device *md;
+ struct lu_device *child_lu_dev;
int rc;
ENTRY;
/* process setup config */
tmp = &m->mdt_md_dev.md_lu_dev;
rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
- GOTO(out, rc);
+ if (rc)
+ GOTO(out, rc);
+
+ /* initialize local objects */
+ child_lu_dev = &m->mdt_child->md_lu_dev;
+
+ rc = child_lu_dev->ld_ops->ldo_prepare(env,
+ &m->mdt_md_dev.md_lu_dev,
+ child_lu_dev);
out:
/* fini from last known good lu_device */
if (rc)
const char *num = lustre_cfg_string(cfg, 2);
struct lustre_mount_info *lmi = NULL;
struct lustre_sb_info *lsi;
+ struct lustre_disk_data *ldd;
struct lu_site *s;
struct md_site *mite;
const char *identity_upcall = "NONE";
struct md_device *next;
#endif
int rc;
+ int node_id;
ENTRY;
md_device_init(&m->mdt_md_dev, ldt);
} else {
lsi = s2lsi(lmi->lmi_sb);
fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts);
+ server_put_mount_2(dev, lmi->lmi_mnt);
+ /* CMD is supported only in IAM mode */
+ ldd = lsi->lsi_ldd;
+ LASSERT(num);
+ node_id = simple_strtol(num, NULL, 10);
+ if (!(ldd->ldd_flags & LDD_F_IAM_DIR) && node_id) {
+ CERROR("CMD Operation not allowed in IOP mode\n");
+ RETURN(-EINVAL);
+ }
}
rwlock_init(&m->mdt_sptlrpc_lock);
lprocfs_nid_stats_clear_write, obd, NULL);
/* set server index */
- LASSERT(num);
- lu_site2md(s)->ms_node_id = simple_strtol(num, NULL, 10);
+ lu_site2md(s)->ms_node_id = node_id;
/* failover is the default
* FIXME: we do not failout mds0/mgs, which may cause some problems.
- * assumed whose ls_node_id == 0 XXX
+ * assumed whose ms_node_id == 0 XXX
* */
obd->obd_replayable = 1;
/* No connection accepted until configurations will finish */
}
/* init the stack */
- rc = mdt_stack_init((struct lu_env *)env, m, cfg);
+ rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi);
if (rc) {
CERROR("Can't init device stack, rc %d\n", rc);
GOTO(err_fini_proc, rc);
if (rc)
GOTO(err_free_ns, rc);
- rc = mdt_fs_setup(env, m, obd);
+ rc = mdt_fs_setup(env, m, obd, lsi);
if (rc)
GOTO(err_capa, rc);
struct lprocfs_static_vars lvars;
struct obd_device *obd = d->ld_obd;
+ /*
+ * For interoperability between 1.8 and 2.0,
+ * skip old "mdt.group_upcall" param.
+ */
+ {
+ char *param = lustre_cfg_string(cfg, 1);
+ if (param && !strncmp("mdt.group_upcall", param, 16)) {
+ CWARN("For 1.8 interoperability, skip this"
+ " mdt.group_upcall. It is obsolete\n");
+ break;
+ }
+ }
+
lprocfs_mdt_init_vars(&lvars);
rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars,
cfg, obd);
static const struct lu_device_operations mdt_lu_ops = {
.ldo_object_alloc = mdt_object_alloc,
- .ldo_process_config = mdt_process_config
+ .ldo_process_config = mdt_process_config,
};
static const struct lu_object_operations mdt_obj_ops = {
.ldt_ctx_tags = LCT_MD_THREAD
};
+static struct lu_local_obj_desc mdt_last_recv = {
+ .llod_name = LAST_RCVD,
+ .llod_oid = MDT_LAST_RECV_OID,
+ .llod_is_index = 0,
+};
+
static int __init mdt_mod_init(void)
{
struct lprocfs_static_vars lvars;
int rc;
+ llo_local_obj_register(&mdt_last_recv);
+
mdt_num_threads = MDT_NUM_THREADS;
lprocfs_mdt_init_vars(&lvars);
rc = class_register_type(&mdt_obd_device_ops, NULL,
void *mdt_cb_data;
};
+enum mdt_txn_op {
+ MDT_TXN_CAPA_KEYS_WRITE_OP,
+ MDT_TXN_LAST_RCVD_WRITE_OP,
+};
+
+
/*
* Info allocated per-transaction.
*/
extern void target_recovery_init(struct obd_device *obd,
svc_handler_t handler);
int mdt_fs_setup(const struct lu_env *, struct mdt_device *,
- struct obd_device *);
+ struct obd_device *, struct lustre_sb_info *lsi);
void mdt_fs_cleanup(const struct lu_env *, struct mdt_device *);
int mdt_client_del(const struct lu_env *env,
int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *,
const struct md_attr *);
void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *);
+
+void mdt_trans_credit_init(const struct lu_env *env,
+ struct mdt_device *mdt,
+ enum mdt_txn_op op);
struct thandle* mdt_trans_start(const struct lu_env *env,
- struct mdt_device *mdt, int credits);
+ struct mdt_device *mdt);
void mdt_trans_stop(const struct lu_env *env,
struct mdt_device *mdt, struct thandle *th);
int mdt_record_write(const struct lu_env *env,
* not exist.
*/
info->mti_spec.sp_cr_lookup = 0;
+ info->mti_spec.sp_feat = &dt_directory_features;
result = mdo_create(info->mti_env,
mdt_object_child(parent),
rc = -EFAULT;
return rc;
}
-/* only one record write */
-enum {
- MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3
-};
+static inline int mdt_trans_credit_get(const struct lu_env *env,
+ struct mdt_device *mdt,
+ enum mdt_txn_op op)
+{
+ struct dt_device *dev = mdt->mdt_bottom;
+ int cr;
+ switch (op) {
+ case MDT_TXN_CAPA_KEYS_WRITE_OP:
+ case MDT_TXN_LAST_RCVD_WRITE_OP:
+ cr = dev->dd_ops->dt_credit_get(env,
+ dev,
+ DTO_WRITE_BLOCK);
+ break;
+ default:
+ LBUG();
+ }
+ return cr;
+}
+
+void mdt_trans_credit_init(const struct lu_env *env,
+ struct mdt_device *mdt,
+ enum mdt_txn_op op)
+{
+ struct mdt_thread_info *mti;
+ struct txn_param *p;
+ int cr;
+
+ mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ p = &mti->mti_txn_param;
+
+ cr = mdt_trans_credit_get(env, mdt, op);
+ txn_param_init(p, cr);
+}
struct thandle* mdt_trans_start(const struct lu_env *env,
- struct mdt_device *mdt, int credits)
+ struct mdt_device *mdt)
{
struct mdt_thread_info *mti;
struct txn_param *p;
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
p = &mti->mti_txn_param;
- txn_param_init(p, credits);
/* export can require sync operations */
if (mti->mti_exp != NULL)
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+ mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+ th = mdt_trans_start(env, mdt);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
}
static int mdt_server_data_init(const struct lu_env *env,
- struct mdt_device *mdt)
+ struct mdt_device *mdt,
+ struct lustre_sb_info *lsi)
{
struct lr_server_data *lsd = &mdt->mdt_lsd;
struct lsd_client_data *lcd = NULL;
struct mdt_thread_info *mti;
struct dt_object *obj;
struct lu_attr *la;
+ struct lustre_disk_data *ldd;
unsigned long last_rcvd_size;
__u64 mount_count;
int rc;
}
mount_count = lsd->lsd_mount_count;
+ ldd = lsi->lsi_ldd;
+
+ if (ldd->ldd_flags & LDD_F_IAM_DIR)
+ lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR;
+
lsd->lsd_feature_compat = OBD_COMPAT_MDT;
+ lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
spin_lock(&mdt->mdt_transno_lock);
mdt->mdt_last_transno = lsd->lsd_last_transno;
LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
/* write new client data */
off = med->med_lr_off;
- th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+ mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+ th = mdt_trans_start(env, mdt);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
* mdt->mdt_last_rcvd may be NULL that time.
*/
if (mdt->mdt_last_rcvd != NULL) {
- th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+ mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+ th = mdt_trans_start(env, mdt);
if (IS_ERR(th))
GOTO(free, rc = PTR_ERR(th));
static int mdt_txn_start_cb(const struct lu_env *env,
struct txn_param *param, void *cookie)
{
- param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS;
+ struct mdt_device *mdt = cookie;
+
+ param->tp_credits += mdt_trans_credit_get(env, mdt,
+ MDT_TXN_LAST_RCVD_WRITE_OP);
return 0;
}
}
int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
- struct obd_device *obd)
+ struct obd_device *obd,
+ struct lustre_sb_info *lsi)
{
struct lu_fid fid;
struct dt_object *o;
dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
- o = dt_store_open(env, mdt->mdt_bottom, LAST_RCVD, &fid);
+ o = dt_store_open(env, mdt->mdt_bottom, "", LAST_RCVD, &fid);
if (!IS_ERR(o)) {
mdt->mdt_last_rcvd = o;
- rc = mdt_server_data_init(env, mdt);
+ rc = mdt_server_data_init(env, mdt, lsi);
if (rc)
GOTO(put_last_rcvd, rc);
} else {
RETURN(rc);
}
- o = dt_store_open(env, mdt->mdt_bottom, CAPA_KEYS, &fid);
+ o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid);
if (!IS_ERR(o)) {
mdt->mdt_ck_obj = o;
rc = mdt_capa_keys_init(env, mdt);
* or not.
*/
info->mti_spec.sp_cr_lookup = 1;
+ info->mti_spec.sp_feat = &dt_directory_features;
lname = mdt_name(info->mti_env, (char *)rr->rr_name,
rr->rr_namelen);
obdclass-all-objs += lu_object.o dt_object.o hash.o capa.o lu_time.o
obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o
obdclass-all-objs += acl.o idmap.o
+obdclass-all-objs += md_local_object.o
obdclass-objs := $(obdclass-linux-objs) $(obdclass-all-objs)
/* fid_be_to_cpu() */
#include <lustre_fid.h>
+struct dt_find_hint {
+ struct lu_fid *dfh_fid;
+ struct dt_device *dfh_dt;
+ struct dt_object *dfh_o;
+};
+
+struct dt_thread_info {
+ char dti_buf[DT_MAX_PATH];
+ struct lu_fid_pack dti_pack;
+ struct dt_find_hint dti_dfh;
+};
+
+/* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */
+LU_KEY_INIT(dt_global, struct dt_thread_info);
+LU_KEY_FINI(dt_global, struct dt_thread_info);
+
+static struct lu_context_key dt_key = {
+ .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
+ .lct_init = dt_global_key_init,
+ .lct_fini = dt_global_key_fini
+};
+
/* no lock is necessary to protect the list, because call-backs
* are added during system startup. Please refer to "struct dt_device".
*/
}
EXPORT_SYMBOL(dt_try_as_dir);
-extern struct lu_context_key lu_global_key;
+enum dt_format_type dt_mode_to_dft(__u32 mode)
+{
+ enum dt_format_type result;
+
+ switch (mode & S_IFMT) {
+ case S_IFDIR:
+ result = DFT_DIR;
+ break;
+ case S_IFREG:
+ result = DFT_REGULAR;
+ break;
+ case S_IFLNK:
+ result = DFT_SYM;
+ break;
+ case S_IFCHR:
+ case S_IFBLK:
+ case S_IFIFO:
+ case S_IFSOCK:
+ result = DFT_NODE;
+ break;
+ default:
+ LBUG();
+ break;
+ }
+ return result;
+}
+
+EXPORT_SYMBOL(dt_mode_to_dft);
+/**
+ * lookup fid for object named \a name in directory \a dir.
+ */
static int dt_lookup(const struct lu_env *env, struct dt_object *dir,
const char *name, struct lu_fid *fid)
{
- struct lu_fid_pack *pack = lu_context_key_get(&env->le_ctx,
- &lu_global_key);
+ struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
+ &dt_key);
+ struct lu_fid_pack *pack = &info->dti_pack;
struct dt_rec *rec = (struct dt_rec *)pack;
const struct dt_key *key = (const struct dt_key *)name;
int result;
if (dt_try_as_dir(env, dir)) {
result = dir->do_index_ops->dio_lookup(env, dir, rec, key,
BYPASS_CAPA);
- if (result == 0)
+ if (result > 0)
result = fid_unpack(pack, fid);
+ else if (result == 0)
+ result = -ENOENT;
} else
result = -ENOTDIR;
return result;
}
-static struct dt_object *dt_locate(const struct lu_env *env,
- struct dt_device *dev,
- const struct lu_fid *fid)
+/**
+ * get object for given \a fid.
+ */
+struct dt_object *dt_locate(const struct lu_env *env,
+ struct dt_device *dev,
+ const struct lu_fid *fid)
{
struct lu_object *obj;
struct dt_object *dt;
LASSERT(obj != NULL);
dt = container_of(obj, struct dt_object, do_lu);
} else
- dt = (void *)obj;
+ dt = (struct dt_object *)obj;
return dt;
}
+EXPORT_SYMBOL(dt_locate);
-struct dt_object *dt_store_open(const struct lu_env *env,
- struct dt_device *dt, const char *name,
- struct lu_fid *fid)
+/**
+ * find a object named \a entry in given \a dfh->dfh_o directory.
+ */
+static int dt_find_entry(const struct lu_env *env, const char *entry, void *data)
{
+ struct dt_find_hint *dfh = data;
+ struct dt_device *dt = dfh->dfh_dt;
+ struct lu_fid *fid = dfh->dfh_fid;
+ struct dt_object *obj = dfh->dfh_o;
+ int result;
+
+ result = dt_lookup(env, obj, entry, fid);
+ lu_object_put(env, &obj->do_lu);
+ if (result == 0) {
+ obj = dt_locate(env, dt, fid);
+ if (IS_ERR(obj))
+ result = PTR_ERR(obj);
+ }
+ dfh->dfh_o = obj;
+ return result;
+}
+
+/**
+ * Abstract function which parses path name. This function feeds
+ * path component to \a entry_func.
+ */
+int dt_path_parser(const struct lu_env *env,
+ char *path, dt_entry_func_t entry_func,
+ void *data)
+{
+ char *e;
+ int rc = 0;
+
+ while (1) {
+ e = strsep(&path, "/");
+ if (e == NULL)
+ break;
+
+ if (e[0] == 0) {
+ if (!path || path[0] == '\0')
+ break;
+ continue;
+ }
+ rc = entry_func(env, e, data);
+ if (rc)
+ break;
+ }
+
+ return rc;
+}
+
+static struct dt_object *dt_store_resolve(const struct lu_env *env,
+ struct dt_device *dt,
+ const char *path,
+ struct lu_fid *fid)
+{
+ struct dt_thread_info *info = lu_context_key_get(&env->le_ctx,
+ &dt_key);
+ struct dt_find_hint *dfh = &info->dti_dfh;
+ struct dt_object *obj = dfh->dfh_o;
+ char *local = info->dti_buf;
int result;
- struct dt_object *root;
- struct dt_object *child;
+ dfh->dfh_dt = dt;
+ dfh->dfh_fid = fid;
+
+ strncpy(local, path, DT_MAX_PATH);
+ local[DT_MAX_PATH - 1] = '\0';
result = dt->dd_ops->dt_root_get(env, dt, fid);
if (result == 0) {
- root = dt_locate(env, dt, fid);
- if (!IS_ERR(root)) {
- result = dt_lookup(env, root, name, fid);
- if (result == 0)
- child = dt_locate(env, dt, fid);
- else
- child = ERR_PTR(result);
- lu_object_put(env, &root->do_lu);
- } else {
- CERROR("No root\n");
- child = (void *)root;
+ obj = dt_locate(env, dt, fid);
+ if (!IS_ERR(obj)) {
+ dfh->dfh_o = obj;
+ result = dt_path_parser(env, local, dt_find_entry, dfh);
+ if (result != 0)
+ obj = ERR_PTR(result);
}
- } else
- child = ERR_PTR(result);
- return child;
+ } else {
+ obj = ERR_PTR(result);
+ }
+ return obj;
+}
+
+static struct dt_object *dt_reg_open(const struct lu_env *env,
+ struct dt_device *dt,
+ struct dt_object *p,
+ const char *name,
+ struct lu_fid *fid)
+{
+ struct dt_object *o;
+ int result;
+
+ result = dt_lookup(env, p, name, fid);
+ if (result == 0){
+ o = dt_locate(env, dt, fid);
+ }
+ else
+ o = ERR_PTR(result);
+
+ return o;
+}
+
+/**
+ * Open dt object named \a filename from \a dirname directory.
+ * \param dt dt device
+ * \param fid on success, object fid is stored in *fid
+ */
+struct dt_object *dt_store_open(const struct lu_env *env,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *filename,
+ struct lu_fid *fid)
+{
+ struct dt_object *file;
+ struct dt_object *dir;
+
+ dir = dt_store_resolve(env, dt, dirname, fid);
+ if (!IS_ERR(dir)) {
+ file = dt_reg_open(env, dt, dir,
+ filename, fid);
+ lu_object_put(env, &dir->do_lu);
+ } else {
+ file = dir;
+ }
+ return file;
}
EXPORT_SYMBOL(dt_store_open);
+/* dt class init function. */
+int dt_global_init(void)
+{
+ int result;
+
+ LU_CONTEXT_KEY_INIT(&dt_key);
+ result = lu_context_key_register(&dt_key);
+ return result;
+}
+
+void dt_global_fini(void)
+{
+ lu_context_key_degister(&dt_key);
+}
+
const struct dt_index_features dt_directory_features;
EXPORT_SYMBOL(dt_directory_features);
int lu_ref_global_init(void);
void lu_ref_global_fini(void);
+int dt_global_init(void);
+void dt_global_fini(void);
+
+int llo_global_init(void);
+void llo_global_fini(void);
+
/**
* Initialization of global lu_* data.
*/
return -ENOMEM;
result = lu_time_global_init();
- if (result != 0)
- return result;
+ if (result)
+ GOTO(out, result);
+
+#ifdef __KERNEL__
+ result = dt_global_init();
+ if (result)
+ GOTO(out, result);
- return cl_global_init();
+ result = llo_global_init();
+ if (result)
+ GOTO(out, result);
+#endif
+ result = cl_global_init();
+out:
+
+ return result;
}
/**
void lu_global_fini(void)
{
cl_global_fini();
+#ifdef __KERNEL__
+ llo_global_fini();
+ dt_global_fini();
+#endif
lu_time_global_fini();
if (lu_site_shrinker != NULL) {
remove_shrinker(lu_site_shrinker);
}
}
EXPORT_SYMBOL(lu_kmem_fini);
-
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/obdclass/md_local_object.c
+ *
+ * Lustre Local Object create APIs
+ * 'create on first mount' facility. Files registed under llo module will
+ * be created on first mount.
+ *
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#include <obd_support.h>
+#include <lustre_disk.h>
+#include <lustre_fid.h>
+#include <lu_object.h>
+#include <libcfs/list.h>
+#include <md_object.h>
+
+
+/** List head to hold list of objects to be created. */
+static struct list_head llo_lobj_list;
+
+/** Lock to protect list manipulations */
+static struct mutex llo_lock;
+
+/**
+ * Structure used to maintain state of path parsing.
+ * \see llo_find_entry, llo_store_resolve
+ */
+struct llo_find_hint {
+ struct lu_fid *lfh_cfid;
+ struct md_device *lfh_md;
+ struct md_object *lfh_pobj;
+};
+
+/**
+ * Thread Local storage for this module.
+ */
+struct llo_thread_info {
+ /** buffer to resolve path */
+ char lti_buf[DT_MAX_PATH];
+ /** used for path resolve */
+ struct lu_fid lti_fid;
+ /** used to pass child object fid */
+ struct lu_fid lti_cfid;
+ struct llo_find_hint lti_lfh;
+ struct md_op_spec lti_spc;
+ struct md_attr lti_ma;
+ struct lu_name lti_lname;
+};
+
+LU_KEY_INIT(llod_global, struct llo_thread_info);
+LU_KEY_FINI(llod_global, struct llo_thread_info);
+
+static struct lu_context_key llod_key = {
+ .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
+ .lct_init = llod_global_key_init,
+ .lct_fini = llod_global_key_fini
+};
+
+static inline struct llo_thread_info * llo_env_info(const struct lu_env *env)
+{
+ return lu_context_key_get(&env->le_ctx, &llod_key);
+}
+
+/**
+ * Search md object for given fid.
+ */
+static struct md_object *llo_locate(const struct lu_env *env,
+ struct md_device *md,
+ const struct lu_fid *fid)
+{
+ struct lu_object *obj;
+ struct md_object *mdo;
+
+ obj = lu_object_find(env, &md->md_lu_dev, fid, NULL);
+ if (!IS_ERR(obj)) {
+ obj = lu_object_locate(obj->lo_header, md->md_lu_dev.ld_type);
+ LASSERT(obj != NULL);
+ mdo = (struct md_object *) obj;
+ } else
+ mdo = (struct md_object *)obj;
+ return mdo;
+}
+
+/**
+ * Lookup FID for object named \a name in directory \a pobj.
+ */
+static int llo_lookup(const struct lu_env *env,
+ struct md_object *pobj,
+ const char *name,
+ struct lu_fid *fid)
+{
+ struct llo_thread_info *info = llo_env_info(env);
+ struct lu_name *lname = &info->lti_lname;
+ struct md_op_spec *spec = &info->lti_spc;
+
+ spec->sp_feat = NULL;
+ spec->sp_cr_flags = 0;
+ spec->sp_cr_lookup = 1;
+ spec->sp_cr_mode = 0;
+ spec->sp_ck_split = 0;
+
+ lname->ln_name = name;
+ lname->ln_namelen = strlen(name);
+
+ return mdo_lookup(env, pobj, lname, fid, spec);
+}
+
+/**
+ * Function to look up path component, this is passed to parsing
+ * function. \see llo_store_resolve
+ */
+static int llo_find_entry(const struct lu_env *env,
+ const char *name, void *data)
+{
+ struct llo_find_hint *lfh = data;
+ struct md_device *md = lfh->lfh_md;
+ struct lu_fid *fid = lfh->lfh_cfid;
+ struct md_object *obj = lfh->lfh_pobj;
+ int result;
+
+ /* lookup fid for object */
+ result = llo_lookup(env, obj, name, fid);
+ lu_object_put(env, &obj->mo_lu);
+
+ if (result == 0) {
+ /* get md object for fid that we got in lookup */
+ obj = llo_locate(env, md, fid);
+ if (IS_ERR(obj))
+ result = PTR_ERR(obj);
+ }
+
+ lfh->lfh_pobj = obj;
+ return result;
+}
+
+static struct md_object *llo_reg_open(const struct lu_env *env,
+ struct md_device *md,
+ struct md_object *p,
+ const char *name,
+ struct lu_fid *fid)
+{
+ struct md_object *o;
+ int result;
+
+ result = llo_lookup(env, p, name, fid);
+ if (result == 0)
+ o = llo_locate(env, md, fid);
+ else
+ o = ERR_PTR(result);
+
+ return o;
+}
+
+/**
+ * Resolve given \a path, on success function returns
+ * md object for last directory and \a fid points to
+ * its fid.
+ */
+struct md_object *llo_store_resolve(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *path,
+ struct lu_fid *fid)
+{
+ struct llo_thread_info *info = llo_env_info(env);
+ struct llo_find_hint *lfh = &info->lti_lfh;
+ char *local = info->lti_buf;
+ struct md_object *obj = lfh->lfh_pobj;
+ int result;
+
+ strncpy(local, path, DT_MAX_PATH);
+ local[DT_MAX_PATH - 1] = '\0';
+
+ lfh->lfh_md = md;
+ lfh->lfh_cfid = fid;
+ /* start path resolution from backend fs root. */
+ result = dt->dd_ops->dt_root_get(env, dt, fid);
+ if (result == 0) {
+ /* get md object for root */
+ obj = llo_locate(env, md, fid);
+ if (!IS_ERR(obj)) {
+ /* start path parser from root md */
+ lfh->lfh_pobj = obj;
+ result = dt_path_parser(env, local, llo_find_entry, lfh);
+ if (result != 0)
+ obj = ERR_PTR(result);
+ }
+ } else {
+ obj = ERR_PTR(result);
+ }
+ return obj;
+}
+EXPORT_SYMBOL(llo_store_resolve);
+
+/**
+ * Returns md object for \a objname in given \a dirname.
+ */
+struct md_object *llo_store_open(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *objname,
+ struct lu_fid *fid)
+{
+ struct md_object *obj;
+ struct md_object *dir;
+
+ /* search md object for parent dir */
+ dir = llo_store_resolve(env, md, dt, dirname, fid);
+ if (!IS_ERR(dir)) {
+ obj = llo_reg_open(env, md, dir, objname, fid);
+ lu_object_put(env, &dir->mo_lu);
+ } else
+ obj = dir;
+
+ return obj;
+}
+EXPORT_SYMBOL(llo_store_open);
+
+static struct md_object *llo_create_obj(const struct lu_env *env,
+ struct md_device *md,
+ struct md_object *dir,
+ const char *objname,
+ const struct lu_fid *fid,
+ const struct dt_index_features *feat)
+{
+ struct llo_thread_info *info = llo_env_info(env);
+ struct md_object *mdo;
+ struct md_attr *ma = &info->lti_ma;
+ struct md_op_spec *spec = &info->lti_spc;
+ struct lu_name *lname = &info->lti_lname;
+ struct lu_attr *la = &ma->ma_attr;
+ int rc;
+
+ mdo = llo_locate(env, md, fid);
+ if (IS_ERR(mdo))
+ return mdo;
+
+ lname->ln_name = objname;
+ lname->ln_namelen = strlen(objname);
+
+ spec->sp_feat = feat;
+ spec->sp_cr_flags = 0;
+ spec->sp_cr_lookup = 1;
+ spec->sp_cr_mode = 0;
+ spec->sp_ck_split = 0;
+
+ if (feat == &dt_directory_features)
+ la->la_mode = S_IFDIR;
+ else
+ la->la_mode = S_IFREG;
+
+ la->la_mode |= S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+ la->la_uid = la->la_gid = 0;
+ la->la_valid = LA_MODE | LA_UID | LA_GID;
+
+ ma->ma_valid = 0;
+ ma->ma_need = 0;
+
+ rc = mdo_create(env, dir, lname, mdo, spec, ma);
+
+ if (rc) {
+ lu_object_put(env, &mdo->mo_lu);
+ mdo = ERR_PTR(rc);
+ }
+
+ return mdo;
+}
+
+/**
+ * Create md object, object could be diretcory or
+ * special index defined by \a feat in \a directory.
+ *
+ * \param md device
+ * \param dir parent directory
+ * \param objname file name
+ * \param fid object fid
+ * \param feat index features required for directory create
+ */
+
+struct md_object *llo_store_create_index(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *objname,
+ const struct lu_fid *fid,
+ const struct dt_index_features *feat)
+{
+ struct llo_thread_info *info = llo_env_info(env);
+ struct md_object *obj;
+ struct md_object *dir;
+ struct lu_fid *ignore = &info->lti_fid;
+
+ dir = llo_store_resolve(env, md, dt, dirname, ignore);
+ if (!IS_ERR(dir)) {
+ obj = llo_create_obj(env, md, dir, objname, fid, feat);
+ lu_object_put(env, &dir->mo_lu);
+ } else {
+ obj = dir;
+ }
+ return obj;
+}
+
+EXPORT_SYMBOL(llo_store_create_index);
+
+/**
+ * Create md object for regular file in \a directory.
+ *
+ * \param md device
+ * \param dir parent directory
+ * \param objname file name
+ * \param fid object fid.
+ */
+
+struct md_object *llo_store_create(const struct lu_env *env,
+ struct md_device *md,
+ struct dt_device *dt,
+ const char *dirname,
+ const char *objname,
+ const struct lu_fid *fid)
+{
+ return llo_store_create_index(env, md, dt, dirname,
+ objname, fid, NULL);
+}
+
+EXPORT_SYMBOL(llo_store_create);
+
+/**
+ * Register object for 'create on first mount' facility.
+ */
+
+int llo_local_obj_register(struct lu_local_obj_desc *llod)
+{
+ mutex_lock(&llo_lock);
+ list_add(&llod->llod_linkage, &llo_lobj_list);
+ mutex_unlock(&llo_lock);
+
+ return 0;
+}
+
+EXPORT_SYMBOL(llo_local_obj_register);
+
+/**
+ * Created registed objects.
+ */
+
+int llo_local_objects_setup(const struct lu_env *env,
+ struct md_device * md,
+ struct dt_device *dt)
+{
+ struct llo_thread_info *info = llo_env_info(env);
+ struct lu_fid *fid;
+ struct lu_local_obj_desc *scan;
+ struct md_object *mdo;
+ int rc = 0;
+
+ fid = &info->lti_cfid;
+
+ mutex_lock(&llo_lock);
+
+ list_for_each_entry(scan, &llo_lobj_list, llod_linkage) {
+
+ lu_local_obj_fid(fid, scan->llod_oid);
+
+ if (scan->llod_is_index)
+ mdo = llo_store_create_index(env, md, dt ,
+ "", scan->llod_name,
+ fid,
+ scan->llod_feat);
+ else
+ mdo = llo_store_create(env, md, dt,
+ "", scan->llod_name,
+ fid);
+ if (IS_ERR(mdo) && PTR_ERR(mdo) != -EEXIST) {
+ rc = PTR_ERR(mdo);
+ CERROR("creating obj [%s] fid = "DFID" rc = %d\n",
+ scan->llod_name, PFID(fid), rc);
+ goto out;
+ }
+
+ if (!IS_ERR(mdo))
+ lu_object_put(env, &mdo->mo_lu);
+ }
+
+out:
+ mutex_unlock(&llo_lock);
+ return rc;
+}
+
+EXPORT_SYMBOL(llo_local_objects_setup);
+
+int llo_global_init(void)
+{
+ int result;
+
+ CFS_INIT_LIST_HEAD(&llo_lobj_list);
+ mutex_init(&llo_lock);
+
+ LU_CONTEXT_KEY_INIT(&llod_key);
+ result = lu_context_key_register(&llod_key);
+ return result;
+}
+
+void llo_global_fini(void)
+{
+ lu_context_key_degister(&llod_key);
+}
break;
}
+ /*
+ * For interoperability between 1.8 and 2.0,
+ * rename "mds" obd device type to "mdt".
+ */
+ {
+ char *typename = lustre_cfg_string(lcfg, 1);
+ char *index = lustre_cfg_string(lcfg, 2);
+
+ if ((lcfg->lcfg_command == LCFG_ATTACH && typename &&
+ strcmp(typename, "mds") == 0)) {
+ CWARN("For 1.8 interoperability, rename obd "
+ "type from mds to mdt\n");
+ typename[2] = 't';
+ }
+ if ((lcfg->lcfg_command == LCFG_SETUP && index &&
+ strcmp(index, "type") == 0)) {
+ CWARN("For 1.8 interoperability, set this"
+ " index to '0'\n");
+ index[0] = '0';
+ index[1] = 0;
+ }
+ }
+
if ((clli->cfg_flags & CFG_F_EXCLUDE) &&
(lcfg->lcfg_command == LCFG_LOV_ADD_OBD))
/* Add inactive instead */
#include "filter_internal.h"
/* Group 0 is no longer a legal group, to catch uninitialized IDs */
-#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0
+#define FILTER_MIN_GROUPS FILTER_GROUP_MDS1_N_BASE
static struct lvfs_callback_ops filter_lvfs_ops;
cfs_mem_cache_t *ll_fmd_cachep;
CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc);
GOTO(cleanup, rc);
}
- LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS);
+ LASSERTF(off == 0 || CHECK_MDS_GROUP(last_group),
+ "off = %llu and last_group = %d\n", off, last_group);
+
CDEBUG(D_INODE, "%s: previous %d, new %d\n",
obd->obd_name, last_group, group);
down(&filter->fo_init_lock);
old_count = filter->fo_group_count;
for (group = old_count; group <= last_group; group++) {
- if (group == 0)
- continue; /* no group zero */
rc = filter_read_group_internal(obd, group, create);
if (rc != 0)
if (off == 0) {
last_group = FILTER_MIN_GROUPS;
} else {
- LASSERT(last_group >= FILTER_MIN_GROUPS);
+ LASSERT_MDS_GROUP(last_group);
}
CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name,
if (rc)
CERROR("error writing server data: rc = %d\n", rc);
- for (i = 1; i < filter->fo_group_count; i++) {
+ for (i = 0; i < filter->fo_group_count; i++) {
rc = filter_update_last_objid(obd, i,
(i == filter->fo_group_count - 1));
if (rc)
spin_lock(&filter->fo_objidlock);
id = filter->fo_last_objids[group];
spin_unlock(&filter->fo_objidlock);
-
return id;
}
struct filter_subdirs *subdirs;
LASSERT(group < filter->fo_group_count); /* FIXME: object groups */
- if ((group > 0 && group < FILTER_GROUP_MDS0) ||
+ if ((group > FILTER_GROUP_MDS0 && group < FILTER_GROUP_MDS1_N_BASE) ||
filter->fo_subdir_count == 0)
return filter->fo_dentry_O_groups[group];
}
group = data->ocd_group;
- if (group == 0)
- GOTO(cleanup, rc);
CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
obd->obd_name, exp->exp_handle.h_cookie, group);
{
struct obd_llog_group *olg_min, *olg;
struct filter_obd *filter;
- int worked = 0, group;
+ int worked = -1, group;
struct llog_ctxt *ctxt;
ENTRY;
ENTRY;
LASSERT(oa);
- LASSERT(oa->o_gr != 0);
+ LASSERT_MDS_GROUP(oa->o_gr);
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
LASSERT(down_trylock(&filter->fo_create_locks[oa->o_gr]) != 0);
obd->obd_name);
GOTO(out, rc = 0);
}
- /* only precreate if group == 0 and o_id is specified */
- if (group < FILTER_GROUP_MDS0 || oa->o_id == 0)
+ /* only precreate if group == 0 and o_id is specfied */
+ if (group == FILTER_GROUP_LLOG || oa->o_id == 0)
diff = 1;
else
diff = oa->o_id - filter_last_id(filter, group);
CDEBUG(D_INODE, "%s: filter_create(od->o_gr="LPU64",od->o_id="
LPU64")\n", obd->obd_name, oa->o_gr, oa->o_id);
- if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) {
+ if (!(oa->o_valid & OBD_MD_FLGROUP)) {
CERROR("!!! nid %s sent invalid object group %d\n",
obd_export_nid2str(exp), group);
RETURN(-EINVAL);
RETURN(-EINVAL);
}
+static inline int filter_setup_llog_group(struct obd_export *exp,
+ struct obd_device *obd,
+ int group)
+{
+ struct obd_llog_group *olg;
+ struct llog_ctxt *ctxt;
+ int rc;
+
+ olg = filter_find_create_olg(obd, group);
+ if (IS_ERR(olg))
+ RETURN(PTR_ERR(olg));
+
+ llog_group_set_export(olg, exp);
+
+ ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
+ LASSERTF(ctxt != NULL, "ctxt is null\n");
+
+ rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+ llog_ctxt_put(ctxt);
+ return rc;
+}
static int filter_set_info_async(struct obd_export *exp, __u32 keylen,
void *key, __u32 vallen, void *val,
struct ptlrpc_request_set *set)
{
struct obd_device *obd;
- struct obd_llog_group *olg;
- struct llog_ctxt *ctxt;
int rc = 0, group;
ENTRY;
/* setup llog imports */
LASSERT(val != NULL);
- group = (int)(*(__u32 *)val);
- LASSERT(group >= FILTER_GROUP_MDS0);
-
- olg = filter_find_create_olg(obd, group);
- if (IS_ERR(olg))
- RETURN(PTR_ERR(olg));
-
- llog_group_set_export(olg, exp);
- ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT);
- LASSERTF(ctxt != NULL, "ctxt is null\n");
-
- rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
- llog_ctxt_put(ctxt);
+ group = (int)(*(__u32 *)val);
+ LASSERT_MDS_GROUP(group);
+ rc = filter_setup_llog_group(exp, obd, group);
+ if (rc)
+ goto out;
lquota_setinfo(filter_quota_interface_ref, obd, exp);
+ if (group == FILTER_GROUP_MDS0) {
+ /* setup llog group 1 for interop */
+ filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG);
+ }
+out:
RETURN(rc);
}
/* Quota stuff */
extern quota_interface_t *filter_quota_interface_ref;
-/* Capability */
-static inline __u64 obdo_mdsno(struct obdo *oa)
-{
- return oa->o_gr - FILTER_GROUP_MDS0;
-}
-
int filter_update_capa_key(struct obd_device *obd, struct lustre_capa_key *key);
int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid,
struct lustre_capa *capa, __u64 opc);
if (obd == NULL)
return 0;
+ rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, 0));
+ if (rc < 0)
+ return rc;
+ page += rc;
+ count -= rc;
+ retval += rc;
- for (i = FILTER_GROUP_MDS0; i < filter->fo_group_count; i++) {
+ for (i = FILTER_GROUP_MDS1_N_BASE + 1; i < filter->fo_group_count; i++) {
rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, i));
if (rc < 0) {
retval = rc;
spin_lock(&oscc->oscc_lock);
body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
body->oa.o_gr = oscc->oscc_oa.o_gr;
- LASSERT(body->oa.o_gr > 0);
+ LASSERT_MDS_GROUP(body->oa.o_gr);
body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
spin_unlock(&oscc->oscc_lock);
CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
LASSERT(oa);
LASSERT(ea);
- LASSERT(oa->o_gr > 0);
+ LASSERT_MDS_GROUP(oa->o_gr);
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
if (lsm) {
LASSERT(lsm->lsm_object_id);
- LASSERT(lsm->lsm_object_gr);
+ LASSERT_MDS_GROUP(lsm->lsm_object_gr);
(*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
(*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
}
(*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
(*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
LASSERT((*lsmp)->lsm_object_id);
- LASSERT((*lsmp)->lsm_object_gr);
+ LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
}
(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
int rc;
ENTRY;
- LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
- oinfo->oi_oa->o_gr > 0);
+ LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
+ CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
+ "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
+ oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
if (req == NULL)
oscc->oscc_oa.o_gr = (*(__u32 *)val);
oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
- LASSERT(oscc->oscc_oa.o_gr > 0);
+ LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
}
#include "osd_internal.h"
#include "osd_igif.h"
+/* llo_* api support */
+#include <md_object.h>
+
+static const char MDT_XATTR_NAME[] = "trusted.lma";
+static const char dot[] = ".";
+static const char dotdot[] = "..";
+static const char remote_obj_dir[] = "REM_OBJ_DIR";
+
struct osd_directory {
struct iam_container od_container;
struct iam_descr od_descr;
struct osd_directory *oo_dir;
/** protects inode attributes. */
spinlock_t oo_guard;
+ /**
+ * Following two members are used to indicate the presence of dot and
+ * dotdot in the given directory. This is required for interop mode
+ * (b11826).
+ */
+ int oo_compat_dot_created;
+ int oo_compat_dotdot_created;
+
const struct lu_env *oo_owner;
#ifdef CONFIG_LOCKDEP
struct lockdep_map oo_dep_map;
struct inode *inode, const struct lu_attr *attr);
static int osd_param_is_sane (const struct osd_device *dev,
const struct txn_param *param);
-static int osd_index_lookup (const struct lu_env *env,
- struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key,
- struct lustre_capa *capa);
-static int osd_index_insert (const struct lu_env *env,
- struct dt_object *dt,
- const struct dt_rec *rec,
- const struct dt_key *key,
- struct thandle *handle,
- struct lustre_capa *capa,
- int ingore_quota);
-static int osd_index_delete (const struct lu_env *env,
- struct dt_object *dt, const struct dt_key *key,
- struct thandle *handle,
- struct lustre_capa *capa);
-static int osd_index_probe (const struct lu_env *env,
- struct osd_object *o,
- const struct dt_index_features *feat);
+static int osd_index_iam_lookup(const struct lu_env *env,
+ struct dt_object *dt,
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa);
+static int osd_index_ea_lookup(const struct lu_env *env,
+ struct dt_object *dt,
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa);
+static int osd_index_iam_insert(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *handle,
+ struct lustre_capa *capa,
+ int ingore_quota);
+static int osd_index_ea_insert (const struct lu_env *env,
+ struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key,
+ struct thandle *handle,
+ struct lustre_capa *capa,
+ int ingore_quota);
+static int osd_index_iam_delete(const struct lu_env *env,
+ struct dt_object *dt, const struct dt_key *key,
+ struct thandle *handle,
+ struct lustre_capa *capa);
+static int osd_index_ea_delete (const struct lu_env *env,
+ struct dt_object *dt, const struct dt_key *key,
+ struct thandle *handle,
+ struct lustre_capa *capa);
+
+static int osd_iam_index_probe (const struct lu_env *env,
+ struct osd_object *o,
+ const struct dt_index_features *feat);
static int osd_index_try (const struct lu_env *env,
struct dt_object *dt,
const struct dt_index_features *feat);
static void osd_index_fini (struct osd_object *o);
-static void osd_it_fini (const struct lu_env *env, struct dt_it *di);
-static int osd_it_get (const struct lu_env *env,
+static void osd_it_iam_fini (const struct lu_env *env, struct dt_it *di);
+static int osd_it_iam_get (const struct lu_env *env,
+ struct dt_it *di, const struct dt_key *key);
+static void osd_it_iam_put (const struct lu_env *env, struct dt_it *di);
+static int osd_it_iam_next (const struct lu_env *env, struct dt_it *di);
+static int osd_it_iam_key_size (const struct lu_env *env,
+ const struct dt_it *di);
+static void osd_it_ea_fini (const struct lu_env *env, struct dt_it *di);
+static int osd_it_ea_get (const struct lu_env *env,
struct dt_it *di, const struct dt_key *key);
-static void osd_it_put (const struct lu_env *env, struct dt_it *di);
-static int osd_it_next (const struct lu_env *env, struct dt_it *di);
-static int osd_it_del (const struct lu_env *env, struct dt_it *di,
- struct thandle *th);
-static int osd_it_key_size (const struct lu_env *env,
+static void osd_it_ea_put (const struct lu_env *env, struct dt_it *di);
+static int osd_it_ea_next (const struct lu_env *env, struct dt_it *di);
+static int osd_it_ea_key_size(const struct lu_env *env,
const struct dt_it *di);
+
static void osd_conf_get (const struct lu_env *env,
const struct dt_device *dev,
struct dt_device_param *param);
struct osd_device *dev,
const struct osd_inode_id *id);
static struct super_block *osd_sb (const struct osd_device *dev);
-static struct dt_it *osd_it_init (const struct lu_env *env,
- struct dt_object *dt, int wable,
+static struct dt_it *osd_it_iam_init (const struct lu_env *env,
+ struct dt_object *dt,
+ struct lustre_capa *capa);
+static struct dt_key *osd_it_iam_key (const struct lu_env *env,
+ const struct dt_it *di);
+static struct dt_rec *osd_it_iam_rec (const struct lu_env *env,
+ const struct dt_it *di);
+static struct dt_it *osd_it_ea_init (const struct lu_env *env,
+ struct dt_object *dt,
struct lustre_capa *capa);
-static struct dt_key *osd_it_key (const struct lu_env *env,
+static struct dt_key *osd_it_ea_key (const struct lu_env *env,
const struct dt_it *di);
-static struct dt_rec *osd_it_rec (const struct lu_env *env,
+static struct dt_rec *osd_it_ea_rec (const struct lu_env *env,
const struct dt_it *di);
+
static struct timespec *osd_inode_time (const struct lu_env *env,
struct inode *inode,
__u64 seconds);
struct txn_param *p);
static journal_t *osd_journal (const struct osd_device *dev);
+static int __osd_ea_add_rec(struct osd_thread_info *info,
+ struct osd_object *pobj,
+ struct osd_object *cobj,
+ const char *name,
+ struct thandle *th);
+
static const struct lu_device_type_operations osd_device_type_ops;
static struct lu_device_type osd_device_type;
static const struct lu_object_operations osd_lu_obj_ops;
static const struct lu_device_operations osd_lu_ops;
static struct lu_context_key osd_key;
static const struct dt_object_operations osd_obj_ops;
+static const struct dt_object_operations osd_obj_ea_ops;
static const struct dt_body_operations osd_body_ops;
-static const struct dt_index_operations osd_index_ops;
-static const struct dt_index_operations osd_index_compat_ops;
+static const struct dt_index_operations osd_index_iam_ops;
+static const struct dt_index_operations osd_index_ea_ops;
struct osd_thandle {
struct thandle ot_super;
l = &mo->oo_dt.do_lu;
dt_object_init(&mo->oo_dt, NULL, d);
- mo->oo_dt.do_ops = &osd_obj_ops;
+ if (osd_dev(d)->od_iop_mode)
+ mo->oo_dt.do_ops = &osd_obj_ea_ops;
+ else
+ mo->oo_dt.do_ops = &osd_obj_ops;
+
l->lo_ops = &osd_lu_obj_ops;
init_rwsem(&mo->oo_sem);
spin_lock_init(&mo->oo_guard);
OBD_FREE_PTR(obj);
}
-static struct iam_path_descr *osd_ipd_get(const struct lu_env *env,
- const struct iam_container *bag)
+static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env,
+ const struct iam_container *bag)
+{
+ return bag->ic_descr->id_ops->id_ipd_alloc(bag,
+ osd_oti_get(env)->oti_it_ipd);
+}
+
+static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env,
+ const struct iam_container *bag)
{
return bag->ic_descr->id_ops->id_ipd_alloc(bag,
- osd_oti_get(env)->oti_ipd);
+ osd_oti_get(env)->oti_idx_ipd);
}
static void osd_ipd_put(const struct lu_env *env,
/*
* If object is unlinked remove fid->ino mapping from object index.
- *
- * File body will be deleted by iput().
*/
osd_index_fini(obj);
"Failed to cleanup: %d\n",
result);
}
+
iput(inode);
obj->oo_inode = NULL;
}
param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
}
+/**
+ * Helper function to get and fill the buffer with input values.
+ */
+static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len)
+{
+ struct lu_buf *buf;
+
+ buf = &osd_oti_get(env)->oti_buf;
+ buf->lb_buf = area;
+ buf->lb_len = len;
+ return buf;
+}
+
/*
* Journal
*/
EXIT;
}
+
/*
* Concurrency: serialization provided by callers.
*/
extern struct inode *ldiskfs_create_inode(handle_t *handle,
struct inode * dir, int mode);
+extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry,
+ struct inode *inode);
+extern int ldiskfs_delete_entry(handle_t *handle,
+ struct inode * dir,
+ struct ldiskfs_dir_entry_2 * de_del,
+ struct buffer_head * bh);
+extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry,
+ struct ldiskfs_dir_entry_2
+ ** res_dir);
+extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir,
+ struct inode *inode);
+
+extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode,
+ int name_index, const char *name,
+ const void *value, size_t value_len,
+ int flags);
+
+static struct dentry * osd_child_dentry_get(const struct lu_env *env,
+ struct osd_object *obj,
+ const char *name,
+ const int namelen)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct dentry *child_dentry = &info->oti_child_dentry;
+ struct dentry *obj_dentry = &info->oti_obj_dentry;
+
+ obj_dentry->d_inode = obj->oo_inode;
+ obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
+ obj_dentry->d_name.hash = 0;
+
+ child_dentry->d_name.hash = 0;
+ child_dentry->d_parent = obj_dentry;
+ child_dentry->d_name.name = name;
+ child_dentry->d_name.len = namelen;
+ return child_dentry;
+}
+
static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
umode_t mode,
int result;
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oth;
- struct inode *parent;
+ struct dt_object *parent;
struct inode *inode;
#ifdef HAVE_QUOTA_SUPPORT
struct osd_ctxt *save = &info->oti_ctxt;
LINVRNT(osd_invariant(obj));
LASSERT(obj->oo_inode == NULL);
- LASSERT(osd->od_obj_area != NULL);
oth = container_of(th, struct osd_thandle, ot_super);
LASSERT(oth->ot_handle->h_transaction != NULL);
if (hint && hint->dah_parent)
- parent = osd_dt_obj(hint->dah_parent)->oo_inode;
+ parent = hint->dah_parent;
else
- parent = osd->od_obj_area->d_inode;
- LASSERT(parent->i_op != NULL);
+ parent = osd->od_obj_area;
+
+ LASSERT(parent != NULL);
+ LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
#ifdef HAVE_QUOTA_SUPPORT
osd_push_ctxt(info->oti_env, save);
#endif
- inode = ldiskfs_create_inode(oth->ot_handle, parent, mode);
+ inode = ldiskfs_create_inode(oth->ot_handle,
+ osd_dt_obj(parent)->oo_inode, mode);
#ifdef HAVE_QUOTA_SUPPORT
osd_pop_ctxt(save);
#endif
extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize,
int recsize, handle_t *handle);
+extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize,
+ int recsize, handle_t *handle);
+
+
enum {
OSD_NAME_LEN = 255
};
static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *th)
{
int result;
struct osd_thandle *oth;
+ struct osd_device *osd = osd_obj2dev(obj);
+ __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
LASSERT(S_ISDIR(attr->la_mode));
oth = container_of(th, struct osd_thandle, ot_super);
LASSERT(oth->ot_handle->h_transaction != NULL);
- result = osd_mkfile(info, obj, (attr->la_mode &
- (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th);
- if (result == 0) {
+ result = osd_mkfile(info, obj, mode, hint, th);
+ if (result == 0 && osd->od_iop_mode == 0) {
LASSERT(obj->oo_inode != NULL);
/*
* XXX uh-oh... call low-level iam function directly.
*/
+
result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4,
sizeof (struct lu_fid_pack),
oth->ot_handle);
return result;
}
+static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *th)
+{
+ int result;
+ struct osd_thandle *oth;
+ const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
+
+ __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
+
+ LASSERT(S_ISREG(attr->la_mode));
+
+ oth = container_of(th, struct osd_thandle, ot_super);
+ LASSERT(oth->ot_handle->h_transaction != NULL);
+
+ result = osd_mkfile(info, obj, mode, hint, th);
+ if (result == 0) {
+ LASSERT(obj->oo_inode != NULL);
+ if (feat->dif_flags & DT_IND_VARKEY)
+ result = iam_lvar_create(obj->oo_inode,
+ feat->dif_keysize_max,
+ feat->dif_ptrsize,
+ feat->dif_recsize_max,
+ oth->ot_handle);
+ else
+ result = iam_lfix_create(obj->oo_inode,
+ feat->dif_keysize_max,
+ feat->dif_ptrsize,
+ feat->dif_recsize_max,
+ oth->ot_handle);
+
+ }
+ return result;
+}
+
static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *th)
{
LASSERT(S_ISREG(attr->la_mode));
static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *th)
{
LASSERT(S_ISLNK(attr->la_mode));
static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *th)
{
- int result;
- struct osd_device *osd = osd_obj2dev(obj);
- struct inode *dir;
umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
+ int result;
LINVRNT(osd_invariant(obj));
LASSERT(obj->oo_inode == NULL);
- LASSERT(osd->od_obj_area != NULL);
LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
S_ISFIFO(mode) || S_ISSOCK(mode));
- dir = osd->od_obj_area->d_inode;
- LASSERT(dir->i_op != NULL);
-
result = osd_mkfile(info, obj, mode, hint, th);
if (result == 0) {
LASSERT(obj->oo_inode != NULL);
typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
struct lu_attr *,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *);
-static osd_obj_type_f osd_create_type_f(__u32 mode)
+static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
{
osd_obj_type_f result;
- switch (mode) {
- case S_IFDIR:
+ switch (type) {
+ case DFT_DIR:
result = osd_mkdir;
break;
- case S_IFREG:
+ case DFT_REGULAR:
result = osd_mkreg;
break;
- case S_IFLNK:
+ case DFT_SYM:
result = osd_mksym;
break;
- case S_IFCHR:
- case S_IFBLK:
- case S_IFIFO:
- case S_IFSOCK:
+ case DFT_NODE:
result = osd_mknod;
break;
+ case DFT_INDEX:
+ result = osd_mk_index;
+ break;
+
default:
LBUG();
break;
ah->dah_mode = child_mode;
}
+/**
+ * Helper function for osd_object_create()
+ *
+ * \retval 0, on success
+ */
+static int __osd_object_create(struct osd_thread_info *info,
+ struct osd_object *obj, struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *th)
+{
+
+ int result;
-/*
- * Concurrency: @dt is write locked.
+ result = osd_create_pre(info, obj, attr, th);
+ if (result == 0) {
+ result = osd_create_type_f(dof->dof_type)(info, obj,
+ attr, hint, dof, th);
+ if (result == 0)
+ result = osd_create_post(info, obj, attr, th);
+ }
+ return result;
+}
+
+/**
+ * Helper function for osd_object_create()
+ *
+ * \retval 0, on success
*/
+static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
+ const struct lu_fid *fid, struct thandle *th)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct osd_inode_id *id = &info->oti_id;
+ struct osd_device *osd = osd_obj2dev(obj);
+ struct md_ucred *uc = md_ucred(env);
+
+ LASSERT(obj->oo_inode != NULL);
+ LASSERT(uc != NULL);
+
+ id->oii_ino = obj->oo_inode->i_ino;
+ id->oii_gen = obj->oo_inode->i_generation;
+
+ return osd_oi_insert(info, &osd->od_oi, fid, id, th,
+ uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
+}
+
static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *attr,
struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
struct thandle *th)
{
- const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
- struct osd_object *obj = osd_dt_obj(dt);
- struct osd_device *osd = osd_obj2dev(obj);
- struct osd_thread_info *info = osd_oti_get(env);
+ const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_thread_info *info = osd_oti_get(env);
int result;
ENTRY;
LASSERT(osd_write_locked(env, obj));
LASSERT(th != NULL);
- /*
- * XXX missing: Quote handling.
- */
+ result = __osd_object_create(info, obj, attr, hint, dof, th);
+ if (result == 0)
+ result = __osd_oi_insert(env, obj, fid, th);
- result = osd_create_pre(info, obj, attr, th);
- if (result == 0) {
- result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj,
- attr, hint, th);
- if (result == 0)
- result = osd_create_post(info, obj, attr, th);
+ LASSERT(ergo(result == 0, dt_object_exists(dt)));
+ LASSERT(osd_invariant(obj));
+ RETURN(result);
+}
+
+/**
+ * Helper function for osd_xattr_set()
+ */
+static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name, int fl)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct inode *inode = obj->oo_inode;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct dentry *dentry = &info->oti_child_dentry;
+ struct timespec *t = &info->oti_time;
+ int fs_flags = 0;
+ int rc;
+
+ LASSERT(dt_object_exists(dt));
+ LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
+ LASSERT(osd_write_locked(env, obj));
+
+ if (fl & LU_XATTR_REPLACE)
+ fs_flags |= XATTR_REPLACE;
+
+ if (fl & LU_XATTR_CREATE)
+ fs_flags |= XATTR_CREATE;
+
+ dentry->d_inode = inode;
+ *t = inode->i_ctime;
+ rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
+ buf->lb_len, fs_flags);
+ if (likely(rc == 0)) {
+ spin_lock(&obj->oo_guard);
+ inode->i_ctime = *t;
+ spin_unlock(&obj->oo_guard);
+ mark_inode_dirty(inode);
}
- if (result == 0) {
- struct osd_inode_id *id = &info->oti_id;
- struct md_ucred *uc = md_ucred(env);
+ return rc;
+}
- LASSERT(obj->oo_inode != NULL);
- LASSERT(uc != NULL);
+/**
+ * Put the fid into lustre_mdt_attrs, and then place the structure
+ * inode's ea. This fid should not be altered during the life time
+ * of the inode.
+ *
+ * \retval +ve, on success
+ * \retval -ve, on error
+ *
+ * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
+ */
+static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_fid *fid)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
+
+ fid_cpu_to_be(&mdt_attrs->lma_self_fid, fid);
+
+ return __osd_xattr_set(env, dt,
+ osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
+ MDT_XATTR_NAME, LU_XATTR_CREATE);
+
+}
+
+/**
+ * Helper function to form igif
+ */
+static inline void osd_igif_get(const struct lu_env *env, struct dentry *dentry,
+ struct lu_fid *fid)
+{
+ struct inode *inode = dentry->d_inode;
+ lu_igif_build(fid, inode->i_ino, inode->i_generation);
+}
+
+/**
+ * Helper function to pack the fid
+ */
+static inline void osd_fid_pack(const struct lu_env *env, const struct lu_fid *fid,
+ struct lu_fid_pack *pack)
+{
+ fid_pack(pack, fid, &osd_oti_get(env)->oti_fid);
+}
+
+/**
+ * Try to read the fid from inode ea into dt_rec, if return value
+ * i.e. rc is +ve, then we got fid, otherwise we will have to form igif
+ *
+ * \param rec, the data-structure into which fid/igif is read
+ *
+ * \retval 0, on success
+ */
+static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry,
+ struct dt_rec *rec)
+{
+ struct inode *inode = dentry->d_inode;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
+ struct lu_fid *fid = &info->oti_fid;
+ int rc;
+
+ LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
- id->oii_ino = obj->oo_inode->i_ino;
- id->oii_gen = obj->oo_inode->i_generation;
+ rc = inode->i_op->getxattr(dentry, MDT_XATTR_NAME, (void *)mdt_attrs,
+ sizeof *mdt_attrs);
- result = osd_oi_insert(info, &osd->od_oi, fid, id, th,
- uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
+ if (rc > 0) {
+ fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid);
+ rc = 0;
+ } else if (rc == -ENODATA) {
+ osd_igif_get(env, dentry, fid);
+ rc = 0;
}
+ if (rc == 0)
+ osd_fid_pack(env, fid, (struct lu_fid_pack*)rec);
+
+ return rc;
+}
+
+/**
+ * OSD layer object create function for interoperability mode (b11826).
+ * This is mostly similar to osd_object_create(). Only difference being, fid is
+ * inserted into inode ea here.
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof,
+ struct thandle *th)
+{
+ const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_thread_info *info = osd_oti_get(env);
+ int result;
+ int is_root = 0;
+
+ ENTRY;
+
+ LASSERT(osd_invariant(obj));
+ LASSERT(!dt_object_exists(dt));
+ LASSERT(osd_write_locked(env, obj));
+ LASSERT(th != NULL);
+
+ result = __osd_object_create(info, obj, attr, hint, dof, th);
+
+ if (hint && hint->dah_parent)
+ is_root = osd_object_is_root(osd_dt_obj(hint->dah_parent));
+
+ /* objects under osd root shld have igif fid, so dont add fid EA */
+ if (result == 0 && is_root == 0)
+ result = osd_ea_fid_set(env, dt, fid);
+
+ if (result == 0)
+ result = __osd_oi_insert(env, obj, fid, th);
+
LASSERT(ergo(result == 0, dt_object_exists(dt)));
LINVRNT(osd_invariant(obj));
RETURN(result);
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
struct osd_thread_info *info = osd_oti_get(env);
- struct dentry *dentry = &info->oti_dentry;
+ struct dentry *dentry = &info->oti_obj_dentry;
LASSERT(dt_object_exists(dt));
LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len);
}
+
/*
* Concurrency: @dt is write locked.
*/
const struct lu_buf *buf, const char *name, int fl,
struct thandle *handle, struct lustre_capa *capa)
{
- struct osd_object *obj = osd_dt_obj(dt);
- struct inode *inode = obj->oo_inode;
- struct osd_thread_info *info = osd_oti_get(env);
- struct dentry *dentry = &info->oti_dentry;
- struct timespec *t = &info->oti_time;
- int fs_flags = 0, rc;
-
- LASSERT(dt_object_exists(dt));
- LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
- LASSERT(osd_write_locked(env, obj));
LASSERT(handle != NULL);
if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
return -EACCES;
- if (fl & LU_XATTR_REPLACE)
- fs_flags |= XATTR_REPLACE;
-
- if (fl & LU_XATTR_CREATE)
- fs_flags |= XATTR_CREATE;
-
- dentry->d_inode = inode;
- *t = inode->i_ctime;
- rc = inode->i_op->setxattr(dentry, name,
- buf->lb_buf, buf->lb_len, fs_flags);
- if (likely(rc == 0)) {
- /* ctime should not be updated with server-side time. */
- spin_lock(&obj->oo_guard);
- inode->i_ctime = *t;
- spin_unlock(&obj->oo_guard);
- mark_inode_dirty(inode);
- }
- return rc;
+ return __osd_xattr_set(env, dt, buf, name, fl);
}
/*
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
struct osd_thread_info *info = osd_oti_get(env);
- struct dentry *dentry = &info->oti_dentry;
+ struct dentry *dentry = &info->oti_obj_dentry;
LASSERT(dt_object_exists(dt));
LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
struct osd_thread_info *info = osd_oti_get(env);
- struct dentry *dentry = &info->oti_dentry;
+ struct dentry *dentry = &info->oti_obj_dentry;
struct timespec *t = &info->oti_time;
int rc;
struct osd_object *obj = osd_dt_obj(dt);
struct inode *inode = obj->oo_inode;
struct osd_thread_info *info = osd_oti_get(env);
- struct dentry *dentry = &info->oti_dentry;
+ struct dentry *dentry = &info->oti_obj_dentry;
struct file *file = &info->oti_file;
ENTRY;
.do_object_sync = osd_object_sync,
};
+/**
+ * dt_object_operations for interoperability mode
+ * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
+ */
+static const struct dt_object_operations osd_obj_ea_ops = {
+ .do_read_lock = osd_object_read_lock,
+ .do_write_lock = osd_object_write_lock,
+ .do_read_unlock = osd_object_read_unlock,
+ .do_write_unlock = osd_object_write_unlock,
+ .do_attr_get = osd_attr_get,
+ .do_attr_set = osd_attr_set,
+ .do_ah_init = osd_ah_init,
+ .do_create = osd_object_ea_create,
+ .do_index_try = osd_index_try,
+ .do_ref_add = osd_object_ref_add,
+ .do_ref_del = osd_object_ref_del,
+ .do_xattr_get = osd_xattr_get,
+ .do_xattr_set = osd_xattr_set,
+ .do_xattr_del = osd_xattr_del,
+ .do_xattr_list = osd_xattr_list,
+ .do_capa_get = osd_capa_get,
+ .do_object_sync = osd_object_sync,
+};
+
/*
* Body operations.
*/
return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode;
}
-static int osd_index_probe(const struct lu_env *env, struct osd_object *o,
+static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
const struct dt_index_features *feat)
{
struct iam_descr *descr;
+ struct dt_object *dt = &o->oo_dt;
if (osd_object_is_root(o))
return feat == &dt_directory_features;
LASSERT(o->oo_dir != NULL);
descr = o->oo_dir->od_container.ic_descr;
- if (feat == &dt_directory_features)
- return descr == &iam_htree_compat_param ||
- (descr->id_rec_size == sizeof(struct lu_fid_pack) &&
- 1 /*
- * XXX check that index looks like directory.
- */
- );
- else
+ if (feat == &dt_directory_features) {
+ if (descr->id_rec_size == sizeof(struct lu_fid_pack))
+ return 1;
+
+ if (descr == &iam_htree_compat_param) {
+ /* if it is a HTREE dir then there is good chance that,
+ * we dealing with ext3 directory here with no FIDs. */
+
+ if (descr->id_rec_size ==
+ sizeof ((struct ldiskfs_dir_entry_2 *)NULL)->inode) {
+
+ dt->do_index_ops = &osd_index_ea_ops;
+ return 1;
+ }
+ }
+ return 0;
+ } else {
return
feat->dif_keysize_min <= descr->id_key_size &&
descr->id_key_size <= feat->dif_keysize_max &&
ergo(feat->dif_flags & DT_IND_UPDATE,
1 /* XXX check that object (and file system) is
* writable */);
+ }
}
-static int osd_container_init(const struct lu_env *env,
- struct osd_object *obj,
- struct osd_directory *dir)
+static int osd_iam_container_init(const struct lu_env *env,
+ struct osd_object *obj,
+ struct osd_directory *dir)
{
int result;
struct iam_container *bag;
if (result == 0) {
result = iam_container_setup(bag);
if (result == 0)
- obj->oo_dt.do_index_ops = &osd_index_ops;
+ obj->oo_dt.do_index_ops = &osd_index_iam_ops;
else
iam_container_fini(bag);
}
const struct dt_index_features *feat)
{
int result;
+ int ea_dir = 0;
struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
LINVRNT(osd_invariant(obj));
LASSERT(dt_object_exists(dt));
if (osd_object_is_root(obj)) {
- dt->do_index_ops = &osd_index_compat_ops;
+ dt->do_index_ops = &osd_index_ea_ops;
result = 0;
- } else if (!osd_has_index(obj)) {
- struct osd_directory *dir;
+ } else if (feat == &dt_directory_features && osd->od_iop_mode) {
+ dt->do_index_ops = &osd_index_ea_ops;
+ if (S_ISDIR(obj->oo_inode->i_mode))
+ result = 0;
+ else
+ result = -ENOTDIR;
+ ea_dir = 1;
+ } else if (!osd_has_index(obj)) {
+ struct osd_directory *dir;
OBD_ALLOC_PTR(dir);
if (dir != NULL) {
* recheck under lock.
*/
if (!osd_has_index(obj))
- result = osd_container_init(env, obj, dir);
+ result = osd_iam_container_init(env, obj, dir);
else
result = 0;
up(&obj->oo_dir->od_sem);
} else
result = 0;
- if (result == 0) {
- if (!osd_index_probe(env, obj, feat))
+ if (result == 0 && ea_dir == 0) {
+ if (!osd_iam_index_probe(env, obj, feat))
result = -ENOTDIR;
}
LINVRNT(osd_invariant(obj));
return result;
}
-static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
- const struct dt_key *key, struct thandle *handle,
- struct lustre_capa *capa)
+/**
+ * delete a (key, value) pair from index \a dt specified by \a key
+ *
+ * \param dt_object osd index object
+ * \param key key for index
+ * \param rec record reference
+ * \param handle transaction handler
+ *
+ * \retval 0 success
+ * \retval -ve failure
+ */
+
+static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_key *key, struct thandle *handle,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_thandle *oh;
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
RETURN(-EACCES);
- ipd = osd_ipd_get(env, bag);
+ ipd = osd_idx_ipd_get(env, bag);
if (unlikely(ipd == NULL))
RETURN(-ENOMEM);
RETURN(rc);
}
-static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key,
- struct lustre_capa *capa)
+/**
+ * Index delete function for interoperability mode (b11826).
+ * It will remove the directory entry added by osd_index_ea_insert().
+ * This entry is needed to maintain name->fid mapping.
+ *
+ * \param key, key i.e. file entry to be deleted
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_key *key, struct thandle *handle,
+ struct lustre_capa *capa)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct inode *dir = obj->oo_inode;
+ struct dentry *dentry;
+ struct osd_thandle *oh;
+ struct ldiskfs_dir_entry_2 *de;
+ struct buffer_head *bh;
+
+ int rc;
+
+ ENTRY;
+
+ LINVRNT(osd_invariant(obj));
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle != NULL);
+ LASSERT(oh->ot_handle->h_transaction != NULL);
+
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
+ RETURN(-EACCES);
+
+ dentry = osd_child_dentry_get(env, obj,
+ (char *)key, strlen((char *)key));
+ bh = ldiskfs_find_entry(dentry, &de);
+ if (bh) {
+ rc = ldiskfs_delete_entry(oh->ot_handle,
+ dir, de, bh);
+ if (!rc)
+ mark_inode_dirty(dir);
+ brelse(bh);
+ } else
+ rc = -ENOENT;
+
+ LASSERT(osd_invariant(obj));
+ RETURN(rc);
+}
+
+/**
+ * Lookup index for \a key and copy record to \a rec.
+ *
+ * \param dt_object osd index object
+ * \param key key for index
+ * \param rec record reference
+ *
+ * \retval +ve success : exact mach
+ * \retval 0 return record with key not greater than \a key
+ * \retval -ve failure
+ */
+static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
struct iam_path_descr *ipd;
struct iam_container *bag = &obj->oo_dir->od_container;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct iam_iterator *it = &oti->oti_idx_it;
int rc;
-
ENTRY;
- LINVRNT(osd_invariant(obj));
+ LASSERT(osd_invariant(obj));
LASSERT(dt_object_exists(dt));
LASSERT(bag->ic_object == obj->oo_inode);
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
- return -EACCES;
+ RETURN(-EACCES);
- ipd = osd_ipd_get(env, bag);
- if (unlikely(ipd == NULL))
+ ipd = osd_idx_ipd_get(env, bag);
+ if (IS_ERR(ipd))
RETURN(-ENOMEM);
- rc = iam_lookup(bag, (const struct iam_key *)key,
- (struct iam_rec *)rec, ipd);
+ /* got ipd now we can start iterator. */
+ iam_it_init(it, bag, 0, ipd);
+
+ rc = iam_it_get(it, (struct iam_key *)key);
+ if (rc >= 0)
+ iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec);
+
+ iam_it_put(it);
+ iam_it_fini(it);
osd_ipd_put(env, bag, ipd);
+
LINVRNT(osd_invariant(obj));
RETURN(rc);
}
-static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
- const struct dt_rec *rec, const struct dt_key *key,
- struct thandle *th, struct lustre_capa *capa,
- int ignore_quota)
+/**
+ * Inserts (key, value) pair in \a dt index object.
+ *
+ * \param dt osd index object
+ * \param key key for index
+ * \param rec record reference
+ * \param th transaction handler
+ *
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_rec *rec, const struct dt_key *key,
+ struct thandle *th, struct lustre_capa *capa,
+ int ignore_quota)
{
struct osd_object *obj = osd_dt_obj(dt);
struct iam_path_descr *ipd;
if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
return -EACCES;
- ipd = osd_ipd_get(env, bag);
+ ipd = osd_idx_ipd_get(env, bag);
if (unlikely(ipd == NULL))
RETURN(-ENOMEM);
RETURN(rc);
}
-/*
- * Iterator operations.
+/**
+ * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
+ * into the directory.Also sets flags into osd object to
+ * indicate dot and dotdot are created. This is required for
+ * interoperability mode (b11826)
+ *
+ * \param dir directory for dot and dotdot fixup.
+ * \param obj child object for linking
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
*/
-struct osd_it {
- struct osd_object *oi_obj;
- struct iam_path_descr *oi_ipd;
- struct iam_iterator oi_it;
-};
+static int osd_add_dot_dotdot(struct osd_thread_info *info,
+ struct osd_object *dir,
+ struct osd_object *obj, const char *name,
+ struct thandle *th)
+{
+ struct inode *parent_dir = obj->oo_inode;
+ struct inode *inode = dir->oo_inode;
+ struct osd_thandle *oth;
+ int result = 0;
+
+ oth = container_of(th, struct osd_thandle, ot_super);
+ LASSERT(oth->ot_handle->h_transaction != NULL);
+ LASSERT(S_ISDIR(dir->oo_inode->i_mode));
+
+ if (strcmp(name, dot) == 0) {
+ if (dir->oo_compat_dot_created) {
+ result = -EEXIST;
+ } else {
+ LASSERT(obj == dir);
+ dir->oo_compat_dot_created = 1;
+ result = 0;
+ }
+ } else if(strcmp(name, dotdot) == 0) {
+ if (!dir->oo_compat_dot_created)
+ return -EINVAL;
+ if (dir->oo_compat_dotdot_created)
+ return __osd_ea_add_rec(info, dir, obj, name, th);
+
+ result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode);
+ if (result == 0)
+ dir->oo_compat_dotdot_created = 1;
+ }
+
+ return result;
+}
+
+/**
+ * Calls ldiskfs_add_entry() to add directory entry
+ * into the directory. This is required for
+ * interoperability mode (b11826)
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int __osd_ea_add_rec(struct osd_thread_info *info,
+ struct osd_object *pobj,
+ struct osd_object *cobj,
+ const char *name,
+ struct thandle *th)
+{
+ struct dentry *child;
+ struct osd_thandle *oth;
+ struct inode *cinode = cobj->oo_inode;
+ int rc;
+
+ oth = container_of(th, struct osd_thandle, ot_super);
+ LASSERT(oth->ot_handle != NULL);
+ LASSERT(oth->ot_handle->h_transaction != NULL);
+
+ child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
+ rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
+
+ RETURN(rc);
+}
+
+/**
+ * It will call the appropriate osd_add* function and return the
+ * value, return by respective functions.
+ */
+static int osd_ea_add_rec(const struct lu_env *env,
+ struct osd_object *pobj,
+ struct osd_object *cobj,
+ const char *name,
+ struct thandle *th)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ int rc;
-static struct dt_it *osd_it_init(const struct lu_env *env,
- struct dt_object *dt, int writable,
+ if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
+ name[2] =='\0')))
+ rc = osd_add_dot_dotdot(info, pobj, cobj, name, th);
+ else
+ rc = __osd_ea_add_rec(info, pobj, cobj, name, th);
+
+ return rc;
+}
+
+/**
+ * Calls ->lookup() to find dentry. From dentry get inode and
+ * read inode's ea to get fid. This is required for interoperability
+ * mode (b11826)
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
+ struct dt_rec *rec, const struct dt_key *key)
+{
+ struct inode *dir = obj->oo_inode;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct dentry *dentry;
+ struct osd_device *dev = osd_dev(obj->oo_dt.do_lu.lo_dev);
+ struct osd_inode_id *id = &info->oti_id;
+ struct ldiskfs_dir_entry_2 *de;
+ struct buffer_head *bh;
+ struct inode *inode;
+ int ino;
+ int rc;
+
+ LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
+
+ dentry = osd_child_dentry_get(env, obj,
+ (char *)key, strlen((char *)key));
+ bh = ldiskfs_find_entry(dentry, &de);
+ if (bh) {
+ ino = le32_to_cpu(de->inode);
+ brelse(bh);
+ id->oii_ino = ino;
+ id->oii_gen = OSD_OII_NOGEN;
+
+ inode = osd_iget(info, dev, id);
+ if (!IS_ERR(inode)) {
+ dentry->d_inode = inode;
+
+ rc = osd_ea_fid_get(env, dentry, rec);
+ iput(inode);
+ } else
+ rc = -ENOENT;
+ } else
+ rc = -ENOENT;
+
+ RETURN (rc);
+}
+
+/**
+ * Find the osd object for given fid.
+ *
+ * \param fid, need to find the osd object having this fid
+ *
+ * \retval osd_object, on success
+ * \retval -ve, on error
+ */
+struct osd_object *osd_object_find(const struct lu_env *env,
+ struct dt_object *dt,
+ const struct lu_fid *fid)
+{
+ struct lu_device *ludev = dt->do_lu.lo_dev;
+ struct osd_object *child = NULL;
+ struct lu_object *luch;
+ struct lu_object *lo;
+
+ luch = lu_object_find(env, ludev, fid, NULL);
+ if (!IS_ERR(luch)) {
+ if (lu_object_exists(luch)) {
+ lo = lu_object_locate(luch->lo_header, ludev->ld_type);
+ if (lo != NULL)
+ child = osd_obj(lo);
+ else
+ LU_OBJECT_DEBUG(D_ERROR, env, luch,
+ "lu_object can't be located"
+ ""DFID"\n", PFID(fid));
+
+ if (child == NULL) {
+ lu_object_put(env, luch);
+ CERROR("Unable to get osd_object\n");
+ child = ERR_PTR(-ENOENT);
+ }
+ } else {
+ LU_OBJECT_DEBUG(D_ERROR, env, luch,
+ "lu_object does not exists "DFID"\n",
+ PFID(fid));
+ child = ERR_PTR(-ENOENT);
+ }
+ } else
+ child = (void *)luch;
+
+ return child;
+}
+
+/**
+ * Put the osd object once done with it.
+ *
+ * \param obj, osd object that needs to be put
+ */
+static inline void osd_object_put(const struct lu_env *env,
+ struct osd_object *obj)
+{
+ lu_object_put(env, &obj->oo_dt.do_lu);
+}
+
+/**
+ * Index add function for interoperability mode (b11826).
+ * It will add the directory entry.This entry is needed to
+ * maintain name->fid mapping.
+ *
+ * \param key, it is key i.e. file entry to be inserted
+ * \param rec, it is value of given key i.e. fid
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key, struct thandle *th,
+ struct lustre_capa *capa, int ignore_quota)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
+ const struct lu_fid_pack *pack = (const struct lu_fid_pack *)rec;
+ const char *name = (const char *)key;
+ struct osd_object *child;
+
+ int rc;
+
+ ENTRY;
+
+ LASSERT(osd_invariant(obj));
+ LASSERT(dt_object_exists(dt));
+ LASSERT(th != NULL);
+
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+ RETURN(-EACCES);
+
+ rc = fid_unpack(pack, fid);
+ if (rc != 0)
+ RETURN(rc);
+ child = osd_object_find(env, dt, fid);
+ if (!IS_ERR(child)) {
+ rc = osd_ea_add_rec(env, obj, child, name, th);
+ osd_object_put(env, child);
+ } else {
+ rc = PTR_ERR(child);
+ }
+
+ LASSERT(osd_invariant(obj));
+ RETURN(rc);
+}
+
+/**
+ * Initialize osd Iterator for given osd index object.
+ *
+ * \param dt osd index object
+ */
+
+static struct dt_it *osd_it_iam_init(const struct lu_env *env,
+ struct dt_object *dt,
struct lustre_capa *capa)
{
- struct osd_it *it;
+ struct osd_it_iam *it;
+ struct osd_thread_info *oti = osd_oti_get(env);
struct osd_object *obj = osd_dt_obj(dt);
struct lu_object *lo = &dt->do_lu;
struct iam_path_descr *ipd;
struct iam_container *bag = &obj->oo_dir->od_container;
- __u32 flags;
LASSERT(lu_object_exists(lo));
- if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE :
- CAPA_OPC_BODY_READ))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
return ERR_PTR(-EACCES);
- flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE;
- OBD_ALLOC_PTR(it);
- if (it != NULL) {
- /*
- * XXX: as ipd is allocated within osd_thread_info, assignment
- * below implies that iterator usage is confined within single
- * environment.
- */
- ipd = osd_ipd_get(env, bag);
- if (likely(ipd != NULL)) {
- it->oi_obj = obj;
- it->oi_ipd = ipd;
- lu_object_get(lo);
- iam_it_init(&it->oi_it, bag, flags, ipd);
- return (struct dt_it *)it;
- } else
- OBD_FREE_PTR(it);
+ it = &oti->oti_it;
+ ipd = osd_it_ipd_get(env, bag);
+ if (likely(ipd != NULL)) {
+ it->oi_obj = obj;
+ it->oi_ipd = ipd;
+ lu_object_get(lo);
+ iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
+ return (struct dt_it *)it;
}
return ERR_PTR(-ENOMEM);
}
-static void osd_it_fini(const struct lu_env *env, struct dt_it *di)
+/**
+ * free given Iterator.
+ */
+
+static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
struct osd_object *obj = it->oi_obj;
iam_it_fini(&it->oi_it);
osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
lu_object_put(env, &obj->oo_dt.do_lu);
- OBD_FREE_PTR(it);
}
-static int osd_it_get(const struct lu_env *env,
+/**
+ * Move Iterator to record specified by \a key
+ *
+ * \param di osd iterator
+ * \param key key for index
+ *
+ * \retval +ve di points to record with least key not larger than key
+ * \retval 0 di points to exact matched key
+ * \retval -ve failure
+ */
+
+static int osd_it_iam_get(const struct lu_env *env,
struct dt_it *di, const struct dt_key *key)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
return iam_it_get(&it->oi_it, (const struct iam_key *)key);
}
-static void osd_it_put(const struct lu_env *env, struct dt_it *di)
+/**
+ * Release Iterator
+ *
+ * \param di osd iterator
+ */
+
+static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
iam_it_put(&it->oi_it);
}
-static int osd_it_next(const struct lu_env *env, struct dt_it *di)
+/**
+ * Move iterator by one record
+ *
+ * \param di osd iterator
+ *
+ * \retval +1 end of container reached
+ * \retval 0 success
+ * \retval -ve failure
+ */
+
+static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
return iam_it_next(&it->oi_it);
}
-static int osd_it_del(const struct lu_env *env, struct dt_it *di,
- struct thandle *th)
-{
- struct osd_it *it = (struct osd_it *)di;
- struct osd_thandle *oh;
-
- LASSERT(th != NULL);
-
- oh = container_of0(th, struct osd_thandle, ot_super);
- LASSERT(oh->ot_handle != NULL);
- LASSERT(oh->ot_handle->h_transaction != NULL);
-
- return iam_it_rec_delete(oh->ot_handle, &it->oi_it);
-}
+/**
+ * Return pointer to the key under iterator.
+ */
-static struct dt_key *osd_it_key(const struct lu_env *env,
+static struct dt_key *osd_it_iam_key(const struct lu_env *env,
const struct dt_it *di)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
return (struct dt_key *)iam_it_key_get(&it->oi_it);
}
-static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di)
+/**
+ * Return size of key under iterator (in bytes)
+ */
+
+static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
return iam_it_key_size(&it->oi_it);
}
-static struct dt_rec *osd_it_rec(const struct lu_env *env,
+/**
+ * Return pointer to the record under iterator.
+ */
+static struct dt_rec *osd_it_iam_rec(const struct lu_env *env,
const struct dt_it *di)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
}
-static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
+/**
+ * Returns cookie for current Iterator position.
+ */
+static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
return iam_it_store(&it->oi_it);
}
-static int osd_it_load(const struct lu_env *env,
+/**
+ * Restore iterator from cookie.
+ *
+ * \param di osd iterator
+ * \param hash Iterator location cookie
+ *
+ * \retval +ve di points to record with least key not larger than key.
+ * \retval 0 di points to exact matched key
+ * \retval -ve failure
+ */
+
+static int osd_it_iam_load(const struct lu_env *env,
const struct dt_it *di, __u64 hash)
{
- struct osd_it *it = (struct osd_it *)di;
+ struct osd_it_iam *it = (struct osd_it_iam *)di;
return iam_it_load(&it->oi_it, hash);
}
-static const struct dt_index_operations osd_index_ops = {
- .dio_lookup = osd_index_lookup,
- .dio_insert = osd_index_insert,
- .dio_delete = osd_index_delete,
+static const struct dt_index_operations osd_index_iam_ops = {
+ .dio_lookup = osd_index_iam_lookup,
+ .dio_insert = osd_index_iam_insert,
+ .dio_delete = osd_index_iam_delete,
.dio_it = {
- .init = osd_it_init,
- .fini = osd_it_fini,
- .get = osd_it_get,
- .put = osd_it_put,
- .del = osd_it_del,
- .next = osd_it_next,
- .key = osd_it_key,
- .key_size = osd_it_key_size,
- .rec = osd_it_rec,
- .store = osd_it_store,
- .load = osd_it_load
+ .init = osd_it_iam_init,
+ .fini = osd_it_iam_fini,
+ .get = osd_it_iam_get,
+ .put = osd_it_iam_put,
+ .next = osd_it_iam_next,
+ .key = osd_it_iam_key,
+ .key_size = osd_it_iam_key_size,
+ .rec = osd_it_iam_rec,
+ .store = osd_it_iam_store,
+ .load = osd_it_iam_load
}
};
-static int osd_index_compat_delete(const struct lu_env *env,
- struct dt_object *dt,
- const struct dt_key *key,
- struct thandle *handle,
- struct lustre_capa *capa)
+/**
+ * Creates or initializes iterator context.
+ *
+ * \retval struct osd_it_ea, iterator structure on success
+ *
+ */
+static struct dt_it *osd_it_ea_init(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lustre_capa *capa)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct osd_it_ea *it = &info->oti_it_ea;
+ struct lu_object *lo = &dt->do_lu;
+ struct dentry *obj_dentry = &info->oti_obj_dentry;
+ ENTRY;
+ LASSERT(lu_object_exists(lo));
+
+ obj_dentry->d_inode = obj->oo_inode;
+ obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
+ obj_dentry->d_name.hash = 0;
+
+ it->oie_namelen = 0;
+ it->oie_curr_pos = 0;
+ it->oie_next_pos = 0;
+ it->oie_obj = obj;
+ it->oie_file.f_dentry = obj_dentry;
+ it->oie_file.f_mapping = obj->oo_inode->i_mapping;
+ it->oie_file.f_op = obj->oo_inode->i_fop;
+ it->oie_file.private_data = NULL;
+ lu_object_get(lo);
+
+ RETURN((struct dt_it*) it);
+}
+
+/**
+ * Destroy or finishes iterator context.
+ *
+ * \param di, struct osd_it_ea, iterator structure to be destroyed
+ */
+static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
{
- struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ struct osd_object *obj = it->oie_obj;
+
- LASSERT(handle != NULL);
- LASSERT(S_ISDIR(obj->oo_inode->i_mode));
ENTRY;
+ lu_object_put(env, &obj->oo_dt.do_lu);
+ EXIT;
+}
-#if 0
- if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
- RETURN(-EACCES);
-#endif
+/**
+ * It position the iterator at given key, so that next lookup continues from
+ * that key Or it is similar to dio_it->load() but based on a key,
+ * rather than file position.
+ *
+ * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
+ * to the beginning.
+ *
+ * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
+ */
+static int osd_it_ea_get(const struct lu_env *env,
+ struct dt_it *di, const struct dt_key *key)
+{
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
- RETURN(-EOPNOTSUPP);
+ ENTRY;
+ LASSERT(((const char *)key)[0] == '\0');
+ it->oie_namelen = 0;
+ it->oie_curr_pos = 0;
+ it->oie_next_pos = 0;
+
+ RETURN(+1);
}
-/*
- * Compatibility index operations.
+/**
+ * Does nothing
*/
+static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
+{
+}
-
-static void osd_build_pack(const struct lu_env *env, struct osd_device *osd,
- struct dentry *dentry, struct lu_fid_pack *pack)
+/**
+ * It is called internally by ->readdir(). It fills the
+ * iterator's in-memory data structure with required
+ * information i.e. name, namelen, rec_size etc.
+ *
+ * \param buf, in which information to be filled in.
+ * \param name, name of the file in given dir
+ *
+ * \retval 0, on success
+ * \retval 1, on buffer full
+ */
+static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
+ loff_t offset, ino_t ino,
+ unsigned int d_type)
{
- struct inode *inode = dentry->d_inode;
- struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
+ struct osd_it_ea *it = (struct osd_it_ea *)buf;
+ struct dirent64 *dirent = &it->oie_dirent64;
+ int reclen = LDISKFS_DIR_REC_LEN(namelen);
- lu_igif_build(fid, inode->i_ino, inode->i_generation);
- fid_cpu_to_be(fid, fid);
- pack->fp_len = sizeof *fid + 1;
- memcpy(pack->fp_area, fid, sizeof *fid);
+
+ ENTRY;
+ if (it->oie_namelen)
+ RETURN(-ENOENT);
+
+ if (namelen == 0 || namelen > LDISKFS_NAME_LEN)
+ RETURN(-EIO);
+
+ strncpy(dirent->d_name, name, LDISKFS_NAME_LEN);
+ dirent->d_name[namelen] = 0;
+ dirent->d_ino = ino;
+ dirent->d_off = offset;
+ dirent->d_reclen = reclen;
+ it->oie_namelen = namelen;
+ it->oie_curr_pos = offset;
+
+ RETURN(0);
}
-static int osd_index_compat_lookup(const struct lu_env *env,
- struct dt_object *dt,
- struct dt_rec *rec, const struct dt_key *key,
- struct lustre_capa *capa)
+/**
+ * Calls ->readdir() to load a directory entry at a time
+ * and stored it in iterator's in-memory data structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+int osd_ldiskfs_it_fill(const struct dt_it *di)
{
- struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ struct osd_object *obj = it->oie_obj;
+ struct inode *inode = obj->oo_inode;
+ int result = 0;
- struct osd_device *osd = osd_obj2dev(obj);
- struct osd_thread_info *info = osd_oti_get(env);
- struct inode *dir;
+ ENTRY;
+ it->oie_namelen = 0;
+ it->oie_file.f_pos = it->oie_curr_pos;
- int result;
+ result = inode->i_fop->readdir(&it->oie_file, it,
+ (filldir_t) osd_ldiskfs_filldir);
- /*
- * XXX temporary solution.
- */
- struct dentry *dentry;
- struct dentry *parent;
+ it->oie_next_pos = it->oie_file.f_pos;
- LINVRNT(osd_invariant(obj));
- LASSERT(S_ISDIR(obj->oo_inode->i_mode));
- LASSERT(osd_has_index(obj));
+ if(!result && it->oie_namelen == 0)
+ result = -EIO;
- if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
- return -EACCES;
+ RETURN(result);
+}
- info->oti_str.name = (const char *)key;
- info->oti_str.len = strlen((const char *)key);
+/**
+ * It calls osd_ldiskfs_it_fill() which will use ->readdir()
+ * to load a directory entry at a time and stored it in
+ * iterator's in-memory data structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval +ve, iterator reached to end
+ * \retval 0, iterator not reached to end
+ * \retval -ve, on error
+ */
+static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
+{
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ int rc;
- dir = obj->oo_inode;
- LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
+ ENTRY;
+ it->oie_curr_pos = it->oie_next_pos;
- parent = d_alloc_root(dir);
- if (parent == NULL)
- return -ENOMEM;
- igrab(dir);
- dentry = d_alloc(parent, &info->oti_str);
- if (dentry != NULL) {
- struct dentry *d;
+ if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
+ rc = +1;
+ else
+ rc = osd_ldiskfs_it_fill(di);
- /*
- * XXX passing NULL for nameidata should work for
- * ext3/ldiskfs.
- */
- d = dir->i_op->lookup(dir, dentry, NULL);
- if (d == NULL) {
- /*
- * normal case, result is in @dentry.
- */
- if (dentry->d_inode != NULL) {
- osd_build_pack(env, osd, dentry,
- (struct lu_fid_pack *)rec);
- result = 0;
- } else
- result = -ENOENT;
- } else {
- /* What? Disconnected alias? Ppheeeww... */
- CERROR("Aliasing where not expected\n");
- result = -EIO;
- dput(d);
- }
- dput(dentry);
- } else
- result = -ENOMEM;
- dput(parent);
- LINVRNT(osd_invariant(obj));
- return result;
+ RETURN(rc);
+}
+
+/**
+ * Returns the key at current position from iterator's in memory structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval key i.e. struct dt_key on success
+ */
+static struct dt_key *osd_it_ea_key(const struct lu_env *env,
+ const struct dt_it *di)
+{
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ ENTRY;
+ RETURN((struct dt_key *)it->oie_dirent64.d_name);
}
-static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev,
- struct inode *dir, struct inode *inode, const char *name)
+/**
+ * Returns the key's size at current position from iterator's in memory structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval key_size i.e. struct dt_key on success
+ */
+static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
{
- struct dentry *old;
- struct dentry *new;
- struct dentry *parent;
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ ENTRY;
+ RETURN(it->oie_namelen);
+}
- int result;
+/**
+ * Returns the value (i.e. fid/igif) at current position from iterator's
+ * in memory structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval value i.e. struct dt_rec on success
+ */
+static struct dt_rec *osd_it_ea_rec(const struct lu_env *env,
+ const struct dt_it *di)
+{
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ struct osd_object *obj = it->oie_obj;
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct osd_inode_id *id = &info->oti_id;
+ struct lu_fid_pack *rec = &info->oti_pack;
+ struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
+ struct dentry *dentry = &info->oti_child_dentry;
+ struct osd_device *dev;
+ struct inode *inode;
+ int rc;
- info->oti_str.name = name;
- info->oti_str.len = strlen(name);
-
- LASSERT(atomic_read(&dir->i_count) > 0);
- result = -ENOMEM;
- old = d_alloc(dev->od_obj_area, &info->oti_str);
- if (old != NULL) {
- d_instantiate(old, inode);
- igrab(inode);
- LASSERT(atomic_read(&dir->i_count) > 0);
- parent = d_alloc_root(dir);
- if (parent != NULL) {
- igrab(dir);
- LASSERT(atomic_read(&dir->i_count) > 1);
- new = d_alloc(parent, &info->oti_str);
- LASSERT(atomic_read(&dir->i_count) > 1);
- if (new != NULL) {
- LASSERT(atomic_read(&dir->i_count) > 1);
- result = dir->i_op->link(old, dir, new);
- LASSERT(atomic_read(&dir->i_count) > 1);
- dput(new);
- LASSERT(atomic_read(&dir->i_count) > 1);
- }
- LASSERT(atomic_read(&dir->i_count) > 1);
- dput(parent);
- LASSERT(atomic_read(&dir->i_count) > 0);
- }
- dput(old);
+ ENTRY;
+ dev = osd_dev(ldev);
+ id->oii_ino = it->oie_dirent64.d_ino;
+ id->oii_gen = OSD_OII_NOGEN;
+ inode = osd_iget(info, dev, id);
+ if (!IS_ERR(inode)) {
+ dentry->d_inode = inode;
+ LASSERT(dentry->d_inode->i_sb == osd_sb(dev));
+ } else {
+ CERROR("Error getting inode for ino =%d", id->oii_ino);
+ RETURN((struct dt_rec *) PTR_ERR(inode));
}
- LASSERT(atomic_read(&dir->i_count) > 0);
- return result;
+
+ rc = osd_ea_fid_get(env, dentry, (struct dt_rec*) rec);
+
+ iput(inode);
+ RETURN((struct dt_rec *)rec);
+
}
+/**
+ * Returns a cookie for current position of the iterator head, so that
+ * user can use this cookie to load/start the iterator next time.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval cookie for current position, on success
+ */
+static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
+{
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ ENTRY;
+ RETURN(it->oie_curr_pos);
+}
-/*
- * XXX Temporary stuff.
+/**
+ * It calls osd_ldiskfs_it_fill() which will use ->readdir()
+ * to load a directory entry at a time and stored it i inn,
+ * in iterator's in-memory data structure.
+ *
+ * \param di, struct osd_it_ea, iterator's in memory structure
+ *
+ * \retval +ve, on success
+ * \retval -ve, on error
*/
-static int osd_index_compat_insert(const struct lu_env *env,
- struct dt_object *dt,
- const struct dt_rec *rec,
- const struct dt_key *key, struct thandle *th,
- struct lustre_capa *capa,
- int ignore_quota)
+static int osd_it_ea_load(const struct lu_env *env,
+ const struct dt_it *di, __u64 hash)
{
- struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_it_ea *it = (struct osd_it_ea *)di;
+ int rc;
- const char *name = (const char *)key;
+ ENTRY;
+ it->oie_curr_pos = it->oie_next_pos = hash;
- struct lu_device *ludev = dt->do_lu.lo_dev;
- struct lu_object *luch;
+ rc = osd_ldiskfs_it_fill(di);
+ if (rc == 0)
+ rc = +1;
- struct osd_thread_info *info = osd_oti_get(env);
- const struct lu_fid_pack *pack = (const struct lu_fid_pack *)rec;
- struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
+ RETURN(rc);
+}
+/**
+ * Index and Iterator operations for interoperability
+ * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
+ */
+static const struct dt_index_operations osd_index_ea_ops = {
+ .dio_lookup = osd_index_ea_lookup,
+ .dio_insert = osd_index_ea_insert,
+ .dio_delete = osd_index_ea_delete,
+ .dio_it = {
+ .init = osd_it_ea_init,
+ .fini = osd_it_ea_fini,
+ .get = osd_it_ea_get,
+ .put = osd_it_ea_put,
+ .next = osd_it_ea_next,
+ .key = osd_it_ea_key,
+ .key_size = osd_it_ea_key_size,
+ .rec = osd_it_ea_rec,
+ .store = osd_it_ea_store,
+ .load = osd_it_ea_load
+ }
+};
- int result;
+/**
+ * Index lookup function for interoperability mode (b11826).
+ *
+ * \param key, key i.e. file name to be searched
+ *
+ * \retval +ve, on success
+ * \retval -ve, on error
+ */
+static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ int rc = 0;
+
+ ENTRY;
LASSERT(S_ISDIR(obj->oo_inode->i_mode));
LINVRNT(osd_invariant(obj));
- LASSERT(th != NULL);
- if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
return -EACCES;
- result = fid_unpack(pack, fid);
- if (result != 0)
- return result;
+ rc = osd_ea_lookup_rec(env, obj, rec, key);
- luch = lu_object_find(env, ludev, fid, NULL);
- if (!IS_ERR(luch)) {
- if (lu_object_exists(luch)) {
- struct osd_object *child;
-
- child = osd_obj(lu_object_locate(luch->lo_header,
- ludev->ld_type));
- if (child != NULL)
- result = osd_add_rec(info, osd_obj2dev(obj),
- obj->oo_inode,
- child->oo_inode, name);
- else {
- CERROR("No osd slice.\n");
- result = -ENOENT;
- }
- LINVRNT(osd_invariant(obj));
- LINVRNT(osd_invariant(child));
- } else {
- CERROR("Sorry.\n");
- result = -ENOENT;
- }
- lu_object_put(env, luch);
- } else
- result = PTR_ERR(luch);
- LINVRNT(osd_invariant(obj));
- return result;
+ if (rc == 0)
+ rc = +1;
+ RETURN(rc);
}
-static const struct dt_index_operations osd_index_compat_ops = {
- .dio_lookup = osd_index_compat_lookup,
- .dio_insert = osd_index_compat_insert,
- .dio_delete = osd_index_compat_delete
-};
-
/* type constructor/destructor: osd_type_init, osd_type_fini */
LU_TYPE_INIT_FINI(osd, &osd_key);
struct osd_thread_info *info = osd_oti_get(env);
ENTRY;
if (o->od_obj_area != NULL) {
- dput(o->od_obj_area);
+ lu_object_put(env, &o->od_obj_area->do_lu);
o->od_obj_area = NULL;
}
osd_oi_fini(info, &o->od_oi);
{
struct lustre_mount_info *lmi;
const char *dev = lustre_cfg_string(cfg, 0);
- struct osd_thread_info *info = osd_oti_get(env);
- int result;
+ struct lustre_disk_data *ldd;
+ struct lustre_sb_info *lsi;
ENTRY;
/* save lustre_mount_info in dt_device */
o->od_mount = lmi;
- result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev);
- if (result == 0) {
- struct dentry *d;
+ lsi = s2lsi(lmi->lmi_sb);
+ ldd = lsi->lsi_ldd;
- d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
- 0777, 1);
- if (!IS_ERR(d)) {
- o->od_obj_area = d;
- } else
- result = PTR_ERR(d);
- }
- if (result != 0)
- osd_shutdown(env, o);
- RETURN(result);
+ if (ldd->ldd_flags & LDD_F_IAM_DIR) {
+ o->od_iop_mode = 0;
+ LCONSOLE_WARN("OSD: IAM mode enabled\n");
+ } else
+ o->od_iop_mode = 1;
+
+ o->od_obj_area = NULL;
+ RETURN(0);
}
static struct lu_device *osd_device_fini(const struct lu_env *env,
err = osd_shutdown(env, o);
break;
default:
- err = -ENOTTY;
+ err = -ENOSYS;
}
RETURN(err);
}
+
extern void ldiskfs_orphan_cleanup (struct super_block * sb,
struct ldiskfs_super_block * es);
RETURN(0);
}
+static int osd_prepare(const struct lu_env *env,
+ struct lu_device *pdev,
+ struct lu_device *dev)
+{
+ struct osd_device *osd = osd_dev(dev);
+ struct lustre_sb_info *lsi;
+ struct lustre_disk_data *ldd;
+ struct lustre_mount_info *lmi;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct dt_object *d;
+ int result;
+
+ ENTRY;
+ /* 1. initialize oi before any file create or file open */
+ result = osd_oi_init(oti, &osd->od_oi,
+ &osd->od_dt_dev, lu2md_dev(pdev));
+ if (result != 0)
+ RETURN(result);
+
+ lmi = osd->od_mount;
+ lsi = s2lsi(lmi->lmi_sb);
+ ldd = lsi->lsi_ldd;
+
+ /* 2. setup local objects */
+ result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
+ if (result)
+ goto out;
+
+ /* 3. open remote object dir */
+ d = dt_store_open(env, lu2dt_dev(dev), "",
+ remote_obj_dir, &oti->oti_fid);
+ if (!IS_ERR(d)) {
+ osd->od_obj_area = d;
+ result = 0;
+ } else {
+ result = PTR_ERR(d);
+ osd->od_obj_area = NULL;
+ }
+
+out:
+ RETURN(result);
+}
+
static struct inode *osd_iget(struct osd_thread_info *info,
struct osd_device *dev,
const struct osd_inode_id *id)
CERROR("bad inode\n");
iput(inode);
inode = ERR_PTR(-ENOENT);
- } else if (inode->i_generation != id->oii_gen) {
+ } else if (id->oii_gen != OSD_OII_NOGEN &&
+ inode->i_generation != id->oii_gen) {
CERROR("stale inode\n");
iput(inode);
inode = ERR_PTR(-ESTALE);
if (!IS_ERR(inode)) {
obj->oo_inode = inode;
LASSERT(obj->oo_inode->i_sb == osd_sb(dev));
+ if (dev->od_iop_mode) {
+ obj->oo_compat_dot_created = 1;
+ obj->oo_compat_dotdot_created = 1;
+ }
result = 0;
} else
/*
} else if (result == -ENOENT)
result = 0;
LINVRNT(osd_invariant(obj));
+
RETURN(result);
}
static const struct lu_device_operations osd_lu_ops = {
.ldo_object_alloc = osd_object_alloc,
.ldo_process_config = osd_process_config,
- .ldo_recovery_complete = osd_recovery_complete
+ .ldo_recovery_complete = osd_recovery_complete,
+ .ldo_prepare = osd_prepare,
};
static const struct lu_device_type_operations osd_device_type_ops = {
.o_owner = THIS_MODULE
};
+static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
+ .llod_name = remote_obj_dir,
+ .llod_oid = OSD_REM_OBJ_DIR_OID,
+ .llod_is_index = 1,
+ .llod_feat = &dt_directory_features,
+};
+
static int __init osd_mod_init(void)
{
struct lprocfs_static_vars lvars;
+ osd_oi_mod_init();
+ llo_local_obj_register(&llod_osd_rem_obj_dir);
lprocfs_osd_init_vars(&lvars);
return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
LUSTRE_OSD_NAME, &osd_device_type);
/* struct dentry */
#include <linux/dcache.h>
#include <linux/lustre_iam.h>
+/* struct dirent64 */
+#include <linux/dirent.h>
/* LUSTRE_OSD_NAME */
#include <obd.h>
struct inode;
+#define OSD_OII_NOGEN (0)
#define OSD_COUNTERS (0)
#ifdef HAVE_QUOTA_SUPPORT
* XXX temporary stuff for object index: directory where every object
* is named by its fid.
*/
- struct dentry *od_obj_area;
+ struct dt_object *od_obj_area;
/* Environment for transaction commit callback.
* Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
cfs_time_t od_osfs_age;
struct kstatfs od_kstatfs;
spinlock_t od_osfs_lock;
+
+ /**
+ * The following flag indicates, if it is interop mode or not.
+ * It will be initialized, using mount param.
+ */
+ __u32 od_iop_mode;
};
+/**
+ * This is iterator's in-memory data structure in interoperability
+ * mode (i.e. iterator over ldiskfs style directory)
+ */
+struct osd_it_ea {
+ struct osd_object *oie_obj;
+ /** used in ldiskfs iterator, to stored file pointer */
+ struct file oie_file;
+ /** used in ldiskfs iterator, to store directory entry */
+ struct dirent64 oie_dirent64;
+ /** current file position */
+ __u64 oie_curr_pos;
+ /** next file position */
+ __u64 oie_next_pos;
+ /** namelen of the file */
+ __u8 oie_namelen;
+
+};
+
+/**
+ * Iterator's in-memory data structure for IAM mode.
+ */
+struct osd_it_iam {
+ struct osd_object *oi_obj;
+ struct iam_path_descr *oi_ipd;
+ struct iam_iterator oi_it;
+};
struct osd_thread_info {
const struct lu_env *oti_env;
+ /**
+ * used for index operations.
+ */
+ struct dentry oti_obj_dentry;
+ struct dentry oti_child_dentry;
struct lu_fid oti_fid;
struct osd_inode_id oti_id;
/*
* XXX temporary: for ->i_op calls.
*/
- struct qstr oti_str;
struct txn_param oti_txn;
/*
* XXX temporary: fake dentry used by xattr calls.
*/
- struct dentry oti_dentry;
struct timespec oti_time;
/*
* XXX temporary: fake struct file for osd_object_sync
struct lu_fid_pack oti_pack;
- /* union to guarantee that ->oti_ipd[] has proper alignment. */
+ /**
+ * following ipd and it structures are used for osd_index_iam_lookup()
+ * these are defined separately as we might do index operation
+ * in open iterator session.
+ */
+
+ /** osd iterator context used for iterator session */
+
+ union {
+ struct osd_it_iam oti_it;
+ /** ldiskfs iterator data structure, see osd_it_ea_{init, fini} */
+ struct osd_it_ea oti_it_ea;
+ };
+
+
+ /** IAM iterator for index operation. */
+ struct iam_iterator oti_idx_it;
+
+ /** union to guarantee that ->oti_ipd[] has proper alignment. */
union {
- char oti_ipd[DX_IPD_MAX_SIZE];
+ char oti_it_ipd[DX_IPD_MAX_SIZE];
long long oti_alignment_lieutenant;
};
+
+ union {
+ char oti_idx_ipd[DX_IPD_MAX_SIZE];
+ long long oti_alignment_lieutenant_colonel;
+ };
+
+
int oti_r_locks;
int oti_w_locks;
int oti_txns;
+ /** used in osd_fid_set() to put xattr */
+ struct lu_buf oti_buf;
+ /** used in osd_ea_fid_set() to set fid into common ea */
+ struct lustre_mdt_attrs oti_mdt_attrs;
#ifdef HAVE_QUOTA_SUPPORT
struct osd_ctxt oti_ctxt;
#endif
struct oi_descr {
int fid_size;
char *name;
+ __u32 oid;
+};
+
+/** to serialize concurrent OI index initialization */
+static struct mutex oi_init_lock;
+
+static struct dt_index_features oi_feat = {
+ .dif_flags = DT_IND_UPDATE,
+ .dif_recsize_min = sizeof(struct osd_inode_id),
+ .dif_recsize_max = sizeof(struct osd_inode_id),
+ .dif_ptrsize = 4
};
static const struct oi_descr oi_descr[OSD_OI_FID_NR] = {
[OSD_OI_FID_SMALL] = {
.fid_size = 5,
- .name = "oi.5"
+ .name = "oi.5",
+ .oid = OSD_OI_FID_SMALL_OID
},
[OSD_OI_FID_OTHER] = {
.fid_size = sizeof(struct lu_fid),
- .name = "oi.16"
+ .name = "oi.16",
+ .oid = OSD_OI_FID_OTHER_OID
}
};
+static int osd_oi_index_create(struct osd_thread_info *info,
+ struct dt_device *dev,
+ struct md_device *mdev)
+{
+ const struct lu_env *env;
+ struct lu_fid *oi_fid = &info->oti_fid;
+ struct md_object *mdo;
+ int i;
+ int rc;
+
+ env = info->oti_env;
+
+ for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) {
+ char *name;
+ name = oi_descr[i].name;
+ lu_local_obj_fid(oi_fid, oi_descr[i].oid);
+ oi_feat.dif_keysize_min = oi_descr[i].fid_size,
+ oi_feat.dif_keysize_max = oi_descr[i].fid_size,
+
+ mdo = llo_store_create_index(env, mdev, dev,
+ "/", name,
+ oi_fid, &oi_feat);
+
+ if (IS_ERR(mdo))
+ RETURN(PTR_ERR(mdo));
+
+ lu_object_put(env, &mdo->mo_lu);
+ }
+ return 0;
+}
+
int osd_oi_init(struct osd_thread_info *info,
- struct osd_oi *oi, struct dt_device *dev)
+ struct osd_oi *oi,
+ struct dt_device *dev,
+ struct md_device *mdev)
{
+ const struct lu_env *env;
int rc;
int i;
- const struct lu_env *env;
CLASSERT(ARRAY_SIZE(oi->oi_dir) == ARRAY_SIZE(oi_descr));
env = info->oti_env;
-
+ mutex_lock(&oi_init_lock);
memset(oi, 0, sizeof *oi);
-
- for (i = rc = 0; i < ARRAY_SIZE(oi->oi_dir) && rc == 0; ++i) {
+retry:
+ for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) {
const char *name;
- /*
- * Allocate on stack---this is initialization.
- */
- const struct dt_index_features feat = {
- .dif_flags = DT_IND_UPDATE,
- .dif_keysize_min = oi_descr[i].fid_size,
- .dif_keysize_max = oi_descr[i].fid_size,
- .dif_recsize_min = sizeof(struct osd_inode_id),
- .dif_recsize_max = sizeof(struct osd_inode_id)
- };
struct dt_object *obj;
name = oi_descr[i].name;
- obj = dt_store_open(env, dev, name, &info->oti_fid);
+ oi_feat.dif_keysize_min = oi_descr[i].fid_size,
+ oi_feat.dif_keysize_max = oi_descr[i].fid_size,
+
+ obj = dt_store_open(env, dev, "", name, &info->oti_fid);
if (!IS_ERR(obj)) {
- rc = obj->do_ops->do_index_try(env, obj, &feat);
+ rc = obj->do_ops->do_index_try(env, obj, &oi_feat);
if (rc == 0) {
LASSERT(obj->do_index_ops != NULL);
oi->oi_dir[i] = obj;
}
} else {
rc = PTR_ERR(obj);
+ if (rc == -ENOENT) {
+ rc = osd_oi_index_create(info, dev, mdev);
+ if (!rc)
+ goto retry;
+ }
CERROR("Cannot open \"%s\": %d\n", name, rc);
}
}
if (rc != 0)
osd_oi_fini(info, oi);
+
+ mutex_unlock(&oi_init_lock);
return rc;
}
void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi)
{
int i;
+
for (i = 0; i < ARRAY_SIZE(oi->oi_dir); ++i) {
if (oi->oi_dir[i] != NULL) {
lu_object_put(info->oti_env, &oi->oi_dir[i]->do_lu);
return NULL;
}
+static inline int fid_is_oi_fid(const struct lu_fid *fid)
+{
+ /* We need to filter-out oi obj's fid. As we can not store it, while
+ * oi-index create operation.
+ */
+ return (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
+ (fid_oid(fid) == OSD_OI_FID_SMALL_OID ||
+ fid_oid(fid) == OSD_OI_FID_OTHER_OID)));
+}
+
int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
const struct lu_fid *fid, struct osd_inode_id *id)
{
struct dt_object *idx;
const struct dt_key *key;
+ if (fid_is_oi_fid(fid))
+ return -ENOENT;
+
key = oi_fid_key(info, oi, fid, &idx);
rc = idx->do_index_ops->dio_lookup(info->oti_env, idx,
(struct dt_rec *)id, key,
BYPASS_CAPA);
- id->oii_ino = be32_to_cpu(id->oii_ino);
- id->oii_gen = be32_to_cpu(id->oii_gen);
+ if (rc > 0) {
+ id->oii_ino = be32_to_cpu(id->oii_ino);
+ id->oii_gen = be32_to_cpu(id->oii_gen);
+ rc = 0;
+ } else if (rc == 0)
+ rc = -ENOENT;
}
return rc;
}
if (fid_is_igif(fid))
return 0;
+ if (fid_is_oi_fid(fid))
+ return 0;
+
key = oi_fid_key(info, oi, fid, &idx);
id = &info->oti_id;
id->oii_ino = cpu_to_be32(id0->oii_ino);
return idx->do_index_ops->dio_delete(info->oti_env, idx,
key, th, BYPASS_CAPA);
}
+
+int osd_oi_mod_init()
+{
+ mutex_init(&oi_init_lock);
+ return 0;
+}
/* struct rw_semaphore */
#include <linux/rwsem.h>
#include <lu_object.h>
+#include <md_object.h>
struct lu_fid;
struct osd_thread_info;
__u32 oii_gen; /* inode generation */
};
-int osd_oi_init(struct osd_thread_info *info,
- struct osd_oi *oi, struct dt_device *dev);
+int osd_oi_mod_init(void);
+int osd_oi_init(struct osd_thread_info *info,
+ struct osd_oi *oi,
+ struct dt_device *dev,
+ struct md_device *mdev);
void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi);
int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi,
MDSDEV=${MDSDEV:-$TMP/${FSNAME}-mdt1}
MDSCOUNT=${MDSCOUNT:-3}
test $MDSCOUNT -gt 4 && MDSCOUNT=4
+MDSCOUNT=1
MDSDEVBASE=${MDSDEVBASE:-$TMP/${FSNAME}-mdt}
MDSSIZE=${MDSSIZE:-100000}
[ -z "$TUNEFS" ] && skip "No tunefs" && return
local DISK1_8=$LUSTRE/tests/disk1_8.tgz
[ ! -r $DISK1_8 ] && skip "Cannot find $DISK1_8" && return 0
- mkdir -p $TMP/$tdir
- tar xjvf $DISK1_8 -C $TMP/$tdir || \
+ local tmpdir=$TMP/$tdir
+ mkdir -p $tmpdir
+ tar xjvf $DISK1_8 -C $tmpdir || \
{ skip "Cannot untar $DISK1_8" && return ; }
load_modules
lctl set_param debug=$PTLDEBUG
- NEWNAME=sofia
+ NEWNAME=lustre
# writeconf will cause servers to register with their current nids
$TUNEFS --writeconf --fsname=$NEWNAME $tmpdir/mds || error "tunefs failed"
- start mds $tmpdir/mds "-o loop" || return 3
+ start mds1 $tmpdir/mds "-o loop" || return 3
local UUID=$(lctl get_param -n mdt.${NEWNAME}-MDT0000.uuid)
echo MDS uuid $UUID
- [ "$UUID" == "mdsA_UUID" ] || error "UUID is wrong: $UUID"
+ [ "$UUID" == "${NEWNAME}-MDT0000_UUID" ] || error "UUID is wrong: $UUID"
- $TUNEFS --mgsnode=`hostname` --fsname=$NEWNAME --writeconf $tmpdir/ost1 || error "tunefs failed"
+ $TUNEFS --mgsnode=`hostname` --writeconf --fsname=$NEWNAME $tmpdir/ost1 || error "tunefs failed"
start ost1 $tmpdir/ost1 "-o loop" || return 5
UUID=$(lctl get_param -n obdfilter.${NEWNAME}-OST0000.uuid)
echo OST uuid $UUID
- [ "$UUID" == "ost1_UUID" ] || error "UUID is wrong: $UUID"
+ [ "$UUID" == "${NEWNAME}-OST0000_UUID" ] || error "UUID is wrong: $UUID"
echo "OSC changes should succeed:"
$LCTL conf_param ${NEWNAME}-OST0000.osc.max_dirty_mb=15 || return 7
mount_client $MOUNT
FSNAME=$OLDFS
set_and_check client "lctl get_param -n mdc.*.max_rpcs_in_flight" "${NEWNAME}-MDT0000.mdc.max_rpcs_in_flight" || return 11
- [ "$(cksum $MOUNT/passwd | cut -d' ' -f 1,2)" == "2479747619 779" ] || return 12
+ [ "$(cksum $MOUNT/passwd | cut -d' ' -f 1,2)" == "94306271 1478" ] || return 12
echo "ok."
cleanup
export GSS_KRB5=false
export GSS_PIPEFS=false
export IDENTITY_UPCALL=default
+
#export PDSH="pdsh -S -Rssh -w"
# eg, assert_env LUSTRE MDSNODES OSTNODES CLIENTS
}
formatall() {
+ if [ "$IAMDIR" == "yes" ]; then
+ MDS_MKFS_OPTS="$MDS_MKFS_OPTS --iam-dir"
+ MDSn_MKFS_OPTS="$MDSn_MKFS_OPTS --iam-dir"
+ fi
+
[ "$FSTYPE" ] && FSTYPE_OPT="--backfstype $FSTYPE"
if [ ! -z $SEC ]; then
int verbose = 1;
static int print_only = 0;
static int failover = 0;
+static int upgrade_to_18 = 0;
void usage(FILE *out)
{
"\t\t--mkfsoptions=<opts> : format options\n"
"\t\t--reformat: overwrite an existing disk\n"
"\t\t--stripe-count-hint=#N : used for optimizing MDT inode size\n"
+ "\t\t--iam-dir: make use of IAM directory format on backfs, incompatible with ext3.\n"
#else
"\t\t--erase-params : erase all old parameter settings\n"
"\t\t--nomgs: turn off MGS service on this MDT\n"
printf("Lustre FS: %s\n", ldd->ldd_fsname);
printf("Mount type: %s\n", MT_STR(ldd));
printf("Flags: %#x\n", ldd->ldd_flags);
- printf(" (%s%s%s%s%s%s%s%s)\n",
+ printf(" (%s%s%s%s%s%s%s%s%s)\n",
IS_MDT(ldd) ? "MDT ":"",
IS_OST(ldd) ? "OST ":"",
IS_MGS(ldd) ? "MGS ":"",
ldd->ldd_flags & LDD_F_VIRGIN ? "first_time ":"",
ldd->ldd_flags & LDD_F_UPDATE ? "update ":"",
ldd->ldd_flags & LDD_F_WRITECONF ? "writeconf ":"",
+ ldd->ldd_flags & LDD_F_IAM_DIR ? "IAM_dir_format ":"",
ldd->ldd_flags & LDD_F_UPGRADE14 ? "upgrade1.4 ":"");
printf("Persistent mount opts: %s\n", ldd->ldd_mount_opts);
printf("Parameters:%s\n", ldd->ldd_params);
printf("\n");
}
+static int touch_file(char *filename)
+{
+ int fd;
+
+ if (filename == NULL) {
+ return 1;
+ }
+
+ fd = open(filename, O_CREAT | O_TRUNC, 0600);
+ if (fd < 0) {
+ return 1;
+ } else {
+ close(fd);
+ return 0;
+ }
+}
+
+/* keep it less than LL_FID_NAMELEN */
+#define DUMMY_FILE_NAME_LEN 25
+#define EXT3_DIRENT_SIZE DUMMY_FILE_NAME_LEN
+
+/* Need to add these many entries to this directory to make HTREE dir. */
+#define MIN_ENTRIES_REQ_FOR_HTREE ((L_BLOCK_SIZE / EXT3_DIRENT_SIZE))
+
+static int add_dummy_files(char *dir)
+{
+ char fpname[PATH_MAX];
+ int i;
+ int rc;
+
+ for (i = 0; i < MIN_ENTRIES_REQ_FOR_HTREE; i++) {
+ snprintf(fpname, PATH_MAX, "%s/%0*d", dir,
+ DUMMY_FILE_NAME_LEN, i);
+
+ rc = touch_file(fpname);
+ if (rc && rc != -EEXIST) {
+ fprintf(stderr,
+ "%s: Can't create dummy file %s: %s\n",
+ progname, fpname , strerror(errno));
+ return rc;
+ }
+ }
+ return 0;
+}
+
+static int __l_mkdir(char * filepnm, int mode , struct mkfs_opts *mop)
+{
+ int ret;
+
+ ret = mkdir(filepnm, mode);
+ if (ret && ret != -EEXIST)
+ return ret;
+
+ /* IAM mode supports ext3 directories of HTREE type only. So add dummy
+ * entries to new directory to create htree type of container for
+ * this directory. */
+ if (mop->mo_ldd.ldd_flags & LDD_F_IAM_DIR)
+ return add_dummy_files(filepnm);
+ return 0;
+}
+
/* Write the server config files */
int write_local_files(struct mkfs_opts *mop)
{
/* Set up initial directories */
sprintf(filepnm, "%s/%s", mntpt, MOUNT_CONFIGS_DIR);
- ret = mkdir(filepnm, 0777);
+ ret = __l_mkdir(filepnm, 0777, mop);
if ((ret != 0) && (errno != EEXIST)) {
fprintf(stderr, "%s: Can't make configs dir %s (%s)\n",
progname, filepnm, strerror(errno));
ret = 0;
}
- sprintf(filepnm, "%s/%s", mntpt, "ROOT");
- ret = mkdir(filepnm, 0777);
- if ((ret != 0) && (errno != EEXIST)) {
- fprintf(stderr, "%s: Can't make ROOT dir %s (%s)\n",
- progname, filepnm, strerror(errno));
- goto out_umnt;
- } else if (errno == EEXIST) {
- ret = 0;
- }
-
/* Save the persistent mount data into a file. Lustre must pre-read
this file to get the real mount options. */
vprint("Writing %s\n", MOUNT_DATA_FILE);
}
fwrite(&mop->mo_ldd, sizeof(mop->mo_ldd), 1, filep);
fclose(filep);
-
/* COMPAT_146 */
#ifdef TUNEFS
/* Check for upgrade */
#endif
/* end COMPAT_146 */
-
out_umnt:
umount(mntpt);
out_rmdir:
char **mountopts)
{
static struct option long_opt[] = {
+ {"iam-dir", 0, 0, 'a'},
{"backfstype", 1, 0, 'b'},
{"stripe-count-hint", 1, 0, 'c'},
{"comment", 1, 0, 'u'},
{"reformat", 0, 0, 'r'},
{"verbose", 0, 0, 'v'},
{"writeconf", 0, 0, 'w'},
+ {"upgrade_to_18", 0, 0, 'U'},
{0, 0, 0, 0}
};
char *optstring = "b:c:C:d:ef:Ghi:k:L:m:MnNo:Op:Pqru:vw";
while ((opt = getopt_long(argc, argv, optstring, long_opt, &longidx)) !=
EOF) {
switch (opt) {
+ case 'a': {
+ if (IS_MDT(&mop->mo_ldd))
+ mop->mo_ldd.ldd_flags |= LDD_F_IAM_DIR;
+ break;
+ }
case 'b': {
int i = 0;
while (i < LDD_MT_LAST) {
case 'w':
mop->mo_ldd.ldd_flags |= LDD_F_WRITECONF;
break;
+ case 'U':
+ upgrade_to_18 = 1;
+ break;
default:
if (opt != '?') {
fatal();
return 0;
}
-#include <lustre/libiam.h>
-
-#define LDISKFS_IOC_GETVERSION _IOR('f', 3, long)
-
-#ifndef TUNEFS /* mkfs.lustre */
-static int mkfs_iam_insert(int key_need_convert, char *keybuf,
- int rec_need_convert, char *recbuf, char *filename)
-{
- int fd;
- int ret;
- struct iam_uapi_info ua;
-
- fd = iam_open(filename, &ua);
- if (fd < 0) {
- fprintf(stderr, "failed to iam_open %s\n", filename);
- return 1;
- }
-
- ret = iam_insert(fd, &ua,
- key_need_convert, keybuf,
- rec_need_convert, recbuf);
- iam_close(fd);
- if (ret) {
- fprintf(stderr, "failed to iam_insert %s\n", filename);
- return 1;
- } else {
- return 0;
- }
-}
-
-static int touch_file(char *filename)
-{
- int fd;
-
- if (filename == NULL) {
- return 1;
- }
-
- fd = open(filename, O_CREAT | O_TRUNC, 0600);
- if (fd < 0) {
- return 1;
- } else {
- close(fd);
- return 0;
- }
-}
-
-static int get_generation(char *filename, unsigned long *result)
-{
- int fd;
- int ret;
-
- if (filename == NULL) {
- return 1;
- }
-
- fd = open(filename, O_RDONLY);
- if (fd < 0) {
- fprintf(stderr, "%s: failed to open %s\n",
- __FUNCTION__, filename);
- return 1;
- }
-
- ret = ioctl(fd, LDISKFS_IOC_GETVERSION, result);
- close(fd);
-
- return ((ret < 0) ? ret : 0);
-}
-
-static int mkfs_mdt(struct mkfs_opts *mop)
-{
- char mntpt[] = "/tmp/mntXXXXXX";
- char fstype[] = "ldiskfs";
- char filepnm[128];
- char recbuf[64];
- char *source;
- int ret;
- unsigned long generation;
- struct stat st;
-
- source = mop->mo_device;
- if (mop->mo_flags & MO_IS_LOOP) {
- source = mop->mo_loopdev;
- }
-
- if ((source == NULL) || (*source == 0)) {
- return 1;
- }
-
- if (!mkdtemp(mntpt)) {
- fprintf(stderr, "%s: failed to mkdtemp %s\n",
- __FUNCTION__, mntpt);
- return errno;
- }
-
- ret = mount(source, mntpt, fstype, 0, NULL);
- if (ret) {
- goto out_rmdir;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_ctl");
- ret = touch_file(filepnm);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_srv");
- ret = touch_file(filepnm);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "last_received");
- ret = touch_file(filepnm);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "lov_objid");
- ret = touch_file(filepnm);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "root");
- ret = iam_creat(filepnm, FMT_LVAR, L_BLOCK_SIZE, 4, 17, 4);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "fld");
- ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 8, 8, 4);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "orphans");
- ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 20, 8, 4);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "oi.16");
- ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 16, 8, 4);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "oi.5");
- ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 5, 8, 4);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, CAPA_KEYS);
- ret = touch_file(filepnm);
- if (ret) {
- goto out_umount;
- }
-
- umount(mntpt);
- ret = mount(source, mntpt, fstype, 0, NULL);
- if (ret) {
- goto out_rmdir;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "root");
- ret = iam_polymorph(filepnm, 040755);
- if (ret) {
- perror("IAM_IOC_POLYMORPH");
- goto out_umount;
- }
-
- umount(mntpt);
- ret = mount(source, mntpt, fstype, 0, NULL);
- if (ret) {
- goto out_rmdir;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "fld");
- ret = mkfs_iam_insert(1, "0000000000000002", 1, "0000000000000000", filepnm);
- if (ret) {
- goto out_umount;
- }
-
- ret = mkfs_iam_insert(1, "0000000000000001", 1, "0000000000000000", filepnm);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "root");
- ret = stat(filepnm, &st);
- if (ret) {
- goto out_umount;
- }
-
- ret = get_generation(filepnm, &generation);
- if (ret) {
- goto out_umount;
- }
-
- snprintf(recbuf, sizeof(recbuf) - 1, "110000000000000001%8.8x%8.8x",
- (unsigned int)st.st_ino, (unsigned int)generation);
- ret = mkfs_iam_insert(0, ".", 1, recbuf, filepnm);
- if (ret) {
- goto out_umount;
- }
-
- ret = mkfs_iam_insert(0, "..", 1, recbuf, filepnm);
- if (ret) {
- goto out_umount;
- }
-
-out_umount:
- umount(mntpt);
-out_rmdir:
- rmdir(mntpt);
- return ret;
-}
-#endif
-
int main(int argc, char *const argv[])
{
struct mkfs_opts mop;
goto out;
}
-#ifndef TUNEFS /* mkfs.lustre */
- if (IS_MDT(ldd)) {
- ret = mkfs_mdt(&mop);
- if (ret != 0) {
- fprintf(stderr, "failed to mkfs_mdt\n");
- goto out;
- }
- }
-#endif
-
out:
loop_cleanup(&mop);