From 90d8e7fd28746a572c8de488222f5312fe927fc3 Mon Sep 17 00:00:00 2001 From: deshmukh Date: Wed, 19 Nov 2008 08:55:54 +0000 Subject: [PATCH] Land b_head_interop_disk on HEAD (20081119_1314) b=11826 i=nikita i=adilger --- lustre/ChangeLog | 5 + lustre/cmm/cmm_device.c | 16 +- lustre/fid/fid_handler.c | 15 + lustre/fid/fid_lib.c | 1 + lustre/fid/fid_store.c | 2 +- lustre/fld/fld_handler.c | 10 + lustre/fld/fld_index.c | 15 +- lustre/include/dt_object.h | 75 +- lustre/include/linux/lvfs.h | 2 +- lustre/include/lu_object.h | 15 +- lustre/include/lustre/lustre_idl.h | 16 +- lustre/include/lustre_disk.h | 63 +- lustre/include/lustre_fid.h | 32 + lustre/include/lustre_fld.h | 4 + lustre/include/md_object.h | 50 +- lustre/include/obd.h | 35 +- lustre/include/obd_class.h | 8 + lustre/include/obd_lov.h | 1 - lustre/llite/llite_fid.c | 12 + lustre/llite/llite_internal.h | 1 + lustre/llite/llite_lib.c | 1 + lustre/llite/xattr.c | 4 +- lustre/lov/lov_obd.c | 6 +- lustre/lov/lov_request.c | 7 +- lustre/lvfs/lvfs_linux.c | 4 +- lustre/mdd/mdd_device.c | 83 +- lustre/mdd/mdd_dir.c | 83 +- lustre/mdd/mdd_internal.h | 34 +- lustre/mdd/mdd_lov.c | 111 ++- lustre/mdd/mdd_object.c | 88 +- lustre/mdd/mdd_orphans.c | 422 +++++++-- lustre/mdd/mdd_trans.c | 18 +- lustre/mds/handler.c | 3 + lustre/mds/mds_fs.c | 2 +- lustre/mds/mds_lov.c | 6 +- lustre/mdt/mdt_capa.c | 8 +- lustre/mdt/mdt_handler.c | 58 +- lustre/mdt/mdt_internal.h | 14 +- lustre/mdt/mdt_open.c | 1 + lustre/mdt/mdt_recovery.c | 73 +- lustre/mdt/mdt_reint.c | 1 + lustre/obdclass/Makefile.in | 1 + lustre/obdclass/dt_object.c | 228 ++++- lustre/obdclass/lu_object.c | 29 +- lustre/obdclass/md_local_object.c | 447 +++++++++ lustre/obdclass/obd_config.c | 23 + lustre/obdfilter/filter.c | 73 +- lustre/obdfilter/filter_internal.h | 6 - lustre/obdfilter/lproc_obdfilter.c | 8 +- lustre/osc/osc_create.c | 4 +- lustre/osc/osc_request.c | 12 +- lustre/osd/osd_handler.c | 1757 +++++++++++++++++++++++++++--------- lustre/osd/osd_internal.h 
| 78 +- lustre/osd/osd_oi.c | 115 ++- lustre/osd/osd_oi.h | 8 +- lustre/tests/cfg/lmv.sh | 1 + lustre/tests/conf-sanity.sh | 17 +- lustre/tests/disk1_8.tgz | Bin 0 -> 10506 bytes lustre/tests/test-framework.sh | 6 + lustre/utils/mkfs_lustre.c | 321 ++----- 60 files changed, 3479 insertions(+), 1060 deletions(-) create mode 100644 lustre/obdclass/md_local_object.c create mode 100644 lustre/tests/disk1_8.tgz diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 0026ea9..1a55ea8 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -13,6 +13,11 @@ tbd Sun Microsystems, Inc. removed cwd "./" (refer to Bugzilla 14399). * File join has been disabled in this release, refer to Bugzilla 16929. + +Severity : enhancement +Bugzilla : 11826 +Description: Interoperability at server side (Disk interoperability) + Severity : enhancement Bugzilla : 17458 Description: Update to SLES10 SP2 kernel-2.6.16.60-0.31. diff --git a/lustre/cmm/cmm_device.c b/lustre/cmm/cmm_device.c index 839322a..a74fdf4 100644 --- a/lustre/cmm/cmm_device.c +++ b/lustre/cmm/cmm_device.c @@ -613,10 +613,24 @@ static int cmm_recovery_complete(const struct lu_env *env, RETURN(rc); } +static int cmm_prepare(const struct lu_env *env, + struct lu_device *pdev, + struct lu_device *dev) +{ + struct cmm_device *cmm = lu2cmm_dev(dev); + struct lu_device *next = md2lu_dev(cmm->cmm_child); + int rc; + + ENTRY; + rc = next->ld_ops->ldo_prepare(env, dev, next); + RETURN(rc); +} + static const struct lu_device_operations cmm_lu_ops = { .ldo_object_alloc = cmm_object_alloc, .ldo_process_config = cmm_process_config, - .ldo_recovery_complete = cmm_recovery_complete + .ldo_recovery_complete = cmm_recovery_complete, + .ldo_prepare = cmm_prepare, }; /* --- lu_device_type operations --- */ diff --git a/lustre/fid/fid_handler.c b/lustre/fid/fid_handler.c index 774aacc..2b28571 100644 --- a/lustre/fid/fid_handler.c +++ b/lustre/fid/fid_handler.c @@ -556,6 +556,18 @@ EXPORT_SYMBOL(seq_server_fini); cfs_proc_dir_entry_t 
*seq_type_proc_dir = NULL; +static struct lu_local_obj_desc llod_seq_srv = { + .llod_name = LUSTRE_SEQ_SRV_NAME, + .llod_oid = FID_SEQ_SRV_OID, + .llod_is_index = 0, +}; + +static struct lu_local_obj_desc llod_seq_ctl = { + .llod_name = LUSTRE_SEQ_CTL_NAME, + .llod_oid = FID_SEQ_CTL_OID, + .llod_is_index = 0, +}; + static int __init fid_mod_init(void) { seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME, @@ -564,6 +576,9 @@ static int __init fid_mod_init(void) if (IS_ERR(seq_type_proc_dir)) return PTR_ERR(seq_type_proc_dir); + llo_local_obj_register(&llod_seq_srv); + llo_local_obj_register(&llod_seq_ctl); + LU_CONTEXT_KEY_INIT(&seq_thread_key); lu_context_key_register(&seq_thread_key); return 0; diff --git a/lustre/fid/fid_lib.c b/lustre/fid/fid_lib.c index 254a4e1..694ee78 100644 --- a/lustre/fid/fid_lib.c +++ b/lustre/fid/fid_lib.c @@ -69,6 +69,7 @@ * * * The first 0x400 sequences of normal FID are reserved for special purpose. + * FID_SEQ_START + 1 is for local file id generation. */ const struct lu_range LUSTRE_SEQ_SPACE_RANGE = { FID_SEQ_START + 0x400ULL, diff --git a/lustre/fid/fid_store.c b/lustre/fid/fid_store.c index 7a827da..de4bec3 100644 --- a/lustre/fid/fid_store.c +++ b/lustre/fid/fid_store.c @@ -167,7 +167,7 @@ int seq_store_init(struct lu_server_seq *seq, name = seq->lss_type == LUSTRE_SEQ_SERVER ? 
LUSTRE_SEQ_SRV_NAME : LUSTRE_SEQ_CTL_NAME; - dt_obj = dt_store_open(env, dt, name, &fid); + dt_obj = dt_store_open(env, dt, "", name, &fid); if (!IS_ERR(dt_obj)) { seq->lss_obj = dt_obj; rc = 0; diff --git a/lustre/fld/fld_handler.c b/lustre/fld/fld_handler.c index 3138a54..a5809bc 100644 --- a/lustre/fld/fld_handler.c +++ b/lustre/fld/fld_handler.c @@ -63,6 +63,7 @@ #include #include +#include #include #include "fld_internal.h" @@ -76,6 +77,13 @@ LU_CONTEXT_KEY_DEFINE(fld, LCT_MD_THREAD|LCT_DT_THREAD); cfs_proc_dir_entry_t *fld_type_proc_dir = NULL; +static struct lu_local_obj_desc llod_fld_index = { + .llod_name = fld_index_name, + .llod_oid = FLD_INDEX_OID, + .llod_is_index = 1, + .llod_feat = &fld_index_features, +}; + static int __init fld_mod_init(void) { fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME, @@ -84,6 +92,8 @@ static int __init fld_mod_init(void) if (IS_ERR(fld_type_proc_dir)) return PTR_ERR(fld_type_proc_dir); + llo_local_obj_register(&llod_fld_index); + LU_CONTEXT_KEY_INIT(&fld_thread_key); lu_context_key_register(&fld_thread_key); return 0; diff --git a/lustre/fld/fld_index.c b/lustre/fld/fld_index.c index aba0bb0..1b927ea 100644 --- a/lustre/fld/fld_index.c +++ b/lustre/fld/fld_index.c @@ -64,15 +64,19 @@ #include "fld_internal.h" const char fld_index_name[] = "fld"; +EXPORT_SYMBOL(fld_index_name); -static const struct dt_index_features fld_index_features = { +const struct dt_index_features fld_index_features = { .dif_flags = DT_IND_UPDATE, .dif_keysize_min = sizeof(seqno_t), .dif_keysize_max = sizeof(seqno_t), .dif_recsize_min = sizeof(mdsno_t), - .dif_recsize_max = sizeof(mdsno_t) + .dif_recsize_max = sizeof(mdsno_t), + .dif_ptrsize = 4 }; +EXPORT_SYMBOL(fld_index_features); + /* * number of blocks to reserve for particular operations. Should be function of * ... something. Stub for now. 
@@ -173,8 +177,11 @@ int fld_index_lookup(struct lu_server_fld *fld, rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec, fld_key(env, seq), BYPASS_CAPA); - if (rc == 0) + if (rc > 0) { *mds = be64_to_cpu(*(__u64 *)rec); + rc = 0; + } else + rc = -ENOENT; RETURN(rc); } @@ -187,7 +194,7 @@ int fld_index_init(struct lu_server_fld *fld, int rc; ENTRY; - dt_obj = dt_store_open(env, dt, fld_index_name, &fid); + dt_obj = dt_store_open(env, dt, "", fld_index_name, &fid); if (!IS_ERR(dt_obj)) { fld->lsf_obj = dt_obj; rc = dt_obj->do_ops->do_index_try(env, dt_obj, diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 0cd80c4..536273d 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -171,6 +171,8 @@ struct dt_index_features { size_t dif_recsize_min; /** maximal required record size, 0 if no limit */ size_t dif_recsize_max; + /** pointer size for record */ + size_t dif_ptrsize; }; enum dt_index_flags { @@ -196,11 +198,51 @@ extern const struct dt_index_features dt_directory_features; * It can contain any allocation hint in the future. */ struct dt_allocation_hint { - struct dt_object *dah_parent; - __u32 dah_mode; + struct dt_object *dah_parent; + __u32 dah_mode; }; /** + * object type specifier. + */ + +enum dt_format_type { + DFT_REGULAR, + DFT_DIR, + /** for mknod */ + DFT_NODE, + /** for special index */ + DFT_INDEX, + /** for symbolic link */ + DFT_SYM, +}; + +/** + * object format specifier. + */ +struct dt_object_format { + /** type for dt object */ + enum dt_format_type dof_type; + union { + struct dof_regular { + } dof_reg; + struct dof_dir { + } dof_dir; + struct dof_node { + } dof_node; + /** + * special index need feature as parameter to create + * special idx + */ + struct dof_index { + const struct dt_index_features *di_feat; + } dof_idx; + } u; +}; + +enum dt_format_type dt_mode_to_dft(__u32 mode); + +/** * Per-dt-object operations. 
*/ struct dt_object_operations { @@ -297,6 +339,7 @@ struct dt_object_operations { int (*do_create)(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th); /** @@ -397,7 +440,7 @@ struct dt_index_operations { * precondition: dt_object_exists(dt); */ struct dt_it *(*init)(const struct lu_env *env, - struct dt_object *dt, int writable, + struct dt_object *dt, struct lustre_capa *capa); void (*fini)(const struct lu_env *env, struct dt_it *di); @@ -406,8 +449,6 @@ struct dt_index_operations { const struct dt_key *key); void (*put)(const struct lu_env *env, struct dt_it *di); - int (*del)(const struct lu_env *env, - struct dt_it *di, struct thandle *th); int (*next)(const struct lu_env *env, struct dt_it *di); struct dt_key *(*key)(const struct lu_env *env, @@ -536,10 +577,30 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn); int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn); int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj); + +/** + * Callback function used for parsing path. 
+ * \see llo_store_resolve + */ +typedef int (*dt_entry_func_t)(const struct lu_env *env, + const char *name, + void *pvt); + +#define DT_MAX_PATH 1024 + +int dt_path_parser(const struct lu_env *env, + char *local, dt_entry_func_t entry_func, + void *data); + struct dt_object *dt_store_open(const struct lu_env *env, - struct dt_device *dt, const char *name, + struct dt_device *dt, + const char *dirname, + const char *filename, struct lu_fid *fid); -/** @} dt */ +struct dt_object *dt_locate(const struct lu_env *env, + struct dt_device *dev, + const struct lu_fid *fid); +/** @} dt */ #endif /* __LUSTRE_DT_OBJECT_H */ diff --git a/lustre/include/linux/lvfs.h b/lustre/include/linux/lvfs.h index 26959b5..17576c3 100644 --- a/lustre/include/linux/lvfs.h +++ b/lustre/include/linux/lvfs.h @@ -96,7 +96,7 @@ struct lvfs_run_ctxt { #ifdef __KERNEL__ struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt, - char *name, int mode, int fix); + const char *name, int mode, int fix); struct dentry *simple_mknod(struct dentry *dir, char *name, int mode, int fix); int lustre_rename(struct dentry *dir, struct vfsmount *mnt, char *oldname, char *newname); diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 9c2f283..1b00b02 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -156,6 +156,16 @@ struct lu_device_operations { int (*ldo_recovery_complete)(const struct lu_env *, struct lu_device *); + /** + * initialize local objects for device. this method called after layer has + * been initialized (after LCFG_SETUP stage) and before it starts serving + * user requests. + */ + + int (*ldo_prepare)(const struct lu_env *, + struct lu_device *parent, + struct lu_device *dev); + }; /** @@ -1268,8 +1278,8 @@ int lu_site_stats_print(const struct lu_site *s, char *page, int count); * Common name structure to be passed around for various name related methods. 
*/ struct lu_name { - char *ln_name; - int ln_namelen; + const char *ln_name; + int ln_namelen; }; /** @@ -1320,5 +1330,4 @@ int lu_kmem_init(struct lu_kmem_descr *caches); void lu_kmem_fini(struct lu_kmem_descr *caches); /** @} lu */ - #endif /* __LUSTRE_LU_OBJECT_H */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 81fd3b7..59fcca8 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -269,10 +269,24 @@ struct lu_fid { }; /** + * Following struct for MDT attributes, that will be kept inode's EA. + * Introduced in 2.0 release (please see b15993, for details) + */ +struct lustre_mdt_attrs { + /** FID of this inode */ + struct lu_fid lma_self_fid; + /** SOM state, mdt/ost type, others */ + __u64 lma_flags; + /** total sectors in objects */ + __u64 lma_som_sectors; +}; + + +/** * fid constants */ enum { - /* initial fid id value */ + /** initial fid id value */ LUSTRE_FID_INIT_OID = 1UL }; diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index 0e253d0..6e0a0f6 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -50,9 +50,10 @@ #define MDT_LOGS_DIR "LOGS" /* COMPAT_146 */ #define MOUNT_CONFIGS_DIR "CONFIGS" -/* Persistent mount data are stored on the disk in this file. */ -#define MOUNT_DATA_FILE MOUNT_CONFIGS_DIR"/mountdata" -#define LAST_RCVD "last_received" +#define CONFIGS_FILE "mountdata" +/** Persistent mount data are stored on the disk in this file. 
*/ +#define MOUNT_DATA_FILE MOUNT_CONFIGS_DIR"/"CONFIGS_FILE +#define LAST_RCVD "last_rcvd" #define LOV_OBJID "lov_objid" #define HEALTH_CHECK "health_check" #define CAPA_KEYS "capa_keys" @@ -62,13 +63,22 @@ #define LDD_F_SV_TYPE_MDT 0x0001 #define LDD_F_SV_TYPE_OST 0x0002 #define LDD_F_SV_TYPE_MGS 0x0004 -#define LDD_F_NEED_INDEX 0x0010 /* need an index assignment */ -#define LDD_F_VIRGIN 0x0020 /* never registered */ -#define LDD_F_UPDATE 0x0040 /* update the config logs for this server*/ -#define LDD_F_REWRITE_LDD 0x0080 /* rewrite the LDD */ -#define LDD_F_WRITECONF 0x0100 /* regenerate all logs for this fs */ -#define LDD_F_UPGRADE14 0x0200 /* COMPAT_14 */ -#define LDD_F_PARAM 0x0400 /* process as lctl conf_param */ +/** need an index assignment */ +#define LDD_F_NEED_INDEX 0x0010 +/** never registered */ +#define LDD_F_VIRGIN 0x0020 +/** update the config logs for this server*/ +#define LDD_F_UPDATE 0x0040 +/** rewrite the LDD */ +#define LDD_F_REWRITE_LDD 0x0080 +/** regenerate all logs for this fs */ +#define LDD_F_WRITECONF 0x0100 +/** COMPAT_14 */ +#define LDD_F_UPGRADE14 0x0200 +/** process as lctl conf_param */ +#define LDD_F_PARAM 0x0400 +/** backend fs make use of IAM directory format. 
*/ +#define LDD_F_IAM_DIR 0x0800 enum ldd_mount_type { LDD_MT_EXT3 = 0, @@ -196,17 +206,28 @@ struct lustre_mount_data { #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8) #endif -/* COMPAT_146 */ -#define OBD_COMPAT_OST 0x00000002 /* this is an OST (temporary) */ -#define OBD_COMPAT_MDT 0x00000004 /* this is an MDT (temporary) */ -/* end COMPAT_146 */ - -#define OBD_ROCOMPAT_LOVOBJID 0x00000001 /* MDS handles LOV_OBJID file */ - -#define OBD_INCOMPAT_GROUPS 0x00000001 /* OST handles group subdirs */ -#define OBD_INCOMPAT_OST 0x00000002 /* this is an OST */ -#define OBD_INCOMPAT_MDT 0x00000004 /* this is an MDT */ -#define OBD_INCOMPAT_COMMON_LR 0x00000008 /* common last_rvcd format */ +/** COMPAT_146: this is an OST (temporary) */ +#define OBD_COMPAT_OST 0x00000002 +/** COMPAT_146: this is an MDT (temporary) */ +#define OBD_COMPAT_MDT 0x00000004 + +/** MDS handles LOV_OBJID file */ +#define OBD_ROCOMPAT_LOVOBJID 0x00000001 + +/** OST handles group subdirs */ +#define OBD_INCOMPAT_GROUPS 0x00000001 +/** this is an OST */ +#define OBD_INCOMPAT_OST 0x00000002 +/** this is an MDT */ +#define OBD_INCOMPAT_MDT 0x00000004 +/** common last_rvcd format */ +#define OBD_INCOMPAT_COMMON_LR 0x00000008 +/** FID is enabled */ +#define OBD_INCOMPAT_FID 0x00000010 +/** + * lustre disk using iam format to store directory entries + */ +#define OBD_INCOMPAT_IAM_DIR 0x00000020 /* Data stored per server at the head of the last_rcvd file. In le32 order. diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index 7133abd..470feae 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -79,6 +79,38 @@ enum { LUSTRE_SEQ_SUPER_WIDTH = (LUSTRE_SEQ_META_WIDTH * LUSTRE_SEQ_META_WIDTH) }; +/** special fid seq: used for local object create. 
*/ +#define FID_SEQ_LOCAL_FILE (FID_SEQ_START + 1) + +/** special OID for local objects */ +enum { + /** \see osd_oi_index_create */ + OSD_OI_FID_SMALL_OID = 1UL, + OSD_OI_FID_OTHER_OID = 2UL, + /** \see fld_mod_init */ + FLD_INDEX_OID = 3UL, + /** \see fid_mod_init */ + FID_SEQ_CTL_OID = 4UL, + FID_SEQ_SRV_OID = 5UL, + /** \see mdd_mod_init */ + MDD_ROOT_INDEX_OID = 6UL, + MDD_ORPHAN_OID = 7UL, + MDD_LOV_OBJ_OID = 8UL, + MDD_CAPA_KEYS_OID = 9UL, + MDD_OBJECTS_OID = 10UL, + /** \see mdt_mod_init */ + MDT_LAST_RECV_OID = 11UL, + /** \see osd_mod_init */ + OSD_REM_OBJ_DIR_OID = 12UL, +}; + +static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid) +{ + fid->f_seq = FID_SEQ_LOCAL_FILE; + fid->f_oid = oid; + fid->f_ver = 0; +} + enum lu_mgr_type { LUSTRE_SEQ_SERVER, LUSTRE_SEQ_CONTROLLER diff --git a/lustre/include/lustre_fld.h b/lustre/include/lustre_fld.h index a65408f..ec8be4f 100644 --- a/lustre/include/lustre_fld.h +++ b/lustre/include/lustre_fld.h @@ -46,6 +46,10 @@ struct lu_client_fld; struct lu_server_fld; +extern const struct dt_index_features fld_index_features; +extern const char fld_index_name[]; + + struct fld_stats { __u64 fst_count; __u64 fst_cache; diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index ecc92dc..5a20550 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -183,6 +183,9 @@ struct md_op_spec { /** Check for split */ int sp_ck_split; + + /** to create directory */ + const struct dt_index_features *sp_feat; }; /** @@ -802,6 +805,51 @@ static inline int mdo_rename_tgt(const struct lu_env *env, } } -/** @} md */ +struct dt_device; +/** + * Structure to hold object information. 
This is used to create object + */ +struct lu_local_obj_desc { + const char *llod_name; + __u32 llod_oid; + int llod_is_index; + const struct dt_index_features * llod_feat; + struct list_head llod_linkage; +}; + +struct md_object *llo_store_resolve(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *path, + struct lu_fid *fid); + +struct md_object *llo_store_open(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *dirname, + const char *objname, + struct lu_fid *fid); + +struct md_object *llo_store_create_index(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *dirname, + const char *objname, + const struct lu_fid *fid, + const struct dt_index_features *feat); + +struct md_object *llo_store_create(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *dirname, + const char *objname, + const struct lu_fid *fid); + +int llo_local_obj_register(struct lu_local_obj_desc *); + +int llo_local_objects_setup(const struct lu_env *env, + struct md_device * md, + struct dt_device * dt); +/** @} md */ #endif /* _LINUX_MD_OBJECT_H */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 5dd346e..2d499b6 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -891,11 +891,40 @@ struct target_recovery_data { }; enum filter_groups { + FILTER_GROUP_MDS0 = 0, FILTER_GROUP_LLOG = 1, - FILTER_GROUP_ECHO, - FILTER_GROUP_MDS0 + FILTER_GROUP_ECHO = 2 , + FILTER_GROUP_MDS1_N_BASE = 3 }; +static inline __u64 obdo_mdsno(struct obdo *oa) +{ + if (oa->o_gr) + return oa->o_gr - FILTER_GROUP_MDS1_N_BASE; + return 0; +} + +static inline int mdt_to_obd_objgrp(int mdtid) +{ + if (mdtid) + return FILTER_GROUP_MDS1_N_BASE + mdtid; + return 0; +} + +/** + * In HEAD for CMD, the object is created in group number which is 3>= + * or indexing starts from 3. To test this assertions are added to disallow + * group 0. 
But to run 2.0 mds server on 1.8.x disk format (i.e. interop_mode) + * object in group 0 needs to be allowed. + * So for interop mode following changes needs to be done: + * 1. No need to assert on group 0 or allow group 0 + * 2. The group number indexing starts from 0 instead of 3 + */ + +#define CHECK_MDS_GROUP(group) (group == FILTER_GROUP_MDS0 || \ + group > FILTER_GROUP_MDS1_N_BASE) +#define LASSERT_MDS_GROUP(group) LASSERT(CHECK_MDS_GROUP(group)) + struct obd_llog_group { struct list_head olg_list; int olg_group; @@ -1545,7 +1574,7 @@ static inline void init_obd_quota_ops(quota_interface_t *interface, static inline __u64 oinfo_mdsno(struct obd_info *oinfo) { - return oinfo->oi_oa->o_gr - FILTER_GROUP_MDS0; + return obdo_mdsno(oinfo->oi_oa); } static inline struct lustre_capa *oinfo_capa(struct obd_info *oinfo) diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 8fe3e6c..385fbd9 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -433,10 +433,15 @@ static inline int obd_setup(struct obd_device *obd, struct lustre_cfg *cfg) ldt = obd->obd_type->typ_lu; if (ldt != NULL) { + struct lu_context session_ctx; struct lu_env env; + lu_context_init(&session_ctx, LCT_SESSION); + session_ctx.lc_thread = NULL; + lu_context_enter(&session_ctx); rc = lu_env_init(&env, ldt->ldt_ctx_tags); if (rc == 0) { + env.le_ses = &session_ctx; d = ldt->ldt_ops->ldto_device_alloc(&env, ldt, cfg); lu_env_fini(&env); if (!IS_ERR(d)) { @@ -446,6 +451,9 @@ static inline int obd_setup(struct obd_device *obd, struct lustre_cfg *cfg) } else rc = PTR_ERR(d); } + lu_context_exit(&session_ctx); + lu_context_fini(&session_ctx); + } else { OBD_CHECK_DT_OP(obd, setup, -EOPNOTSUPP); OBD_COUNTER_INCREMENT(obd, setup); diff --git a/lustre/include/obd_lov.h b/lustre/include/obd_lov.h index da3ca51..b4de8d2 100644 --- a/lustre/include/obd_lov.h +++ b/lustre/include/obd_lov.h @@ -52,7 +52,6 @@ static inline int lov_mds_md_size(int stripes, int lmm_magic) 
stripes * sizeof(struct lov_ost_data_v1); } - #define IOC_LOV_TYPE 'g' #define IOC_LOV_MIN_NR 50 #define IOC_LOV_SET_OSC_ACTIVE _IOWR('g', 50, long) diff --git a/lustre/llite/llite_fid.c b/lustre/llite/llite_fid.c index 15c4021..eab0e84 100644 --- a/lustre/llite/llite_fid.c +++ b/lustre/llite/llite_fid.c @@ -77,3 +77,15 @@ ino_t ll_fid_build_ino(struct ll_sb_info *sbi, ino = ino | 0x80000000; RETURN(ino); } + +__u32 ll_fid_build_gen(struct ll_sb_info *sbi, + struct lu_fid *fid) +{ + __u32 gen = 0; + ENTRY; + + if (fid_is_igif(fid)) { + gen = lu_igif_gen(fid); + } + RETURN(gen); +} diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 2c6153b..22b5981 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -964,6 +964,7 @@ int lustre_check_remote_perm(struct inode *inode, int mask); /* llite/llite_fid.c */ ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid); +__u32 ll_fid_build_gen(struct ll_sb_info *sbi, struct lu_fid *fid); /* llite/llite_capa.c */ extern cfs_timer_t ll_capa_timer; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 4542588..a2a6c3e 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1619,6 +1619,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) } #endif inode->i_ino = ll_fid_build_ino(sbi, &body->fid1); + inode->i_generation = ll_fid_build_gen(sbi, &body->fid1); if (body->valid & OBD_MD_FLATIME && body->atime > LTIME_S(inode->i_atime)) diff --git a/lustre/llite/xattr.c b/lustre/llite/xattr.c index e2835d9..d66a025 100644 --- a/lustre/llite/xattr.c +++ b/lustre/llite/xattr.c @@ -238,7 +238,9 @@ int ll_setxattr(struct dentry *dentry, const char *name, } return rc; - } + + } else if (strcmp(name, "trusted.lma") == 0) /* b17288: ignore common_ea */ + return 0; return ll_setxattr_common(inode, name, value, size, flags, OBD_MD_FLXATTR); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index ea90841..7e49a54 100644 --- 
a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1692,7 +1692,7 @@ static int lov_change_cbdata(struct obd_export *exp, if (!exp || !exp->exp_obd) RETURN(-ENODEV); - LASSERT(lsm->lsm_object_gr > 0); + LASSERT_MDS_GROUP(lsm->lsm_object_gr); lov = &exp->exp_obd->u.lov; for (i = 0; i < lsm->lsm_stripe_count; i++) { @@ -1730,7 +1730,7 @@ static int lov_cancel(struct obd_export *exp, struct lov_stripe_md *lsm, if (!exp || !exp->exp_obd) RETURN(-ENODEV); - LASSERT(lsm->lsm_object_gr > 0); + LASSERT_MDS_GROUP(lsm->lsm_object_gr); LASSERT(lockh); lov = &exp->exp_obd->u.lov; rc = lov_prep_cancel_set(exp, &oinfo, lsm, mode, lockh, &set); @@ -1786,7 +1786,7 @@ static int lov_cancel_unused(struct obd_export *exp, ASSERT_LSM_MAGIC(lsm); - LASSERT(lsm->lsm_object_gr > 0); + LASSERT_MDS_GROUP(lsm->lsm_object_gr); for (i = 0; i < lsm->lsm_stripe_count; i++) { struct lov_stripe_md submd; struct lov_oinfo *loi = lsm->lsm_oinfo[i]; diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 176968f..2c8c0ad 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -1198,8 +1198,11 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, sizeof(*req->rq_oi.oi_oa)); req->rq_oi.oi_oa->o_id = loi->loi_id; - LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) - || req->rq_oi.oi_oa->o_gr>0); + LASSERTF(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) || + CHECK_MDS_GROUP(req->rq_oi.oi_oa->o_gr), + "req->rq_oi.oi_oa->o_valid="LPX64" " + "req->rq_oi.oi_oa->o_gr="LPU64"\n", + req->rq_oi.oi_oa->o_valid, req->rq_oi.oi_oa->o_gr); req->rq_oi.oi_oa->o_stripe_idx = i; req->rq_oi.oi_cb_up = cb_setattr_update; req->rq_oi.oi_capa = oinfo->oi_capa; diff --git a/lustre/lvfs/lvfs_linux.c b/lustre/lvfs/lvfs_linux.c index f855ca5..5d07875 100644 --- a/lustre/lvfs/lvfs_linux.c +++ b/lustre/lvfs/lvfs_linux.c @@ -230,8 +230,8 @@ out_up: EXPORT_SYMBOL(simple_mknod); /* utility to make a directory */ -struct dentry 
*simple_mkdir(struct dentry *dir, struct vfsmount *mnt, - char *name, int mode, int fix) +struct dentry *simple_mkdir(struct dentry *dir, struct vfsmount *mnt, + const char *name, int mode, int fix) { struct dentry *dchild; int err = 0; diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 26a905f..3e7f17a 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -53,6 +53,8 @@ #include #include +#include +#include #include #include #include @@ -62,7 +64,8 @@ const struct md_device_operations mdd_ops; -static const char *mdd_root_dir_name = "root"; +static const char mdd_root_dir_name[] = "ROOT"; + static int mdd_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) { @@ -99,25 +102,6 @@ static struct lu_device *mdd_device_fini(const struct lu_env *env, return next; } -static int mdd_mount(const struct lu_env *env, struct mdd_device *mdd) -{ - int rc; - struct dt_object *root; - ENTRY; - - dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb); - root = dt_store_open(env, mdd->mdd_child, mdd_root_dir_name, - &mdd->mdd_root_fid); - if (!IS_ERR(root)) { - LASSERT(root != NULL); - lu_object_put(env, &root->do_lu); - rc = orph_index_init(env, mdd); - } else - rc = PTR_ERR(root); - - RETURN(rc); -} - static void mdd_device_shutdown(const struct lu_env *env, struct mdd_device *m, struct lustre_cfg *cfg) { @@ -162,9 +146,6 @@ static int mdd_process_config(const struct lu_env *env, CERROR("lov init error %d \n", rc); GOTO(out, rc); } - rc = mdd_mount(env, m); - if (rc) - GOTO(out, rc); rc = mdd_txn_init_credits(env, m); break; case LCFG_CLEANUP: @@ -243,10 +224,39 @@ static int mdd_recovery_complete(const struct lu_env *env, RETURN(rc); } +static int mdd_prepare(const struct lu_env *env, + struct lu_device *pdev, + struct lu_device *cdev) +{ + struct mdd_device *mdd = lu2mdd_dev(cdev); + struct lu_device *next = &mdd->mdd_child->dd_lu_dev; + struct dt_object *root; + int rc; + + ENTRY; + rc = 
next->ld_ops->ldo_prepare(env, cdev, next); + if (rc) + GOTO(out, rc); + + dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb); + root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name, + &mdd->mdd_root_fid); + if (!IS_ERR(root)) { + LASSERT(root != NULL); + lu_object_put(env, &root->do_lu); + rc = orph_index_init(env, mdd); + } else + rc = PTR_ERR(root); + +out: + RETURN(rc); +} + const struct lu_device_operations mdd_lu_ops = { .ldo_object_alloc = mdd_object_alloc, .ldo_process_config = mdd_process_config, - .ldo_recovery_complete = mdd_recovery_complete + .ldo_recovery_complete = mdd_recovery_complete, + .ldo_prepare = mdd_prepare, }; /* @@ -465,10 +475,35 @@ static void mdd_key_fini(const struct lu_context *ctx, /* context key: mdd_thread_key */ LU_CONTEXT_KEY_DEFINE(mdd, LCT_MD_THREAD); +static struct lu_local_obj_desc llod_capa_key = { + .llod_name = CAPA_KEYS, + .llod_oid = MDD_CAPA_KEYS_OID, + .llod_is_index = 0, +}; + +static struct lu_local_obj_desc llod_mdd_orphan = { + .llod_name = orph_index_name, + .llod_oid = MDD_ORPHAN_OID, + .llod_is_index = 1, + .llod_feat = &dt_directory_features, +}; + +static struct lu_local_obj_desc llod_mdd_root = { + .llod_name = mdd_root_dir_name, + .llod_oid = MDD_ROOT_INDEX_OID, + .llod_is_index = 1, + .llod_feat = &dt_directory_features, +}; + static int __init mdd_mod_init(void) { struct lprocfs_static_vars lvars; lprocfs_mdd_init_vars(&lvars); + + llo_local_obj_register(&llod_capa_key); + llo_local_obj_register(&llod_mdd_orphan); + llo_local_obj_register(&llod_mdd_root); + return class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars, LUSTRE_MDD_NAME, &mdd_device_type); } diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 7450c1e..7fe28f3 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -75,7 +75,7 @@ static int __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) { - char *name = lname->ln_name; 
+ const char *name = lname->ln_name; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct dynlock_handle *dlh; int rc; @@ -232,7 +232,7 @@ static int mdd_dir_is_empty(const struct lu_env *env, RETURN(-ENOTDIR); iops = &obj->do_index_ops->dio_it; - it = iops->init(env, obj, 0, BYPASS_CAPA); + it = iops->init(env, obj, BYPASS_CAPA); if (it != NULL) { result = iops->get(env, it, (const void *)""); if (result > 0) { @@ -458,15 +458,6 @@ int mdd_link_sanity_check(const struct lu_env *env, RETURN(rc); } -const struct dt_rec *__mdd_fid_rec(const struct lu_env *env, - const struct lu_fid *fid) -{ - struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack; - - fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2); - return (const struct dt_rec *)pack; -} - /** * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and * assign i_nlink to 1 which means the i_nlink for subdir count is incredible @@ -590,7 +581,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct md_object *src_obj, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj); struct mdd_object *mdd_sobj = md2mdd_obj(src_obj); @@ -682,23 +673,28 @@ int mdd_finish_unlink(const struct lu_env *env, struct thandle *th) { int rc; + int reset = 1; ENTRY; rc = mdd_iattr_get(env, obj, ma); if (rc == 0 && ma->ma_attr.la_nlink == 0) { /* add new orphan and the object - * will be deleted during the object_put() */ - if (__mdd_orphan_add(env, obj, th) == 0) - obj->mod_flags |= ORPHAN_OBJ; + * will be deleted during mdd_close() */ + if (obj->mod_count) { + rc = __mdd_orphan_add(env, obj, th); + if (rc == 0) + obj->mod_flags |= ORPHAN_OBJ; + } obj->mod_flags |= DEAD_OBJ; - if (obj->mod_count == 0) + if (!(obj->mod_flags & ORPHAN_OBJ)) { rc = mdd_object_kill(env, obj, ma); - else - /* clear MA_LOV | MA_COOKIE, if we do 
not - * unlink it in case we get it somewhere */ - ma->ma_valid &= ~(MA_LOV | MA_COOKIE); - } else + if (rc == 0) + reset = 0; + } + + } + if (reset) ma->ma_valid &= ~(MA_LOV | MA_COOKIE); RETURN(rc); @@ -723,7 +719,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_pobj = md2mdd_obj(pobj); struct mdd_object *mdd_cobj = md2mdd_obj(cobj); @@ -850,7 +846,7 @@ static int mdd_name_insert(const struct lu_env *env, const struct lu_fid *fid, const struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *mdd = mdo2mdd(pobj); @@ -968,7 +964,7 @@ static int mdd_name_remove(const struct lu_env *env, const struct lu_name *lname, const struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *mdd = mdo2mdd(pobj); @@ -1074,7 +1070,7 @@ static int mdd_rename_tgt(const struct lu_env *env, const struct lu_fid *lf, const struct lu_name *lname, struct md_attr *ma) { - char *name = lname->ln_name; + const char *name = lname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_tpobj = md2mdd_obj(pobj); struct mdd_object *mdd_tobj = md2mdd_obj(tobj); @@ -1280,7 +1276,7 @@ static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj, const struct lu_name *lname, struct lu_fid* fid, int mask) { - char *name = lname->ln_name; + const char *name = lname->ln_name; const struct dt_key *key = (const struct dt_key *)name; struct mdd_object *mdd_obj = md2mdd_obj(pobj); struct mdd_device *m = 
mdo2mdd(pobj); @@ -1315,8 +1311,10 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, rc = dir->do_index_ops->dio_lookup(env, dir, (struct dt_rec *)pack, key, mdd_object_capa(env, mdd_obj)); - if (rc == 0) + if (rc > 0) rc = fid_unpack(pack, fid); + else if (rc == 0) + rc = -ENOENT; } else rc = -ENOTDIR; @@ -1325,7 +1323,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, struct mdd_object *child, struct md_attr *ma, - struct thandle *handle) + struct thandle *handle, const struct md_op_spec *spec) { int rc; ENTRY; @@ -1469,7 +1467,7 @@ static int mdd_create(const struct lu_env *env, struct lov_mds_md *lmm = NULL; struct thandle *handle; struct dynlock_handle *dlh; - char *name = lname->ln_name; + const char *name = lname->ln_name; int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0; int got_def_acl = 0; #ifdef HAVE_QUOTA_SUPPORT @@ -1604,7 +1602,7 @@ static int mdd_create(const struct lu_env *env, GOTO(out_trans, rc = -ENOMEM); mdd_write_lock(env, son, MOR_TGT_CHILD); - rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle); + rc = mdd_object_create_internal(env, mdd_pobj, son, ma, handle, spec); if (rc) { mdd_write_unlock(env, son); GOTO(cleanup, rc); @@ -1629,7 +1627,7 @@ static int mdd_create(const struct lu_env *env, #endif rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), - son, ma, handle); + son, ma, handle, spec); mdd_write_unlock(env, son); if (rc) /* @@ -1836,8 +1834,8 @@ static int mdd_rename(const struct lu_env *env, struct md_object *tobj, const struct lu_name *ltname, struct md_attr *ma) { - char *sname = lsname->ln_name; - char *tname = ltname->ln_name; + const char *sname = lsname->ln_name; + const char *tname = ltname->ln_name; struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj); @@ -1846,6 +1844,10 @@ static 
int mdd_rename(const struct lu_env *env, struct mdd_object *mdd_tobj = NULL; struct dynlock_handle *sdlh, *tdlh; struct thandle *handle; + const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj); + int is_dir; + int rc; + #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdd->mdd_obd_dev; struct mds_obd *mds = &obd->u.mds; @@ -1854,7 +1856,6 @@ static int mdd_rename(const struct lu_env *env, unsigned int qtpids[MAXQUOTAS] = { 0, 0 }; int quota_opc = 0, rec_pending = 0; #endif - int rc, is_dir; ENTRY; LASSERT(ma->ma_attr.la_mode & S_IFMT); @@ -1928,6 +1929,20 @@ static int mdd_rename(const struct lu_env *env, if (rc) GOTO(cleanup, rc); + /* "mv dir1 dir2" needs "dir1/.." link update */ + if (is_dir) { + rc = __mdd_index_delete(env, mdd_sobj, dotdot, is_dir, handle, + mdd_object_capa(env, mdd_spobj)); + if (rc) + GOTO(cleanup, rc); + + rc = __mdd_index_insert(env, mdd_sobj, tpobj_fid, dotdot, + is_dir, handle, + mdd_object_capa(env, mdd_tpobj)); + if (rc) + GOTO(cleanup, rc); + } + /* * Here tobj can be remote one, so we do index_delete unconditionally * and -ENOENT is allowed. 
diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 16de10a..fce9bda 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -135,13 +135,6 @@ struct mdd_object { #endif }; -struct orph_key { - /* fid of the object*/ - struct lu_fid ok_fid; - /* type of operation: unlink, truncate */ - __u32 ok_op; -} __attribute__((packed)); - struct mdd_thread_info { struct txn_param mti_param; struct lu_fid mti_fid; @@ -149,7 +142,7 @@ struct mdd_thread_info { struct md_attr mti_ma; struct lu_attr mti_la_for_fix; struct obd_info mti_oi; - struct orph_key mti_orph_key; + char mti_orph_key[NAME_MAX + 1]; struct obd_trans_info mti_oti; struct lu_buf mti_buf; struct obdo mti_oa; @@ -161,9 +154,14 @@ struct mdd_thread_info { int mti_max_lmm_size; struct llog_cookie *mti_max_cookie; int mti_max_cookie_size; + struct dt_object_format mti_dof; struct obd_quotactl mti_oqctl; }; +extern const char orph_index_name[]; + +extern const struct dt_index_features orph_index_features; + struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env, struct mdd_device *mdd); @@ -214,7 +212,8 @@ int mdd_attr_get_internal_locked(const struct lu_env *env, struct md_attr *ma); int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct mdd_object *c, struct md_attr *ma, - struct thandle *handle); + struct thandle *handle, + const struct md_op_spec *spec); int mdd_attr_check_set_internal_locked(const struct lu_env *env, struct mdd_object *obj, struct lu_attr *attr, @@ -262,7 +261,7 @@ int mdd_finish_unlink(const struct lu_env *env, struct mdd_object *obj, struct md_attr *ma, struct thandle *th); int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid, struct mdd_object *child, struct md_attr *ma, - struct thandle *handle); + struct thandle *handle, const struct md_op_spec *spec); int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj, const struct lu_name *lname, struct mdd_object *src_obj); /* 
mdd_lov.c */ @@ -348,6 +347,9 @@ int mdd_log_txn_param_build(const struct lu_env *env, struct md_object *obj, int mdd_setattr_txn_param_build(const struct lu_env *env, struct md_object *obj, struct md_attr *ma, enum mdd_txn_op); +int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, + struct mdd_object *obj, struct lu_attr *la); + static inline void mdd_object_put(const struct lu_env *env, struct mdd_object *o) { @@ -475,6 +477,15 @@ static inline const struct lu_fid *mdo2fid(const struct mdd_object *obj) return lu_object_fid(&obj->mod_obj.mo_lu); } +static inline const struct dt_rec *__mdd_fid_rec(const struct lu_env *env, + const struct lu_fid *fid) +{ + struct lu_fid_pack *pack = &mdd_env_info(env)->mti_pack; + + fid_pack(pack, fid, &mdd_env_info(env)->mti_fid2); + return (const struct dt_rec *)pack; +} + static inline umode_t mdd_object_type(const struct mdd_object *obj) { return lu_object_attr(&obj->mod_obj.mo_lu); @@ -658,10 +669,11 @@ static inline int mdo_create_obj(const struct lu_env *env, struct mdd_object *o, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *handle) { struct dt_object *next = mdd_object_child(o); - return next->do_ops->do_create(env, next, attr, hint, handle); + return next->do_ops->do_create(env, next, attr, hint, dof, handle); } static inline struct obd_capa *mdo_capa_get(const struct lu_env *env, diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 5e5bd18..8b6ce98 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -409,6 +409,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, { struct obd_device *obd = mdd2obd_dev(mdd); struct obd_export *lov_exp = obd->u.mds.mds_osc_exp; + struct lu_site *site = mdd2lu_dev(mdd)->ld_site; struct obdo *oa; struct lov_stripe_md *lsm = NULL; const void *eadata = spec->u.sp_ea.eadata; @@ -437,8 +438,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, oa->o_uid = 0; /* 
must have 0 uid / gid on OST */ oa->o_gid = 0; - oa->o_gr = FILTER_GROUP_MDS0 + - lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id; + oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id); oa->o_mode = S_IFREG | 0600; oa->o_id = mdd_lov_create_id(mdd_object_fid(child)); oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS | @@ -484,7 +484,7 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, } GOTO(out_oti, rc); } - LASSERT(lsm->lsm_object_gr >= FILTER_GROUP_MDS0); + LASSERT_MDS_GROUP(lsm->lsm_object_gr); } else { LASSERT(eadata != NULL); rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm, @@ -558,6 +558,111 @@ out_ids: return rc; } +/* + * used when destroying orphans and from mds_reint_unlink() when MDS wants to + * destroy objects on OSS. + */ +static +int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd, + struct mdd_object *obj, struct lu_attr *la, + struct lov_mds_md *lmm, int lmm_size, + struct llog_cookie *logcookies, + int log_unlink) +{ + struct obd_device *obd = mdd2obd_dev(mdd); + struct obd_export *lov_exp = obd->u.mds.mds_osc_exp; + struct lov_stripe_md *lsm = NULL; + struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti; + struct obdo *oa = &mdd_env_info(env)->mti_oa; + struct lu_site *site = mdd2lu_dev(mdd)->ld_site; + int rc; + ENTRY; + + if (lmm_size == 0) + RETURN(0); + + rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size); + if (rc < 0) { + CERROR("Error unpack md %p\n", lmm); + RETURN(rc); + } else { + LASSERT(rc >= sizeof(*lsm)); + rc = 0; + } + + oa->o_id = lsm->lsm_object_id; + oa->o_gr = mdt_to_obd_objgrp(lu_site2md(site)->ms_node_id); + oa->o_mode = la->la_mode & S_IFMT; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP; + + oti_init(oti, NULL); + if (log_unlink && logcookies) { + oa->o_valid |= OBD_MD_FLCOOKIE; + oti->oti_logcookies = logcookies; + } + + CDEBUG(D_INFO, "destroying OSS object %d/%d\n", + (int)oa->o_id, (int)oa->o_gr); + + rc = obd_destroy(lov_exp, oa, lsm, oti, 
NULL, BYPASS_CAPA); + + obd_free_memmd(lov_exp, &lsm); + RETURN(rc); +} + +/* + * called with obj not locked. + */ + +int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, + struct mdd_object *obj, struct lu_attr *la) +{ + struct md_attr *ma = &mdd_env_info(env)->mti_ma; + int rc; + ENTRY; + + if (unlikely(la->la_nlink != 0)) { + CWARN("Attempt to destroy OSS object when nlink == %d\n", + la->la_nlink); + RETURN(0); + } + + ma->ma_lmm_size = mdd_lov_mdsize(env, mdd); + ma->ma_lmm = mdd_max_lmm_get(env, mdd); + ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd); + ma->ma_cookie = mdd_max_cookie_get(env, mdd); + if (ma->ma_lmm == NULL || ma->ma_cookie == NULL) + RETURN(rc = -ENOMEM); + + /* get lov ea */ + + rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size, + MDS_LOV_MD_NAME); + + if (rc <= 0) { + CWARN("Get lov ea failed for "DFID" rc = %d\n", + PFID(mdo2fid(obj)), rc); + if (rc == 0) + rc = -ENOENT; + RETURN(rc); + } + + ma->ma_valid = MA_LOV; + + rc = mdd_unlink_log(env, mdd, obj, ma); + if (rc) { + CWARN("mds unlink log for "DFID" failed: %d\n", + PFID(mdo2fid(obj)), rc); + RETURN(rc); + } + + if (ma->ma_valid & MA_COOKIE) + rc = mdd_lovobj_unlink(env, mdd, obj, la, + ma->ma_lmm, ma->ma_lmm_size, + ma->ma_cookie, 1); + RETURN(rc); +} + int mdd_log_op_unlink(struct obd_device *obd, struct lov_mds_md *lmm, int lmm_size, struct llog_cookie *logcookies, int cookies_size) diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index c13cdd6..d7a9969 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -211,38 +211,17 @@ static void mdd_object_free(const struct lu_env *env, struct lu_object *o) OBD_FREE_PTR(mdd); } -/* orphan handling is here */ -static void mdd_object_delete(const struct lu_env *env, struct lu_object *o) +static int mdd_object_print(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct lu_object *o) { - struct mdd_object *mdd_obj = lu2mdd_obj(o); - struct thandle *handle = NULL; - 
ENTRY; - - if (lu2mdd_dev(o->lo_dev)->mdd_orphans == NULL) - return; - - if (mdd_obj->mod_flags & ORPHAN_OBJ) { - mdd_txn_param_build(env, lu2mdd_dev(o->lo_dev), - MDD_TXN_INDEX_DELETE_OP); - handle = mdd_trans_start(env, lu2mdd_dev(o->lo_dev)); - if (IS_ERR(handle)) - CERROR("Cannot get thandle\n"); - else { - mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); - /* let's remove obj from the orphan list */ - __mdd_orphan_del(env, mdd_obj, handle); - mdd_write_unlock(env, mdd_obj); - mdd_trans_stop(env, lu2mdd_dev(o->lo_dev), - 0, handle); - } - } + return (*p)(env, cookie, LUSTRE_MDD_NAME"-object@%p", o); } static const struct lu_object_operations mdd_lu_obj_ops = { - .loo_object_init = mdd_object_init, - .loo_object_start = mdd_object_start, - .loo_object_free = mdd_object_free, - .loo_object_delete = mdd_object_delete + .loo_object_init = mdd_object_init, + .loo_object_start = mdd_object_start, + .loo_object_free = mdd_object_free, + .loo_object_print = mdd_object_print, }; struct mdd_object *mdd_object_find(const struct lu_env *env, @@ -486,10 +465,13 @@ static int mdd_xattr_list(const struct lu_env *env, struct md_object *obj, int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct mdd_object *c, struct md_attr *ma, - struct thandle *handle) + struct thandle *handle, + const struct md_op_spec *spec) { struct lu_attr *attr = &ma->ma_attr; struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint; + struct dt_object_format *dof = &mdd_env_info(env)->mti_dof; + const struct dt_index_features *feat = spec->sp_feat; int rc; ENTRY; @@ -497,11 +479,19 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct dt_object *next = mdd_object_child(c); LASSERT(next); + if (feat != &dt_directory_features && feat != NULL) + dof->dof_type = DFT_INDEX; + else + dof->dof_type = dt_mode_to_dft(attr->la_mode); + + dof->u.dof_idx.di_feat = feat; + /* @hint will be initialized by underlying device. 
*/ next->do_ops->do_ah_init(env, hint, p ? mdd_object_child(p) : NULL, attr->la_mode & S_IFMT); - rc = mdo_create_obj(env, c, attr, hint, handle); + + rc = mdo_create_obj(env, c, attr, hint, dof, handle); LASSERT(ergo(rc == 0, mdd_object_exists(c))); } else rc = -EEXIST; @@ -1222,7 +1212,7 @@ static int mdd_object_create(const struct lu_env *env, if (rc) GOTO(unlock, rc); - rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle); + rc = mdd_object_create_internal(env, NULL, mdd_obj, ma, handle, spec); if (rc) GOTO(unlock, rc); @@ -1262,7 +1252,7 @@ static int mdd_object_create(const struct lu_env *env, pfid = spec->u.sp_ea.fid; } #endif - rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle); + rc = mdd_object_initialize(env, pfid, mdd_obj, ma, handle, spec); } EXIT; unlock: @@ -1440,6 +1430,7 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, if (S_ISREG(mdd_object_type(obj))) { /* Return LOV & COOKIES unconditionally here. We clean evth up. * Caller must be ready for that. 
*/ + rc = __mdd_lmm_get(env, obj, ma); if ((ma->ma_valid & MA_LOV)) rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj), @@ -1454,9 +1445,11 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, static int mdd_close(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { - int rc; struct mdd_object *mdd_obj = md2mdd_obj(obj); struct thandle *handle; + int rc; + int reset = 1; + #ifdef HAVE_QUOTA_SUPPORT struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev; struct mds_obd *mds = &obd->u.mds; @@ -1476,19 +1469,30 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* release open count */ mdd_obj->mod_count --; + if (mdd_obj->mod_count == 0) { + /* remove link to object from orphan index */ + if (mdd_obj->mod_flags & ORPHAN_OBJ) + __mdd_orphan_del(env, mdd_obj, handle); + } + rc = mdd_iattr_get(env, mdd_obj, ma); - if (rc == 0 && mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) { - rc = mdd_object_kill(env, mdd_obj, ma); + if (rc == 0) { + if (mdd_obj->mod_count == 0 && ma->ma_attr.la_nlink == 0) { + rc = mdd_object_kill(env, mdd_obj, ma); #ifdef HAVE_QUOTA_SUPPORT - if (mds->mds_quota) { - quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; - mdd_quota_wrapper(&ma->ma_attr, qids); - } + if (mds->mds_quota) { + quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD; + mdd_quota_wrapper(&ma->ma_attr, qids); + } #endif - } else { - ma->ma_valid &= ~(MA_LOV | MA_COOKIE); + if (rc == 0) + reset = 0; + } } + if (reset) + ma->ma_valid &= ~(MA_LOV | MA_COOKIE); + mdd_write_unlock(env, mdd_obj); mdd_trans_stop(env, mdo2mdd(obj), rc, handle); #ifdef HAVE_QUOTA_SUPPORT @@ -1614,7 +1618,7 @@ static int __mdd_readpage(const struct lu_env *env, struct mdd_object *obj, * iterate through directory and fill pages from @rdpg */ iops = &next->do_index_ops->dio_it; - it = iops->init(env, next, 0, mdd_object_capa(env, obj)); + it = iops->init(env, next, mdd_object_capa(env, obj)); if (IS_ERR(it)) return PTR_ERR(it); diff --git a/lustre/mdd/mdd_orphans.c 
b/lustre/mdd/mdd_orphans.c index 940a4df..e587094 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -38,6 +38,7 @@ * Orphan handling code * * Author: Mike Pershin + * Pravin B Shelar */ #ifndef EXPORT_SYMTAB @@ -52,126 +53,368 @@ #include #include "mdd_internal.h" -const char orph_index_name[] = "orphans"; - -static const struct dt_index_features orph_index_features = { - .dif_flags = DT_IND_UPDATE, - .dif_keysize_min = sizeof(struct orph_key), - .dif_keysize_max = sizeof(struct orph_key), - .dif_recsize_min = sizeof(loff_t), - .dif_recsize_max = sizeof(loff_t) -}; +const char orph_index_name[] = "PENDING"; enum { ORPH_OP_UNLINK, ORPH_OP_TRUNCATE }; -static struct orph_key *orph_key_fill(const struct lu_env *env, - const struct lu_fid *lf, __u32 op) +#define ORPHAN_FILE_NAME_FORMAT "%016llx:%08x:%08x:%2x" +#define ORPHAN_FILE_NAME_FORMAT_18 "%llx:%08x" + +static struct dt_key* orph_key_fill(const struct lu_env *env, + const struct lu_fid *lf, __u32 op) { - struct orph_key *key = &mdd_env_info(env)->mti_orph_key; + char *key = mdd_env_info(env)->mti_orph_key; + int rc; + LASSERT(key); - fid_cpu_to_be(&key->ok_fid, lf); - key->ok_op = cpu_to_be32(op); - return key; + rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT, fid_seq(lf), + fid_oid(lf), fid_ver(lf), op); + if (rc > 0) + return (struct dt_key*) key; + else + return ERR_PTR(rc); +} + +static struct dt_key* orph_key_fill_18(const struct lu_env *env, + const struct lu_fid *lf) +{ + char *key = mdd_env_info(env)->mti_orph_key; + int rc; + + LASSERT(key); + rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18, fid_seq(lf), + fid_oid(lf)); + if (rc > 0) + return (struct dt_key*) key; + else + return ERR_PTR(rc); +} + +static int orphan_key_to_fid(char *key, struct lu_fid *lf) +{ + int rc = 0; + unsigned int op; + + rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT, &lf->f_seq, &lf->f_oid, + &lf->f_ver, &op); + if (rc == 4) + return 0; + + /* build igif */ + rc = sscanf(key, 
ORPHAN_FILE_NAME_FORMAT_18, + &lf->f_seq, &lf->f_oid); + if (rc == 2) { + lf->f_ver = 0; + return 0; + } + + CERROR("can not parse orphan file name %s\n",key); + return -EINVAL; +} + +static inline void mdd_orphan_write_lock(const struct lu_env *env, + struct mdd_device *mdd) +{ + + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_write_lock(env, dor, MOR_TGT_CHILD); +} + +static inline void mdd_orphan_write_unlock(const struct lu_env *env, + struct mdd_device *mdd) +{ + + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_write_unlock(env, dor); +} + +static inline int mdd_orphan_insert_obj(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + __u32 op, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + const struct lu_fid *lf = mdo2fid(obj); + struct dt_key *key = orph_key_fill(env, lf, op); + ENTRY; + + return dor->do_index_ops->dio_insert(env, dor, + __mdd_fid_rec(env, lf), + key, th, + BYPASS_CAPA, 1); +} + +static inline int mdd_orphan_delete_obj(const struct lu_env *env, + struct mdd_device *mdd , + struct dt_key *key, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + + return dor->do_index_ops->dio_delete(env, dor, + key, th, + BYPASS_CAPA); } +static inline void mdd_orphan_ref_add(const struct lu_env *env, + struct mdd_device *mdd, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_ref_add(env, dor, th); +} + +static inline void mdd_orphan_ref_del(const struct lu_env *env, + struct mdd_device *mdd, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_ref_del(env, dor, th); +} + + static int orph_index_insert(const struct lu_env *env, - struct mdd_object *obj, __u32 op, - loff_t *offset, struct thandle *th) + struct mdd_object *obj, + __u32 op, + struct thandle *th) { - struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - struct dt_object *dor = mdd->mdd_orphans; - struct orph_key *key = 
orph_key_fill(env, mdo2fid(obj), op); + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + struct dt_object *dor = mdd->mdd_orphans; + const struct lu_fid *lf_dor = lu_object_fid(&dor->do_lu); + struct dt_object *next = mdd_object_child(obj); + const struct dt_key *dotdot = (const struct dt_key *) ".."; int rc; ENTRY; - rc = dor->do_index_ops->dio_insert(env, dor, (struct dt_rec *)offset, - (struct dt_key *)key, th, - BYPASS_CAPA, 1); + mdd_orphan_write_lock(env, mdd); + + rc = mdd_orphan_insert_obj(env, mdd, obj, op, th); + if (rc) + GOTO(out, rc); + + mdo_ref_add(env, obj, th); + if (!S_ISDIR(mdd_object_type(obj))) + goto out; + + mdo_ref_add(env, obj, th); + mdd_orphan_ref_add(env, mdd, th); + + /* try best to fixup directory, dont return errors + * from here */ + if (!dt_try_as_dir(env, next)) + goto out; + next->do_index_ops->dio_delete(env, next, + dotdot, th, BYPASS_CAPA); + + next->do_index_ops->dio_insert(env, next, + __mdd_fid_rec(env, lf_dor), + dotdot, th, BYPASS_CAPA, 1); + +out: + mdd_orphan_write_unlock(env, mdd); + RETURN(rc); } +/** + * destroy osd object on mdd and associated ost objects. + * + * \param obj orphan object + * \param mdd used for sending llog msg to osts + * + * \retval 0 success + * \retval -ve error + */ +static int orphan_object_kill(const struct lu_env *env, + struct mdd_object *obj, + struct mdd_device *mdd, + struct thandle *th) +{ + struct lu_attr *la = &mdd_env_info(env)->mti_la; + int rc; + + /* No need to lock this object as its recovery phase, and + * no other thread can access it. But we need to lock it + * as its precondition for osd api we using. 
*/ + + mdd_write_lock(env, obj, MOR_TGT_CHILD); + mdo_ref_del(env, obj, th); + if (S_ISDIR(mdd_object_type(obj))) { + mdo_ref_del(env, obj, th); + mdd_orphan_ref_del(env, mdd, th); + mdd_write_unlock(env, obj); + } else { + /* regular file , cleanup linked ost objects */ + rc = mdd_la_get(env, obj, la, BYPASS_CAPA); + mdd_write_unlock(env, obj); + if (rc) + RETURN(rc); + + mdd_lov_destroy(env, mdd, obj, la); + } + return 0; +} + static int orph_index_delete(const struct lu_env *env, - struct mdd_object *obj, __u32 op, + struct mdd_object *obj, + __u32 op, struct thandle *th) { struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); struct dt_object *dor = mdd->mdd_orphans; - struct orph_key *key = orph_key_fill(env, mdo2fid(obj), op); + struct dt_key *key; int rc; + ENTRY; + LASSERT(dor); - rc = dor->do_index_ops->dio_delete(env, dor, - (struct dt_key *)key, th, - BYPASS_CAPA); - RETURN(rc); + key = orph_key_fill(env, mdo2fid(obj), op); + mdd_orphan_write_lock(env, mdd); + + rc = mdd_orphan_delete_obj(env, mdd, key, th); + + if (rc == -ENOENT) { + key = orph_key_fill_18(env, mdo2fid(obj)); + rc = mdd_orphan_delete_obj(env, mdd, key, th); + } + + if (!rc) { + /* lov objects will be destroyed by caller */ + mdo_ref_del(env, obj, th); + if (S_ISDIR(mdd_object_type(obj))) { + mdo_ref_del(env, obj, th); + mdd_orphan_ref_del(env, mdd, th); + } + } else + CERROR("could not delete object: rc = %d\n",rc); + + obj->mod_flags &= ~ORPHAN_OBJ; + mdd_orphan_write_unlock(env, mdd); + RETURN(rc); } -static inline struct orph_key *orph_key_empty(const struct lu_env *env, - __u32 op) + +static int orphan_object_destroy(const struct lu_env *env, + struct mdd_object *obj, + struct dt_key *key) { - struct orph_key *key = &mdd_env_info(env)->mti_orph_key; - LASSERT(key); - fid_zero(&key->ok_fid); - key->ok_op = cpu_to_be32(op); - return key; + struct thandle *th = NULL; + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + int rc; + ENTRY; + + mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP); + 
th = mdd_trans_start(env, mdd); + if (IS_ERR(th)) { + CERROR("Cannot get thandle\n"); + RETURN(-ENOMEM); + } + + mdd_orphan_write_lock(env, mdd); + rc = mdd_orphan_delete_obj(env, mdd, key, th); + if (!rc) + orphan_object_kill(env, obj, mdd, th); + else + CERROR("could not delete object: rc = %d\n",rc); + + mdd_orphan_write_unlock(env, mdd); + mdd_trans_stop(env, mdd, 0, th); + + RETURN(rc); } -static void orph_key_test_and_del(const struct lu_env *env, - struct mdd_device *mdd, - const struct orph_key *key) +static int orph_key_test_and_del(const struct lu_env *env, + struct mdd_device *mdd, + struct lu_fid *lf, + struct dt_key *key) { struct mdd_object *mdo; + int rc; + + mdo = mdd_object_find(env, mdd, lf); - mdo = mdd_object_find(env, mdd, &key->ok_fid); if (IS_ERR(mdo)) - CERROR("Invalid orphan!\n"); - else { - mdd_write_lock(env, mdo, MOR_TGT_CHILD); - if (mdo->mod_count == 0) { - /* non-opened orphan, let's delete it */ - struct md_attr *ma = &mdd_env_info(env)->mti_ma; - CWARN("Found orphan!\n"); - mdd_object_kill(env, mdo, ma); - /* TODO: now handle OST objects */ - //mdd_ost_objects_destroy(env, ma); - /* TODO: destroy index entry */ - } - mdd_write_unlock(env, mdo); - mdd_object_put(env, mdo); + return PTR_ERR(mdo); + + rc = -EBUSY; + if (mdo->mod_count == 0) { + CWARN("Found orphan!\n"); + rc = orphan_object_destroy(env, mdo, key); + } else { + mdo->mod_flags |= ORPHAN_OBJ; } + + mdd_object_put(env, mdo); + return rc; } static int orph_index_iterate(const struct lu_env *env, struct mdd_device *mdd) { - struct dt_object *dt_obj = mdd->mdd_orphans; - struct dt_it *it; + struct dt_object *dor = mdd->mdd_orphans; + char *mti_key = mdd_env_info(env)->mti_orph_key; const struct dt_it_ops *iops; - struct orph_key *key = orph_key_empty(env, 0); - int result; + struct dt_it *it; + char *key; + struct lu_fid fid; + int result = 0; + int key_sz = 0; + int rc; + __u64 cookie; ENTRY; - iops = &dt_obj->do_index_ops->dio_it; - it = iops->init(env, dt_obj, 1, 
BYPASS_CAPA); + /* In recovery phase, do not need for any lock here */ + + iops = &dor->do_index_ops->dio_it; + it = iops->init(env, dor, BYPASS_CAPA); if (it != NULL) { - result = iops->get(env, it, (const void *)key); + result = iops->get(env, it, (const void *)""); if (result > 0) { - int i; /* main cycle */ - for (result = 0, i = 0; result == +1; ++i) { + do { + key = (void *)iops->key(env, it); - fid_be_to_cpu(&key->ok_fid, &key->ok_fid); - orph_key_test_and_del(env, mdd, key); + if (IS_ERR(key)) + goto next; + key_sz = iops->key_size(env, it); + + /* filter out "." and ".." entries from + * PENDING dir. */ + if (key_sz < 8) + goto next; + + memcpy(mti_key, key, key_sz); + mti_key[key_sz] = 0; + + if (orphan_key_to_fid(mti_key, &fid)) + goto next; + if (!fid_is_sane(&fid)) + goto next; + + /* kill orphan object */ + cookie = iops->store(env, it); + iops->put(env, it); + rc = orph_key_test_and_del(env, mdd, &fid, + (struct dt_key *)mti_key); + + /* after index delete reset iterator */ + if (!rc) + result = iops->get(env, it, + (const void *)""); + else + result = iops->load(env, it, cookie); +next: result = iops->next(env, it); - } + } while (result == 0); + result = 0; } else if (result == 0) /* Index contains no zero key? */ result = -EIO; - iops->put(env, it); iops->fini(env, it); } else @@ -184,17 +427,17 @@ int orph_index_init(const struct lu_env *env, struct mdd_device *mdd) { struct lu_fid fid; struct dt_object *d; - int rc; + int rc = 0; ENTRY; - d = dt_store_open(env, mdd->mdd_child, orph_index_name, &fid); + d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid); if (!IS_ERR(d)) { mdd->mdd_orphans = d; - rc = d->do_ops->do_index_try(env, d, &orph_index_features); - if (rc == 0) - LASSERT(d->do_index_ops != NULL); - else - CERROR("\"%s\" is not an index!\n", orph_index_name); + if (!dt_try_as_dir(env, d)) { + rc = -ENOTDIR; + CERROR("\"%s\" is not an index! 
: rc = %d\n", + orph_index_name, rc); + } } else { CERROR("cannot find \"%s\" obj %d\n", orph_index_name, (int)PTR_ERR(d)); @@ -214,18 +457,45 @@ void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd) EXIT; } +/** + * Iterate orphan index to cleanup orphan objects in case of recovery. + * \param d mdd device in recovery. + * + */ + int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d) { return orph_index_iterate(env, d); } +/** + * add an orphan \a obj to orphan index. + * \param obj file or directory. + * \param th transaction for index insert. + * + * \pre obj nlink == 0 && obj->mod_count != 0 + * + * \retval 0 success + * \retval -ve index operation error. + */ + int __mdd_orphan_add(const struct lu_env *env, struct mdd_object *obj, struct thandle *th) { - loff_t offset = 0; - return orph_index_insert(env, obj, ORPH_OP_UNLINK, &offset, th); + return orph_index_insert(env, obj, ORPH_OP_UNLINK, th); } +/** + * delete an orphan \a obj from orphan index. + * \param obj file or directory. + * \param th transaction for index deletion and object destruction. + * + * \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj. + * + * \retval 0 success + * \retval -ve index operation error. 
+ */ + int __mdd_orphan_del(const struct lu_env *env, struct mdd_object *obj, struct thandle *th) { diff --git a/lustre/mdd/mdd_trans.c b/lustre/mdd/mdd_trans.c index 2c0a827..947ef75 100644 --- a/lustre/mdd/mdd_trans.c +++ b/lustre/mdd/mdd_trans.c @@ -201,18 +201,28 @@ int mdd_txn_init_credits(const struct lu_env *env, struct mdd_device *mdd) *c = dt[DTO_INDEX_INSERT]; break; case MDD_TXN_UNLINK_OP: - /* delete index + Unlink log */ - *c = dt[DTO_INDEX_DELETE]; + /* delete index + Unlink log + + * mdd orphan handling */ + *c = dt[DTO_INDEX_DELETE] + + dt[DTO_INDEX_DELETE] + + dt[DTO_INDEX_INSERT] * 2 + + dt[DTO_XATTR_SET] * 3; break; case MDD_TXN_RENAME_OP: /* 2 delete index + 1 insert + Unlink log */ *c = 2 * dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_INSERT]; + dt[DTO_INDEX_INSERT] + + dt[DTO_INDEX_DELETE] + + dt[DTO_INDEX_INSERT] * 2 + + dt[DTO_XATTR_SET] * 3; break; case MDD_TXN_RENAME_TGT_OP: /* index insert + index delete */ *c = dt[DTO_INDEX_DELETE] + - dt[DTO_INDEX_INSERT]; + dt[DTO_INDEX_INSERT] + + dt[DTO_INDEX_DELETE] + + dt[DTO_INDEX_INSERT] * 2 + + dt[DTO_XATTR_SET] * 3; break; case MDD_TXN_CREATE_DATA_OP: /* same as set xattr(lsm) */ diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index c888039..a3e34df 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -432,6 +432,9 @@ static int mds_cmd_cleanup(struct obd_device *obd) LCONSOLE_WARN("%s: shutting down for failover; client state " "will be preserved.\n", obd->obd_name); + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + RETURN(0); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); mds_lov_destroy_objids(obd); diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index a39e495..ce287c6 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -135,7 +135,7 @@ int mds_obd_create(struct obd_export *exp, struct obdo *oa, err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode, handle, 0); if (!err) { - oa->o_gr = FILTER_GROUP_MDS0 + mds->mds_id; + oa->o_gr = 
mdt_to_obd_objgrp(mds->mds_id); oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FLGROUP; } else if (!rc) rc = err; diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 1968b9c..b623979 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -307,7 +307,7 @@ int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid) * objects above this ID, they will be removed. */ memset(&oa, 0, sizeof(oa)); oa.o_flags = OBD_FL_DELORPHAN; - oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id; + oa.o_gr = mdt_to_obd_objgrp(mds->mds_id); oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP; if (ost_uuid != NULL) oti.oti_ost_uuid = ost_uuid; @@ -483,7 +483,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE; #endif data->ocd_version = LUSTRE_VERSION_CODE; - data->ocd_group = mds->mds_id + FILTER_GROUP_MDS0; + data->ocd_group = mdt_to_obd_objgrp(mds->mds_id); /* NB: lov_connect() needs to fill in .ocd_index for each OST */ rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL); OBD_FREE(data, sizeof(*data)); @@ -633,7 +633,7 @@ static int __mds_lov_synchronize(void *data) CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc); GOTO(out, rc); } - mgi.group = FILTER_GROUP_MDS0 + mds->mds_id; + mgi.group = mdt_to_obd_objgrp(mds->mds_id); mgi.uuid = uuid; rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_MDS_CONN), diff --git a/lustre/mdt/mdt_capa.c b/lustre/mdt/mdt_capa.c index 3f911de..1f03d81 100644 --- a/lustre/mdt/mdt_capa.c +++ b/lustre/mdt/mdt_capa.c @@ -60,10 +60,6 @@ static void make_capa_key(struct lustre_capa_key *key, ll_get_random_bytes(key->lk_key, sizeof(key->lk_key)); } -enum { - MDT_TXN_CAPA_KEYS_WRITE_CREDITS = 1 -}; - static inline void lck_cpu_to_le(struct lustre_capa_key *tgt, struct lustre_capa_key *src) { @@ -93,8 +89,8 @@ static int write_capa_keys(const struct lu_env *env, int i, rc; mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - - th = 
mdt_trans_start(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_CREDITS); + mdt_trans_credit_init(env, mdt, MDT_TXN_CAPA_KEYS_WRITE_OP); + th = mdt_trans_start(env, mdt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 5b9a3d9..5547e62 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -4028,11 +4028,14 @@ out: } static int mdt_stack_init(struct lu_env *env, - struct mdt_device *m, struct lustre_cfg *cfg) + struct mdt_device *m, + struct lustre_cfg *cfg, + struct lustre_mount_info *lmi) { struct lu_device *d = &m->mdt_md_dev.md_lu_dev; struct lu_device *tmp; struct md_device *md; + struct lu_device *child_lu_dev; int rc; ENTRY; @@ -4067,7 +4070,15 @@ static int mdt_stack_init(struct lu_env *env, /* process setup config */ tmp = &m->mdt_md_dev.md_lu_dev; rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg); - GOTO(out, rc); + if (rc) + GOTO(out, rc); + + /* initialize local objects */ + child_lu_dev = &m->mdt_child->md_lu_dev; + + rc = child_lu_dev->ld_ops->ldo_prepare(env, + &m->mdt_md_dev.md_lu_dev, + child_lu_dev); out: /* fini from last known good lu_device */ if (rc) @@ -4210,6 +4221,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, const char *num = lustre_cfg_string(cfg, 2); struct lustre_mount_info *lmi = NULL; struct lustre_sb_info *lsi; + struct lustre_disk_data *ldd; struct lu_site *s; struct md_site *mite; const char *identity_upcall = "NONE"; @@ -4217,6 +4229,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, struct md_device *next; #endif int rc; + int node_id; ENTRY; md_device_init(&m->mdt_md_dev, ldt); @@ -4253,6 +4266,15 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, } else { lsi = s2lsi(lmi->lmi_sb); fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts); + server_put_mount_2(dev, lmi->lmi_mnt); + /* CMD is supported only in IAM mode */ + ldd = lsi->lsi_ldd; + LASSERT(num); + node_id = simple_strtol(num, NULL, 10); + 
if (!(ldd->ldd_flags & LDD_F_IAM_DIR) && node_id) { + CERROR("CMD Operation not allowed in IOP mode\n"); + RETURN(-EINVAL); + } } rwlock_init(&m->mdt_sptlrpc_lock); @@ -4305,12 +4327,11 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, lprocfs_nid_stats_clear_write, obd, NULL); /* set server index */ - LASSERT(num); - lu_site2md(s)->ms_node_id = simple_strtol(num, NULL, 10); + lu_site2md(s)->ms_node_id = node_id; /* failover is the default * FIXME: we do not failout mds0/mgs, which may cause some problems. - * assumed whose ls_node_id == 0 XXX + * assumed whose ms_node_id == 0 XXX * */ obd->obd_replayable = 1; /* No connection accepted until configurations will finish */ @@ -4325,7 +4346,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, } /* init the stack */ - rc = mdt_stack_init((struct lu_env *)env, m, cfg); + rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi); if (rc) { CERROR("Can't init device stack, rc %d\n", rc); GOTO(err_fini_proc, rc); @@ -4370,7 +4391,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) GOTO(err_free_ns, rc); - rc = mdt_fs_setup(env, m, obd); + rc = mdt_fs_setup(env, m, obd, lsi); if (rc) GOTO(err_capa, rc); @@ -4492,6 +4513,19 @@ static int mdt_process_config(const struct lu_env *env, struct lprocfs_static_vars lvars; struct obd_device *obd = d->ld_obd; + /* + * For interoperability between 1.8 and 2.0, + * skip old "mdt.group_upcall" param. + */ + { + char *param = lustre_cfg_string(cfg, 1); + if (param && !strncmp("mdt.group_upcall", param, 16)) { + CWARN("For 1.8 interoperability, skip this" + " mdt.group_upcall. 
It is obsolete\n"); + break; + } + } + lprocfs_mdt_init_vars(&lvars); rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, cfg, obd); @@ -4583,7 +4617,7 @@ static void mdt_object_free(const struct lu_env *env, struct lu_object *o) static const struct lu_device_operations mdt_lu_ops = { .ldo_object_alloc = mdt_object_alloc, - .ldo_process_config = mdt_process_config + .ldo_process_config = mdt_process_config, }; static const struct lu_object_operations mdt_obj_ops = { @@ -5200,11 +5234,19 @@ static struct lu_device_type mdt_device_type = { .ldt_ctx_tags = LCT_MD_THREAD }; +static struct lu_local_obj_desc mdt_last_recv = { + .llod_name = LAST_RCVD, + .llod_oid = MDT_LAST_RECV_OID, + .llod_is_index = 0, +}; + static int __init mdt_mod_init(void) { struct lprocfs_static_vars lvars; int rc; + llo_local_obj_register(&mdt_last_recv); + mdt_num_threads = MDT_NUM_THREADS; lprocfs_mdt_init_vars(&lvars); rc = class_register_type(&mdt_obd_device_ops, NULL, diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index b5aaecb..ae204a9 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -381,6 +381,12 @@ struct mdt_commit_cb { void *mdt_cb_data; }; +enum mdt_txn_op { + MDT_TXN_CAPA_KEYS_WRITE_OP, + MDT_TXN_LAST_RCVD_WRITE_OP, +}; + + /* * Info allocated per-transaction. 
*/ @@ -535,7 +541,7 @@ extern void target_recovery_fini(struct obd_device *obd); extern void target_recovery_init(struct obd_device *obd, svc_handler_t handler); int mdt_fs_setup(const struct lu_env *, struct mdt_device *, - struct obd_device *); + struct obd_device *, struct lustre_sb_info *lsi); void mdt_fs_cleanup(const struct lu_env *, struct mdt_device *); int mdt_client_del(const struct lu_env *env, @@ -580,8 +586,12 @@ void mdt_shrink_reply(struct mdt_thread_info *info); int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *, const struct md_attr *); void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *); + +void mdt_trans_credit_init(const struct lu_env *env, + struct mdt_device *mdt, + enum mdt_txn_op op); struct thandle* mdt_trans_start(const struct lu_env *env, - struct mdt_device *mdt, int credits); + struct mdt_device *mdt); void mdt_trans_stop(const struct lu_env *env, struct mdt_device *mdt, struct thandle *th); int mdt_record_write(const struct lu_env *env, diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 916e3e0..48c6af1 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -1003,6 +1003,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) * not exist. 
*/ info->mti_spec.sp_cr_lookup = 0; + info->mti_spec.sp_feat = &dt_directory_features; result = mdo_create(info->mti_env, mdt_object_child(parent), diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 1286919..81c9dfa 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -108,21 +108,49 @@ int mdt_record_write(const struct lu_env *env, rc = -EFAULT; return rc; } -/* only one record write */ -enum { - MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3 -}; +static inline int mdt_trans_credit_get(const struct lu_env *env, + struct mdt_device *mdt, + enum mdt_txn_op op) +{ + struct dt_device *dev = mdt->mdt_bottom; + int cr; + switch (op) { + case MDT_TXN_CAPA_KEYS_WRITE_OP: + case MDT_TXN_LAST_RCVD_WRITE_OP: + cr = dev->dd_ops->dt_credit_get(env, + dev, + DTO_WRITE_BLOCK); + break; + default: + LBUG(); + } + return cr; +} + +void mdt_trans_credit_init(const struct lu_env *env, + struct mdt_device *mdt, + enum mdt_txn_op op) +{ + struct mdt_thread_info *mti; + struct txn_param *p; + int cr; + + mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + p = &mti->mti_txn_param; + + cr = mdt_trans_credit_get(env, mdt, op); + txn_param_init(p, cr); +} struct thandle* mdt_trans_start(const struct lu_env *env, - struct mdt_device *mdt, int credits) + struct mdt_device *mdt) { struct mdt_thread_info *mti; struct txn_param *p; mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); p = &mti->mti_txn_param; - txn_param_init(p, credits); /* export can require sync operations */ if (mti->mti_exp != NULL) @@ -225,7 +253,8 @@ static inline int mdt_last_rcvd_header_write(const struct lu_env *env, mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS); + mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP); + th = mdt_trans_start(env, mdt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); @@ -419,7 +448,8 @@ err_client: } static int mdt_server_data_init(const struct lu_env *env, - struct 
mdt_device *mdt) + struct mdt_device *mdt, + struct lustre_sb_info *lsi) { struct lr_server_data *lsd = &mdt->mdt_lsd; struct lsd_client_data *lcd = NULL; @@ -427,6 +457,7 @@ static int mdt_server_data_init(const struct lu_env *env, struct mdt_thread_info *mti; struct dt_object *obj; struct lu_attr *la; + struct lustre_disk_data *ldd; unsigned long last_rcvd_size; __u64 mount_count; int rc; @@ -479,7 +510,13 @@ static int mdt_server_data_init(const struct lu_env *env, } mount_count = lsd->lsd_mount_count; + ldd = lsi->lsi_ldd; + + if (ldd->ldd_flags & LDD_F_IAM_DIR) + lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR; + lsd->lsd_feature_compat = OBD_COMPAT_MDT; + lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID; spin_lock(&mdt->mdt_transno_lock); mdt->mdt_last_transno = lsd->lsd_last_transno; @@ -616,7 +653,8 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt) LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off); /* write new client data */ off = med->med_lr_off; - th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS); + mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP); + th = mdt_trans_start(env, mdt); if (IS_ERR(th)) RETURN(PTR_ERR(th)); @@ -739,7 +777,8 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) * mdt->mdt_last_rcvd may be NULL that time. 
*/ if (mdt->mdt_last_rcvd != NULL) { - th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS); + mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP); + th = mdt_trans_start(env, mdt); if (IS_ERR(th)) GOTO(free, rc = PTR_ERR(th)); @@ -847,7 +886,10 @@ extern struct lu_context_key mdt_thread_key; static int mdt_txn_start_cb(const struct lu_env *env, struct txn_param *param, void *cookie) { - param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS; + struct mdt_device *mdt = cookie; + + param->tp_credits += mdt_trans_credit_get(env, mdt, + MDT_TXN_LAST_RCVD_WRITE_OP); return 0; } @@ -946,7 +988,8 @@ static int mdt_txn_commit_cb(const struct lu_env *env, } int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, - struct obd_device *obd) + struct obd_device *obd, + struct lustre_sb_info *lsi) { struct lu_fid fid; struct dt_object *o; @@ -965,10 +1008,10 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb); - o = dt_store_open(env, mdt->mdt_bottom, LAST_RCVD, &fid); + o = dt_store_open(env, mdt->mdt_bottom, "", LAST_RCVD, &fid); if (!IS_ERR(o)) { mdt->mdt_last_rcvd = o; - rc = mdt_server_data_init(env, mdt); + rc = mdt_server_data_init(env, mdt, lsi); if (rc) GOTO(put_last_rcvd, rc); } else { @@ -977,7 +1020,7 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, RETURN(rc); } - o = dt_store_open(env, mdt->mdt_bottom, CAPA_KEYS, &fid); + o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid); if (!IS_ERR(o)) { mdt->mdt_ck_obj = o; rc = mdt_capa_keys_init(env, mdt); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 2fb2fde..4de1f39 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -145,6 +145,7 @@ static int mdt_md_create(struct mdt_thread_info *info) * or not. 
*/ info->mti_spec.sp_cr_lookup = 1; + info->mti_spec.sp_feat = &dt_directory_features; lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); diff --git a/lustre/obdclass/Makefile.in b/lustre/obdclass/Makefile.in index 1bbd3c3..2c7f0d2 100644 --- a/lustre/obdclass/Makefile.in +++ b/lustre/obdclass/Makefile.in @@ -27,6 +27,7 @@ obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o mea.o obdclass-all-objs += lu_object.o dt_object.o hash.o capa.o lu_time.o obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o obdclass-all-objs += acl.o idmap.o +obdclass-all-objs += md_local_object.o obdclass-objs := $(obdclass-linux-objs) $(obdclass-all-objs) diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 2a99005..a4bbb9a 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -52,6 +52,28 @@ /* fid_be_to_cpu() */ #include +struct dt_find_hint { + struct lu_fid *dfh_fid; + struct dt_device *dfh_dt; + struct dt_object *dfh_o; +}; + +struct dt_thread_info { + char dti_buf[DT_MAX_PATH]; + struct lu_fid_pack dti_pack; + struct dt_find_hint dti_dfh; +}; + +/* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */ +LU_KEY_INIT(dt_global, struct dt_thread_info); +LU_KEY_FINI(dt_global, struct dt_thread_info); + +static struct lu_context_key dt_key = { + .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD, + .lct_init = dt_global_key_init, + .lct_fini = dt_global_key_fini +}; + /* no lock is necessary to protect the list, because call-backs * are added during system startup. Please refer to "struct dt_device". 
*/ @@ -157,13 +179,44 @@ int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj) } EXPORT_SYMBOL(dt_try_as_dir); -extern struct lu_context_key lu_global_key; +enum dt_format_type dt_mode_to_dft(__u32 mode) +{ + enum dt_format_type result; + + switch (mode & S_IFMT) { + case S_IFDIR: + result = DFT_DIR; + break; + case S_IFREG: + result = DFT_REGULAR; + break; + case S_IFLNK: + result = DFT_SYM; + break; + case S_IFCHR: + case S_IFBLK: + case S_IFIFO: + case S_IFSOCK: + result = DFT_NODE; + break; + default: + LBUG(); + break; + } + return result; +} + +EXPORT_SYMBOL(dt_mode_to_dft); +/** + * lookup fid for object named \a name in directory \a dir. + */ static int dt_lookup(const struct lu_env *env, struct dt_object *dir, const char *name, struct lu_fid *fid) { - struct lu_fid_pack *pack = lu_context_key_get(&env->le_ctx, - &lu_global_key); + struct dt_thread_info *info = lu_context_key_get(&env->le_ctx, + &dt_key); + struct lu_fid_pack *pack = &info->dti_pack; struct dt_rec *rec = (struct dt_rec *)pack; const struct dt_key *key = (const struct dt_key *)name; int result; @@ -171,16 +224,21 @@ static int dt_lookup(const struct lu_env *env, struct dt_object *dir, if (dt_try_as_dir(env, dir)) { result = dir->do_index_ops->dio_lookup(env, dir, rec, key, BYPASS_CAPA); - if (result == 0) + if (result > 0) result = fid_unpack(pack, fid); + else if (result == 0) + result = -ENOENT; } else result = -ENOTDIR; return result; } -static struct dt_object *dt_locate(const struct lu_env *env, - struct dt_device *dev, - const struct lu_fid *fid) +/** + * get object for given \a fid. 
+ */ +struct dt_object *dt_locate(const struct lu_env *env, + struct dt_device *dev, + const struct lu_fid *fid) { struct lu_object *obj; struct dt_object *dt; @@ -191,38 +249,154 @@ static struct dt_object *dt_locate(const struct lu_env *env, LASSERT(obj != NULL); dt = container_of(obj, struct dt_object, do_lu); } else - dt = (void *)obj; + dt = (struct dt_object *)obj; return dt; } +EXPORT_SYMBOL(dt_locate); -struct dt_object *dt_store_open(const struct lu_env *env, - struct dt_device *dt, const char *name, - struct lu_fid *fid) +/** + * find a object named \a entry in given \a dfh->dfh_o directory. + */ +static int dt_find_entry(const struct lu_env *env, const char *entry, void *data) { + struct dt_find_hint *dfh = data; + struct dt_device *dt = dfh->dfh_dt; + struct lu_fid *fid = dfh->dfh_fid; + struct dt_object *obj = dfh->dfh_o; + int result; + + result = dt_lookup(env, obj, entry, fid); + lu_object_put(env, &obj->do_lu); + if (result == 0) { + obj = dt_locate(env, dt, fid); + if (IS_ERR(obj)) + result = PTR_ERR(obj); + } + dfh->dfh_o = obj; + return result; +} + +/** + * Abstract function which parses path name. This function feeds + * path component to \a entry_func. 
+ */ +int dt_path_parser(const struct lu_env *env, + char *path, dt_entry_func_t entry_func, + void *data) +{ + char *e; + int rc = 0; + + while (1) { + e = strsep(&path, "/"); + if (e == NULL) + break; + + if (e[0] == 0) { + if (!path || path[0] == '\0') + break; + continue; + } + rc = entry_func(env, e, data); + if (rc) + break; + } + + return rc; +} + +static struct dt_object *dt_store_resolve(const struct lu_env *env, + struct dt_device *dt, + const char *path, + struct lu_fid *fid) +{ + struct dt_thread_info *info = lu_context_key_get(&env->le_ctx, + &dt_key); + struct dt_find_hint *dfh = &info->dti_dfh; + struct dt_object *obj = dfh->dfh_o; + char *local = info->dti_buf; int result; - struct dt_object *root; - struct dt_object *child; + dfh->dfh_dt = dt; + dfh->dfh_fid = fid; + + strncpy(local, path, DT_MAX_PATH); + local[DT_MAX_PATH - 1] = '\0'; result = dt->dd_ops->dt_root_get(env, dt, fid); if (result == 0) { - root = dt_locate(env, dt, fid); - if (!IS_ERR(root)) { - result = dt_lookup(env, root, name, fid); - if (result == 0) - child = dt_locate(env, dt, fid); - else - child = ERR_PTR(result); - lu_object_put(env, &root->do_lu); - } else { - CERROR("No root\n"); - child = (void *)root; + obj = dt_locate(env, dt, fid); + if (!IS_ERR(obj)) { + dfh->dfh_o = obj; + result = dt_path_parser(env, local, dt_find_entry, dfh); + if (result != 0) + obj = ERR_PTR(result); } - } else - child = ERR_PTR(result); - return child; + } else { + obj = ERR_PTR(result); + } + return obj; +} + +static struct dt_object *dt_reg_open(const struct lu_env *env, + struct dt_device *dt, + struct dt_object *p, + const char *name, + struct lu_fid *fid) +{ + struct dt_object *o; + int result; + + result = dt_lookup(env, p, name, fid); + if (result == 0){ + o = dt_locate(env, dt, fid); + } + else + o = ERR_PTR(result); + + return o; +} + +/** + * Open dt object named \a filename from \a dirname directory. 
+ * \param dt dt device + * \param fid on success, object fid is stored in *fid + */ +struct dt_object *dt_store_open(const struct lu_env *env, + struct dt_device *dt, + const char *dirname, + const char *filename, + struct lu_fid *fid) +{ + struct dt_object *file; + struct dt_object *dir; + + dir = dt_store_resolve(env, dt, dirname, fid); + if (!IS_ERR(dir)) { + file = dt_reg_open(env, dt, dir, + filename, fid); + lu_object_put(env, &dir->do_lu); + } else { + file = dir; + } + return file; } EXPORT_SYMBOL(dt_store_open); +/* dt class init function. */ +int dt_global_init(void) +{ + int result; + + LU_CONTEXT_KEY_INIT(&dt_key); + result = lu_context_key_register(&dt_key); + return result; +} + +void dt_global_fini(void) +{ + lu_context_key_degister(&dt_key); +} + const struct dt_index_features dt_directory_features; EXPORT_SYMBOL(dt_directory_features); diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 42798fb..01b2d3e 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -1512,6 +1512,12 @@ void cl_global_fini(void); int lu_ref_global_init(void); void lu_ref_global_fini(void); +int dt_global_init(void); +void dt_global_fini(void); + +int llo_global_init(void); +void llo_global_fini(void); + /** * Initialization of global lu_* data. 
*/ @@ -1549,10 +1555,22 @@ int lu_global_init(void) return -ENOMEM; result = lu_time_global_init(); - if (result != 0) - return result; + if (result) + GOTO(out, result); + +#ifdef __KERNEL__ + result = dt_global_init(); + if (result) + GOTO(out, result); - return cl_global_init(); + result = llo_global_init(); + if (result) + GOTO(out, result); +#endif + result = cl_global_init(); +out: + + return result; } /** @@ -1561,6 +1579,10 @@ int lu_global_init(void) void lu_global_fini(void) { cl_global_fini(); +#ifdef __KERNEL__ + llo_global_fini(); + dt_global_fini(); +#endif lu_time_global_fini(); if (lu_site_shrinker != NULL) { remove_shrinker(lu_site_shrinker); @@ -1739,4 +1761,3 @@ void lu_kmem_fini(struct lu_kmem_descr *caches) } } EXPORT_SYMBOL(lu_kmem_fini); - diff --git a/lustre/obdclass/md_local_object.c b/lustre/obdclass/md_local_object.c new file mode 100644 index 0000000..919c284 --- /dev/null +++ b/lustre/obdclass/md_local_object.c @@ -0,0 +1,447 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/obdclass/md_local_object.c + * + * Lustre Local Object create APIs + * 'create on first mount' facility. Files registered under llo module will + * be created on first mount. + * + * Author: Pravin Shelar + */ + +#define DEBUG_SUBSYSTEM S_CLASS +#ifndef EXPORT_SYMTAB +# define EXPORT_SYMTAB +#endif + +#include +#include +#include +#include +#include +#include + + +/** List head to hold list of objects to be created. */ +static struct list_head llo_lobj_list; + +/** Lock to protect list manipulations */ +static struct mutex llo_lock; + +/** + * Structure used to maintain state of path parsing. + * \see llo_find_entry, llo_store_resolve + */ +struct llo_find_hint { + struct lu_fid *lfh_cfid; + struct md_device *lfh_md; + struct md_object *lfh_pobj; +}; + +/** + * Thread Local storage for this module. 
+ */ +struct llo_thread_info { + /** buffer to resolve path */ + char lti_buf[DT_MAX_PATH]; + /** used for path resolve */ + struct lu_fid lti_fid; + /** used to pass child object fid */ + struct lu_fid lti_cfid; + struct llo_find_hint lti_lfh; + struct md_op_spec lti_spc; + struct md_attr lti_ma; + struct lu_name lti_lname; +}; + +LU_KEY_INIT(llod_global, struct llo_thread_info); +LU_KEY_FINI(llod_global, struct llo_thread_info); + +static struct lu_context_key llod_key = { + .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD, + .lct_init = llod_global_key_init, + .lct_fini = llod_global_key_fini +}; + +static inline struct llo_thread_info * llo_env_info(const struct lu_env *env) +{ + return lu_context_key_get(&env->le_ctx, &llod_key); +} + +/** + * Search md object for given fid. + */ +static struct md_object *llo_locate(const struct lu_env *env, + struct md_device *md, + const struct lu_fid *fid) +{ + struct lu_object *obj; + struct md_object *mdo; + + obj = lu_object_find(env, &md->md_lu_dev, fid, NULL); + if (!IS_ERR(obj)) { + obj = lu_object_locate(obj->lo_header, md->md_lu_dev.ld_type); + LASSERT(obj != NULL); + mdo = (struct md_object *) obj; + } else + mdo = (struct md_object *)obj; + return mdo; +} + +/** + * Lookup FID for object named \a name in directory \a pobj. + */ +static int llo_lookup(const struct lu_env *env, + struct md_object *pobj, + const char *name, + struct lu_fid *fid) +{ + struct llo_thread_info *info = llo_env_info(env); + struct lu_name *lname = &info->lti_lname; + struct md_op_spec *spec = &info->lti_spc; + + spec->sp_feat = NULL; + spec->sp_cr_flags = 0; + spec->sp_cr_lookup = 1; + spec->sp_cr_mode = 0; + spec->sp_ck_split = 0; + + lname->ln_name = name; + lname->ln_namelen = strlen(name); + + return mdo_lookup(env, pobj, lname, fid, spec); +} + +/** + * Function to look up path component, this is passed to parsing + * function. 
\see llo_store_resolve + */ +static int llo_find_entry(const struct lu_env *env, + const char *name, void *data) +{ + struct llo_find_hint *lfh = data; + struct md_device *md = lfh->lfh_md; + struct lu_fid *fid = lfh->lfh_cfid; + struct md_object *obj = lfh->lfh_pobj; + int result; + + /* lookup fid for object */ + result = llo_lookup(env, obj, name, fid); + lu_object_put(env, &obj->mo_lu); + + if (result == 0) { + /* get md object for fid that we got in lookup */ + obj = llo_locate(env, md, fid); + if (IS_ERR(obj)) + result = PTR_ERR(obj); + } + + lfh->lfh_pobj = obj; + return result; +} + +static struct md_object *llo_reg_open(const struct lu_env *env, + struct md_device *md, + struct md_object *p, + const char *name, + struct lu_fid *fid) +{ + struct md_object *o; + int result; + + result = llo_lookup(env, p, name, fid); + if (result == 0) + o = llo_locate(env, md, fid); + else + o = ERR_PTR(result); + + return o; +} + +/** + * Resolve given \a path, on success function returns + * md object for last directory and \a fid points to + * its fid. + */ +struct md_object *llo_store_resolve(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *path, + struct lu_fid *fid) +{ + struct llo_thread_info *info = llo_env_info(env); + struct llo_find_hint *lfh = &info->lti_lfh; + char *local = info->lti_buf; + struct md_object *obj = lfh->lfh_pobj; + int result; + + strncpy(local, path, DT_MAX_PATH); + local[DT_MAX_PATH - 1] = '\0'; + + lfh->lfh_md = md; + lfh->lfh_cfid = fid; + /* start path resolution from backend fs root. 
*/ + result = dt->dd_ops->dt_root_get(env, dt, fid); + if (result == 0) { + /* get md object for root */ + obj = llo_locate(env, md, fid); + if (!IS_ERR(obj)) { + /* start path parser from root md */ + lfh->lfh_pobj = obj; + result = dt_path_parser(env, local, llo_find_entry, lfh); + if (result != 0) + obj = ERR_PTR(result); + } + } else { + obj = ERR_PTR(result); + } + return obj; +} +EXPORT_SYMBOL(llo_store_resolve); + +/** + * Returns md object for \a objname in given \a dirname. + */ +struct md_object *llo_store_open(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *dirname, + const char *objname, + struct lu_fid *fid) +{ + struct md_object *obj; + struct md_object *dir; + + /* search md object for parent dir */ + dir = llo_store_resolve(env, md, dt, dirname, fid); + if (!IS_ERR(dir)) { + obj = llo_reg_open(env, md, dir, objname, fid); + lu_object_put(env, &dir->mo_lu); + } else + obj = dir; + + return obj; +} +EXPORT_SYMBOL(llo_store_open); + +static struct md_object *llo_create_obj(const struct lu_env *env, + struct md_device *md, + struct md_object *dir, + const char *objname, + const struct lu_fid *fid, + const struct dt_index_features *feat) +{ + struct llo_thread_info *info = llo_env_info(env); + struct md_object *mdo; + struct md_attr *ma = &info->lti_ma; + struct md_op_spec *spec = &info->lti_spc; + struct lu_name *lname = &info->lti_lname; + struct lu_attr *la = &ma->ma_attr; + int rc; + + mdo = llo_locate(env, md, fid); + if (IS_ERR(mdo)) + return mdo; + + lname->ln_name = objname; + lname->ln_namelen = strlen(objname); + + spec->sp_feat = feat; + spec->sp_cr_flags = 0; + spec->sp_cr_lookup = 1; + spec->sp_cr_mode = 0; + spec->sp_ck_split = 0; + + if (feat == &dt_directory_features) + la->la_mode = S_IFDIR; + else + la->la_mode = S_IFREG; + + la->la_mode |= S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; + la->la_uid = la->la_gid = 0; + la->la_valid = LA_MODE | LA_UID | LA_GID; + + ma->ma_valid = 0; + ma->ma_need = 0; + + 
rc = mdo_create(env, dir, lname, mdo, spec, ma); + + if (rc) { + lu_object_put(env, &mdo->mo_lu); + mdo = ERR_PTR(rc); + } + + return mdo; +} + +/** + * Create md object, object could be directory or + * special index defined by \a feat in \a directory. + * + * \param md device + * \param dirname parent directory + * \param objname file name + * \param fid object fid + * \param feat index features required for directory create + */ + +struct md_object *llo_store_create_index(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *dirname, + const char *objname, + const struct lu_fid *fid, + const struct dt_index_features *feat) +{ + struct llo_thread_info *info = llo_env_info(env); + struct md_object *obj; + struct md_object *dir; + struct lu_fid *ignore = &info->lti_fid; + + dir = llo_store_resolve(env, md, dt, dirname, ignore); + if (!IS_ERR(dir)) { + obj = llo_create_obj(env, md, dir, objname, fid, feat); + lu_object_put(env, &dir->mo_lu); + } else { + obj = dir; + } + return obj; +} + +EXPORT_SYMBOL(llo_store_create_index); + +/** + * Create md object for regular file in \a directory. + * + * \param md device + * \param dirname parent directory + * \param objname file name + * \param fid object fid. + */ + +struct md_object *llo_store_create(const struct lu_env *env, + struct md_device *md, + struct dt_device *dt, + const char *dirname, + const char *objname, + const struct lu_fid *fid) +{ + return llo_store_create_index(env, md, dt, dirname, + objname, fid, NULL); +} + +EXPORT_SYMBOL(llo_store_create); + +/** + * Register object for 'create on first mount' facility. + */ + +int llo_local_obj_register(struct lu_local_obj_desc *llod) +{ + mutex_lock(&llo_lock); + list_add(&llod->llod_linkage, &llo_lobj_list); + mutex_unlock(&llo_lock); + + return 0; +} + +EXPORT_SYMBOL(llo_local_obj_register); + +/** + * Create registered objects. 
+ */ + +int llo_local_objects_setup(const struct lu_env *env, + struct md_device * md, + struct dt_device *dt) +{ + struct llo_thread_info *info = llo_env_info(env); + struct lu_fid *fid; + struct lu_local_obj_desc *scan; + struct md_object *mdo; + int rc = 0; + + fid = &info->lti_cfid; + + mutex_lock(&llo_lock); + + list_for_each_entry(scan, &llo_lobj_list, llod_linkage) { + + lu_local_obj_fid(fid, scan->llod_oid); + + if (scan->llod_is_index) + mdo = llo_store_create_index(env, md, dt , + "", scan->llod_name, + fid, + scan->llod_feat); + else + mdo = llo_store_create(env, md, dt, + "", scan->llod_name, + fid); + if (IS_ERR(mdo) && PTR_ERR(mdo) != -EEXIST) { + rc = PTR_ERR(mdo); + CERROR("creating obj [%s] fid = "DFID" rc = %d\n", + scan->llod_name, PFID(fid), rc); + goto out; + } + + if (!IS_ERR(mdo)) + lu_object_put(env, &mdo->mo_lu); + } + +out: + mutex_unlock(&llo_lock); + return rc; +} + +EXPORT_SYMBOL(llo_local_objects_setup); + +int llo_global_init(void) +{ + int result; + + CFS_INIT_LIST_HEAD(&llo_lobj_list); + mutex_init(&llo_lock); + + LU_CONTEXT_KEY_INIT(&llod_key); + result = lu_context_key_register(&llod_key); + return result; +} + +void llo_global_fini(void) +{ + lu_context_key_degister(&llod_key); +} diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 128343e..ef51c1e 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -1047,6 +1047,29 @@ static int class_config_llog_handler(struct llog_handle * handle, break; } + /* + * For interoperability between 1.8 and 2.0, + * rename "mds" obd device type to "mdt". 
+ */ + { + char *typename = lustre_cfg_string(lcfg, 1); + char *index = lustre_cfg_string(lcfg, 2); + + if ((lcfg->lcfg_command == LCFG_ATTACH && typename && + strcmp(typename, "mds") == 0)) { + CWARN("For 1.8 interoperability, rename obd " + "type from mds to mdt\n"); + typename[2] = 't'; + } + if ((lcfg->lcfg_command == LCFG_SETUP && index && + strcmp(index, "type") == 0)) { + CWARN("For 1.8 interoperability, set this" + " index to '0'\n"); + index[0] = '0'; + index[1] = 0; + } + } + if ((clli->cfg_flags & CFG_F_EXCLUDE) && (lcfg->lcfg_command == LCFG_LOV_ADD_OBD)) /* Add inactive instead */ diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 639bfbb..2c3d5c1 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -80,7 +80,7 @@ #include "filter_internal.h" /* Group 0 is no longer a legal group, to catch uninitialized IDs */ -#define FILTER_MIN_GROUPS FILTER_GROUP_MDS0 +#define FILTER_MIN_GROUPS FILTER_GROUP_MDS1_N_BASE static struct lvfs_callback_ops filter_lvfs_ops; cfs_mem_cache_t *ll_fmd_cachep; @@ -955,7 +955,9 @@ static int filter_update_last_group(struct obd_device *obd, int group) CDEBUG(D_INODE, "error reading LAST_GROUP: rc %d\n",rc); GOTO(cleanup, rc); } - LASSERT(off == 0 || last_group >= FILTER_MIN_GROUPS); + LASSERTF(off == 0 || CHECK_MDS_GROUP(last_group), + "off = %llu and last_group = %d\n", off, last_group); + CDEBUG(D_INODE, "%s: previous %d, new %d\n", obd->obd_name, last_group, group); @@ -1145,8 +1147,6 @@ static int filter_read_groups(struct obd_device *obd, int last_group, down(&filter->fo_init_lock); old_count = filter->fo_group_count; for (group = old_count; group <= last_group; group++) { - if (group == 0) - continue; /* no group zero */ rc = filter_read_group_internal(obd, group, create); if (rc != 0) @@ -1245,7 +1245,7 @@ static int filter_prep_groups(struct obd_device *obd) if (off == 0) { last_group = FILTER_MIN_GROUPS; } else { - LASSERT(last_group >= FILTER_MIN_GROUPS); + 
LASSERT_MDS_GROUP(last_group); } CWARN("%s: initialize groups [%d,%d]\n", obd->obd_name, @@ -1369,7 +1369,7 @@ static void filter_post(struct obd_device *obd) if (rc) CERROR("error writing server data: rc = %d\n", rc); - for (i = 1; i < filter->fo_group_count; i++) { + for (i = 0; i < filter->fo_group_count; i++) { rc = filter_update_last_objid(obd, i, (i == filter->fo_group_count - 1)); if (rc) @@ -1416,7 +1416,6 @@ obd_id filter_last_id(struct filter_obd *filter, obd_gr group) spin_lock(&filter->fo_objidlock); id = filter->fo_last_objids[group]; spin_unlock(&filter->fo_objidlock); - return id; } @@ -1433,7 +1432,7 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) struct filter_subdirs *subdirs; LASSERT(group < filter->fo_group_count); /* FIXME: object groups */ - if ((group > 0 && group < FILTER_GROUP_MDS0) || + if ((group > FILTER_GROUP_MDS0 && group < FILTER_GROUP_MDS1_N_BASE) || filter->fo_subdir_count == 0) return filter->fo_dentry_O_groups[group]; @@ -2770,8 +2769,6 @@ static int filter_connect(const struct lu_env *env, } group = data->ocd_group; - if (group == 0) - GOTO(cleanup, rc); CWARN("%s: Received MDS connection ("LPX64"); group %d\n", obd->obd_name, exp->exp_handle.h_cookie, group); @@ -2948,7 +2945,7 @@ static void filter_sync_llogs(struct obd_device *obd, struct obd_export *dexp) { struct obd_llog_group *olg_min, *olg; struct filter_obd *filter; - int worked = 0, group; + int worked = -1, group; struct llog_ctxt *ctxt; ENTRY; @@ -3454,7 +3451,7 @@ static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, ENTRY; LASSERT(oa); - LASSERT(oa->o_gr != 0); + LASSERT_MDS_GROUP(oa->o_gr); LASSERT(oa->o_valid & OBD_MD_FLGROUP); LASSERT(down_trylock(&filter->fo_create_locks[oa->o_gr]) != 0); @@ -3552,8 +3549,8 @@ static int filter_handle_precreate(struct obd_export *exp, struct obdo *oa, obd->obd_name); GOTO(out, rc = 0); } - /* only precreate if group == 0 and o_id is specified */ - if (group < 
FILTER_GROUP_MDS0 || oa->o_id == 0) + /* only precreate if group == 0 and o_id is specified */ + if (group == FILTER_GROUP_LLOG || oa->o_id == 0) diff = 1; else diff = oa->o_id - filter_last_id(filter, group); @@ -3832,7 +3829,7 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, CDEBUG(D_INODE, "%s: filter_create(od->o_gr="LPU64",od->o_id=" LPU64")\n", obd->obd_name, oa->o_gr, oa->o_id); - if (!(oa->o_valid & OBD_MD_FLGROUP) || group == 0) { + if (!(oa->o_valid & OBD_MD_FLGROUP)) { CERROR("!!! nid %s sent invalid object group %d\n", obd_export_nid2str(exp), group); RETURN(-EINVAL); @@ -4230,13 +4227,32 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen, RETURN(-EINVAL); } +static inline int filter_setup_llog_group(struct obd_export *exp, + struct obd_device *obd, + int group) +{ + struct obd_llog_group *olg; + struct llog_ctxt *ctxt; + int rc; + + olg = filter_find_create_olg(obd, group); + if (IS_ERR(olg)) + RETURN(PTR_ERR(olg)); + + llog_group_set_export(olg, exp); + + ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); + LASSERTF(ctxt != NULL, "ctxt is null\n"); + + rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse); + llog_ctxt_put(ctxt); + return rc; +} static int filter_set_info_async(struct obd_export *exp, __u32 keylen, void *key, __u32 vallen, void *val, struct ptlrpc_request_set *set) { struct obd_device *obd; - struct obd_llog_group *olg; - struct llog_ctxt *ctxt; int rc = 0, group; ENTRY; @@ -4268,23 +4284,20 @@ static int filter_set_info_async(struct obd_export *exp, __u32 keylen, /* setup llog imports */ LASSERT(val != NULL); - group = (int)(*(__u32 *)val); - LASSERT(group >= FILTER_GROUP_MDS0); - - olg = filter_find_create_olg(obd, group); - if (IS_ERR(olg)) - RETURN(PTR_ERR(olg)); - - llog_group_set_export(olg, exp); - ctxt = llog_group_get_ctxt(olg, LLOG_MDS_OST_REPL_CTXT); - LASSERTF(ctxt != NULL, "ctxt is null\n"); - - rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse); - llog_ctxt_put(ctxt); + group = 
(int)(*(__u32 *)val); + LASSERT_MDS_GROUP(group); + rc = filter_setup_llog_group(exp, obd, group); + if (rc) + goto out; lquota_setinfo(filter_quota_interface_ref, obd, exp); + if (group == FILTER_GROUP_MDS0) { + /* setup llog group 1 for interop */ + filter_setup_llog_group(exp, obd, FILTER_GROUP_LLOG); + } +out: RETURN(rc); } diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index e5db720..28578d6 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -235,12 +235,6 @@ static void lprocfs_filter_init_vars(struct lprocfs_static_vars *lvars) /* Quota stuff */ extern quota_interface_t *filter_quota_interface_ref; -/* Capability */ -static inline __u64 obdo_mdsno(struct obdo *oa) -{ - return oa->o_gr - FILTER_GROUP_MDS0; -} - int filter_update_capa_key(struct obd_device *obd, struct lustre_capa_key *key); int filter_auth_capa(struct obd_export *exp, struct lu_fid *fid, __u64 mdsid, struct lustre_capa *capa, __u64 opc); diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index 7810acc..5e5f1c7 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -103,8 +103,14 @@ static int lprocfs_filter_rd_last_id(char *page, char **start, off_t off, if (obd == NULL) return 0; + rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, 0)); + if (rc < 0) + return rc; + page += rc; + count -= rc; + retval += rc; - for (i = FILTER_GROUP_MDS0; i < filter->fo_group_count; i++) { + for (i = FILTER_GROUP_MDS1_N_BASE + 1; i < filter->fo_group_count; i++) { rc = snprintf(page, count, LPU64"\n",filter_last_id(filter, i)); if (rc < 0) { retval = rc; diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index b34341b..39cea5a 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -184,7 +184,7 @@ static int oscc_internal_create(struct osc_creator *oscc) spin_lock(&oscc->oscc_lock); body->oa.o_id = oscc->oscc_last_id + 
oscc->oscc_grow_count; body->oa.o_gr = oscc->oscc_oa.o_gr; - LASSERT(body->oa.o_gr > 0); + LASSERT_MDS_GROUP(body->oa.o_gr); body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP; spin_unlock(&oscc->oscc_lock); CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n", @@ -317,7 +317,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa, LASSERT(oa); LASSERT(ea); - LASSERT(oa->o_gr > 0); + LASSERT_MDS_GROUP(oa->o_gr); LASSERT(oa->o_valid & OBD_MD_FLGROUP); if ((oa->o_valid & OBD_MD_FLFLAGS) && diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 329200b..f36306f 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -96,7 +96,7 @@ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, if (lsm) { LASSERT(lsm->lsm_object_id); - LASSERT(lsm->lsm_object_gr); + LASSERT_MDS_GROUP(lsm->lsm_object_gr); (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id); (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr); } @@ -153,7 +153,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id); (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr); LASSERT((*lsmp)->lsm_object_id); - LASSERT((*lsmp)->lsm_object_gr); + LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr); } (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; @@ -312,8 +312,10 @@ static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo, int rc; ENTRY; - LASSERT(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || - oinfo->oi_oa->o_gr > 0); + LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) || + CHECK_MDS_GROUP(oinfo->oi_oa->o_gr), + "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n", + oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); if (req == NULL) @@ -3634,7 +3636,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, oscc->oscc_oa.o_gr = (*(__u32 *)val); 
oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP; - LASSERT(oscc->oscc_oa.o_gr > 0); + LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr); req->rq_interpret_reply = osc_setinfo_mds_conn_interpret; } diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 57e2f71..1844e6c 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -82,6 +82,14 @@ #include "osd_internal.h" #include "osd_igif.h" +/* llo_* api support */ +#include + +static const char MDT_XATTR_NAME[] = "trusted.lma"; +static const char dot[] = "."; +static const char dotdot[] = ".."; +static const char remote_obj_dir[] = "REM_OBJ_DIR"; + struct osd_directory { struct iam_container od_container; struct iam_descr od_descr; @@ -102,6 +110,14 @@ struct osd_object { struct osd_directory *oo_dir; /** protects inode attributes. */ spinlock_t oo_guard; + /** + * Following two members are used to indicate the presence of dot and + * dotdot in the given directory. This is required for interop mode + * (b11826). + */ + int oo_compat_dot_created; + int oo_compat_dotdot_created; + const struct lu_env *oo_owner; #ifdef CONFIG_LOCKDEP struct lockdep_map oo_dep_map; @@ -145,38 +161,60 @@ static int osd_inode_setattr (const struct lu_env *env, struct inode *inode, const struct lu_attr *attr); static int osd_param_is_sane (const struct osd_device *dev, const struct txn_param *param); -static int osd_index_lookup (const struct lu_env *env, - struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa); -static int osd_index_insert (const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa, - int ingore_quota); -static int osd_index_delete (const struct lu_env *env, - struct dt_object *dt, const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa); -static int osd_index_probe (const struct lu_env *env, - struct osd_object *o, - const struct dt_index_features 
*feat); +static int osd_index_iam_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa); +static int osd_index_ea_lookup(const struct lu_env *env, + struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa); +static int osd_index_iam_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ingore_quota); +static int osd_index_ea_insert (const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa, + int ingore_quota); +static int osd_index_iam_delete(const struct lu_env *env, + struct dt_object *dt, const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa); +static int osd_index_ea_delete (const struct lu_env *env, + struct dt_object *dt, const struct dt_key *key, + struct thandle *handle, + struct lustre_capa *capa); + +static int osd_iam_index_probe (const struct lu_env *env, + struct osd_object *o, + const struct dt_index_features *feat); static int osd_index_try (const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat); static void osd_index_fini (struct osd_object *o); -static void osd_it_fini (const struct lu_env *env, struct dt_it *di); -static int osd_it_get (const struct lu_env *env, +static void osd_it_iam_fini (const struct lu_env *env, struct dt_it *di); +static int osd_it_iam_get (const struct lu_env *env, + struct dt_it *di, const struct dt_key *key); +static void osd_it_iam_put (const struct lu_env *env, struct dt_it *di); +static int osd_it_iam_next (const struct lu_env *env, struct dt_it *di); +static int osd_it_iam_key_size (const struct lu_env *env, + const struct dt_it *di); +static void osd_it_ea_fini (const struct lu_env *env, struct dt_it *di); +static int 
osd_it_ea_get (const struct lu_env *env, struct dt_it *di, const struct dt_key *key); -static void osd_it_put (const struct lu_env *env, struct dt_it *di); -static int osd_it_next (const struct lu_env *env, struct dt_it *di); -static int osd_it_del (const struct lu_env *env, struct dt_it *di, - struct thandle *th); -static int osd_it_key_size (const struct lu_env *env, +static void osd_it_ea_put (const struct lu_env *env, struct dt_it *di); +static int osd_it_ea_next (const struct lu_env *env, struct dt_it *di); +static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di); + static void osd_conf_get (const struct lu_env *env, const struct dt_device *dev, struct dt_device_param *param); @@ -202,13 +240,21 @@ static struct inode *osd_iget (struct osd_thread_info *info, struct osd_device *dev, const struct osd_inode_id *id); static struct super_block *osd_sb (const struct osd_device *dev); -static struct dt_it *osd_it_init (const struct lu_env *env, - struct dt_object *dt, int wable, +static struct dt_it *osd_it_iam_init (const struct lu_env *env, + struct dt_object *dt, + struct lustre_capa *capa); +static struct dt_key *osd_it_iam_key (const struct lu_env *env, + const struct dt_it *di); +static struct dt_rec *osd_it_iam_rec (const struct lu_env *env, + const struct dt_it *di); +static struct dt_it *osd_it_ea_init (const struct lu_env *env, + struct dt_object *dt, struct lustre_capa *capa); -static struct dt_key *osd_it_key (const struct lu_env *env, +static struct dt_key *osd_it_ea_key (const struct lu_env *env, const struct dt_it *di); -static struct dt_rec *osd_it_rec (const struct lu_env *env, +static struct dt_rec *osd_it_ea_rec (const struct lu_env *env, const struct dt_it *di); + static struct timespec *osd_inode_time (const struct lu_env *env, struct inode *inode, __u64 seconds); @@ -217,6 +263,12 @@ static struct thandle *osd_trans_start (const struct lu_env *env, struct txn_param *p); static journal_t *osd_journal (const struct 
osd_device *dev); +static int __osd_ea_add_rec(struct osd_thread_info *info, + struct osd_object *pobj, + struct osd_object *cobj, + const char *name, + struct thandle *th); + static const struct lu_device_type_operations osd_device_type_ops; static struct lu_device_type osd_device_type; static const struct lu_object_operations osd_lu_obj_ops; @@ -224,9 +276,10 @@ static struct obd_ops osd_obd_device_ops; static const struct lu_device_operations osd_lu_ops; static struct lu_context_key osd_key; static const struct dt_object_operations osd_obj_ops; +static const struct dt_object_operations osd_obj_ea_ops; static const struct dt_body_operations osd_body_ops; -static const struct dt_index_operations osd_index_ops; -static const struct dt_index_operations osd_index_compat_ops; +static const struct dt_index_operations osd_index_iam_ops; +static const struct dt_index_operations osd_index_ea_ops; struct osd_thandle { struct thandle ot_super; @@ -343,7 +396,11 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env, l = &mo->oo_dt.do_lu; dt_object_init(&mo->oo_dt, NULL, d); - mo->oo_dt.do_ops = &osd_obj_ops; + if (osd_dev(d)->od_iop_mode) + mo->oo_dt.do_ops = &osd_obj_ea_ops; + else + mo->oo_dt.do_ops = &osd_obj_ops; + l->lo_ops = &osd_lu_obj_ops; init_rwsem(&mo->oo_sem); spin_lock_init(&mo->oo_guard); @@ -398,11 +455,18 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l) OBD_FREE_PTR(obj); } -static struct iam_path_descr *osd_ipd_get(const struct lu_env *env, - const struct iam_container *bag) +static struct iam_path_descr *osd_it_ipd_get(const struct lu_env *env, + const struct iam_container *bag) +{ + return bag->ic_descr->id_ops->id_ipd_alloc(bag, + osd_oti_get(env)->oti_it_ipd); +} + +static struct iam_path_descr *osd_idx_ipd_get(const struct lu_env *env, + const struct iam_container *bag) { return bag->ic_descr->id_ops->id_ipd_alloc(bag, - osd_oti_get(env)->oti_ipd); + osd_oti_get(env)->oti_idx_ipd); } static void 
osd_ipd_put(const struct lu_env *env, @@ -486,8 +550,6 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) /* * If object is unlinked remove fid->ino mapping from object index. - * - * File body will be deleted by iput(). */ osd_index_fini(obj); @@ -501,6 +563,7 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l) "Failed to cleanup: %d\n", result); } + iput(inode); obj->oo_inode = NULL; } @@ -579,6 +642,19 @@ static void osd_conf_get(const struct lu_env *env, param->ddp_block_shift = osd_sb(osd_dt_dev(dev))->s_blocksize_bits; } +/** + * Helper function to get and fill the buffer with input values. + */ +static struct lu_buf *osd_buf_get(const struct lu_env *env, void *area, ssize_t len) +{ + struct lu_buf *buf; + + buf = &osd_oti_get(env)->oti_buf; + buf->lb_buf = area; + buf->lb_len = len; + return buf; +} + /* * Journal */ @@ -767,6 +843,7 @@ static void osd_ro(const struct lu_env *env, struct dt_device *d) EXIT; } + /* * Concurrency: serialization provided by callers. 
*/ @@ -1259,6 +1336,43 @@ static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj, extern struct inode *ldiskfs_create_inode(handle_t *handle, struct inode * dir, int mode); +extern int ldiskfs_add_entry(handle_t *handle, struct dentry *dentry, + struct inode *inode); +extern int ldiskfs_delete_entry(handle_t *handle, + struct inode * dir, + struct ldiskfs_dir_entry_2 * de_del, + struct buffer_head * bh); +extern struct buffer_head * ldiskfs_find_entry(struct dentry *dentry, + struct ldiskfs_dir_entry_2 + ** res_dir); +extern int ldiskfs_add_dot_dotdot(handle_t *handle, struct inode *dir, + struct inode *inode); + +extern int ldiskfs_xattr_set_handle(handle_t *handle, struct inode *inode, + int name_index, const char *name, + const void *value, size_t value_len, + int flags); + +static struct dentry * osd_child_dentry_get(const struct lu_env *env, + struct osd_object *obj, + const char *name, + const int namelen) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *child_dentry = &info->oti_child_dentry; + struct dentry *obj_dentry = &info->oti_obj_dentry; + + obj_dentry->d_inode = obj->oo_inode; + obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); + obj_dentry->d_name.hash = 0; + + child_dentry->d_name.hash = 0; + child_dentry->d_parent = obj_dentry; + child_dentry->d_name.name = name; + child_dentry->d_name.len = namelen; + return child_dentry; +} + static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, umode_t mode, @@ -1268,7 +1382,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, int result; struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oth; - struct inode *parent; + struct dt_object *parent; struct inode *inode; #ifdef HAVE_QUOTA_SUPPORT struct osd_ctxt *save = &info->oti_ctxt; @@ -1276,21 +1390,23 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); - 
LASSERT(osd->od_obj_area != NULL); oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); if (hint && hint->dah_parent) - parent = osd_dt_obj(hint->dah_parent)->oo_inode; + parent = hint->dah_parent; else - parent = osd->od_obj_area->d_inode; - LASSERT(parent->i_op != NULL); + parent = osd->od_obj_area; + + LASSERT(parent != NULL); + LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL); #ifdef HAVE_QUOTA_SUPPORT osd_push_ctxt(info->oti_env, save); #endif - inode = ldiskfs_create_inode(oth->ot_handle, parent, mode); + inode = ldiskfs_create_inode(oth->ot_handle, + osd_dt_obj(parent)->oo_inode, mode); #ifdef HAVE_QUOTA_SUPPORT osd_pop_ctxt(save); #endif @@ -1307,6 +1423,10 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj, extern int iam_lvar_create(struct inode *obj, int keysize, int ptrsize, int recsize, handle_t *handle); +extern int iam_lfix_create(struct inode *obj, int keysize, int ptrsize, + int recsize, handle_t *handle); + + enum { OSD_NAME_LEN = 255 }; @@ -1314,22 +1434,25 @@ enum { static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { int result; struct osd_thandle *oth; + struct osd_device *osd = osd_obj2dev(obj); + __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX)); LASSERT(S_ISDIR(attr->la_mode)); oth = container_of(th, struct osd_thandle, ot_super); LASSERT(oth->ot_handle->h_transaction != NULL); - result = osd_mkfile(info, obj, (attr->la_mode & - (S_IFMT | S_IRWXUGO | S_ISVTX)), hint, th); - if (result == 0) { + result = osd_mkfile(info, obj, mode, hint, th); + if (result == 0 && osd->od_iop_mode == 0) { LASSERT(obj->oo_inode != NULL); /* * XXX uh-oh... call low-level iam function directly. 
*/ + result = iam_lvar_create(obj->oo_inode, OSD_NAME_LEN, 4, sizeof (struct lu_fid_pack), oth->ot_handle); @@ -1337,9 +1460,47 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj, return result; } +static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + int result; + struct osd_thandle *oth; + const struct dt_index_features *feat = dof->u.dof_idx.di_feat; + + __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX)); + + LASSERT(S_ISREG(attr->la_mode)); + + oth = container_of(th, struct osd_thandle, ot_super); + LASSERT(oth->ot_handle->h_transaction != NULL); + + result = osd_mkfile(info, obj, mode, hint, th); + if (result == 0) { + LASSERT(obj->oo_inode != NULL); + if (feat->dif_flags & DT_IND_VARKEY) + result = iam_lvar_create(obj->oo_inode, + feat->dif_keysize_max, + feat->dif_ptrsize, + feat->dif_recsize_max, + oth->ot_handle); + else + result = iam_lfix_create(obj->oo_inode, + feat->dif_keysize_max, + feat->dif_ptrsize, + feat->dif_recsize_max, + oth->ot_handle); + + } + return result; +} + static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { LASSERT(S_ISREG(attr->la_mode)); @@ -1350,6 +1511,7 @@ static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj, static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { LASSERT(S_ISLNK(attr->la_mode)); @@ -1360,22 +1522,17 @@ static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj, static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - 
int result; - struct osd_device *osd = osd_obj2dev(obj); - struct inode *dir; umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX); + int result; LINVRNT(osd_invariant(obj)); LASSERT(obj->oo_inode == NULL); - LASSERT(osd->od_obj_area != NULL); LASSERT(S_ISCHR(mode) || S_ISBLK(mode) || S_ISFIFO(mode) || S_ISSOCK(mode)); - dir = osd->od_obj_area->d_inode; - LASSERT(dir->i_op != NULL); - result = osd_mkfile(info, obj, mode, hint, th); if (result == 0) { LASSERT(obj->oo_inode != NULL); @@ -1388,28 +1545,30 @@ static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj, typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *, struct lu_attr *, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *); -static osd_obj_type_f osd_create_type_f(__u32 mode) +static osd_obj_type_f osd_create_type_f(enum dt_format_type type) { osd_obj_type_f result; - switch (mode) { - case S_IFDIR: + switch (type) { + case DFT_DIR: result = osd_mkdir; break; - case S_IFREG: + case DFT_REGULAR: result = osd_mkreg; break; - case S_IFLNK: + case DFT_SYM: result = osd_mksym; break; - case S_IFCHR: - case S_IFBLK: - case S_IFIFO: - case S_IFSOCK: + case DFT_NODE: result = osd_mknod; break; + case DFT_INDEX: + result = osd_mk_index; + break; + default: LBUG(); break; @@ -1428,19 +1587,62 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, ah->dah_mode = child_mode; } +/** + * Helper function for osd_object_create() + * + * \retval 0, on success + */ +static int __osd_object_create(struct osd_thread_info *info, + struct osd_object *obj, struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + + int result; -/* - * Concurrency: @dt is write locked. 
+ result = osd_create_pre(info, obj, attr, th); + if (result == 0) { + result = osd_create_type_f(dof->dof_type)(info, obj, + attr, hint, dof, th); + if (result == 0) + result = osd_create_post(info, obj, attr, th); + } + return result; +} + +/** + * Helper function for osd_object_create() + * + * \retval 0, on success */ +static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj, + const struct lu_fid *fid, struct thandle *th) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct osd_inode_id *id = &info->oti_id; + struct osd_device *osd = osd_obj2dev(obj); + struct md_ucred *uc = md_ucred(env); + + LASSERT(obj->oo_inode != NULL); + LASSERT(uc != NULL); + + id->oii_ino = obj->oo_inode->i_ino; + id->oii_gen = obj->oo_inode->i_generation; + + return osd_oi_insert(info, &osd->od_oi, fid, id, th, + uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); +} + static int osd_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - const struct lu_fid *fid = lu_object_fid(&dt->do_lu); - struct osd_object *obj = osd_dt_obj(dt); - struct osd_device *osd = osd_obj2dev(obj); - struct osd_thread_info *info = osd_oti_get(env); + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *info = osd_oti_get(env); int result; ENTRY; @@ -1450,31 +1652,170 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, LASSERT(osd_write_locked(env, obj)); LASSERT(th != NULL); - /* - * XXX missing: Quote handling. 
- */ + result = __osd_object_create(info, obj, attr, hint, dof, th); + if (result == 0) + result = __osd_oi_insert(env, obj, fid, th); - result = osd_create_pre(info, obj, attr, th); - if (result == 0) { - result = osd_create_type_f(attr->la_mode & S_IFMT)(info, obj, - attr, hint, th); - if (result == 0) - result = osd_create_post(info, obj, attr, th); + LASSERT(ergo(result == 0, dt_object_exists(dt))); + LASSERT(osd_invariant(obj)); + RETURN(result); +} + +/** + * Helper function for osd_xattr_set() + */ +static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, int fl) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *inode = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *dentry = &info->oti_child_dentry; + struct timespec *t = &info->oti_time; + int fs_flags = 0; + int rc; + + LASSERT(dt_object_exists(dt)); + LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL); + LASSERT(osd_write_locked(env, obj)); + + if (fl & LU_XATTR_REPLACE) + fs_flags |= XATTR_REPLACE; + + if (fl & LU_XATTR_CREATE) + fs_flags |= XATTR_CREATE; + + dentry->d_inode = inode; + *t = inode->i_ctime; + rc = inode->i_op->setxattr(dentry, name, buf->lb_buf, + buf->lb_len, fs_flags); + if (likely(rc == 0)) { + spin_lock(&obj->oo_guard); + inode->i_ctime = *t; + spin_unlock(&obj->oo_guard); + mark_inode_dirty(inode); } - if (result == 0) { - struct osd_inode_id *id = &info->oti_id; - struct md_ucred *uc = md_ucred(env); + return rc; +} - LASSERT(obj->oo_inode != NULL); - LASSERT(uc != NULL); +/** + * Put the fid into lustre_mdt_attrs, and then place the structure + * inode's ea. This fid should not be altered during the life time + * of the inode. 
+ * + * \retval +ve, on success + * \retval -ve, on error + * + * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here + */ +static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_fid *fid) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; + + fid_cpu_to_be(&mdt_attrs->lma_self_fid, fid); + + return __osd_xattr_set(env, dt, + osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs), + MDT_XATTR_NAME, LU_XATTR_CREATE); + +} + +/** + * Helper function to form igif + */ +static inline void osd_igif_get(const struct lu_env *env, struct dentry *dentry, + struct lu_fid *fid) +{ + struct inode *inode = dentry->d_inode; + lu_igif_build(fid, inode->i_ino, inode->i_generation); +} + +/** + * Helper function to pack the fid + */ +static inline void osd_fid_pack(const struct lu_env *env, const struct lu_fid *fid, + struct lu_fid_pack *pack) +{ + fid_pack(pack, fid, &osd_oti_get(env)->oti_fid); +} + +/** + * Try to read the fid from inode ea into dt_rec, if return value + * i.e. 
rc is +ve, then we got fid, otherwise we will have to form igif + * + * \param rec, the data-structure into which fid/igif is read + * + * \retval 0, on success + */ +static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry, + struct dt_rec *rec) +{ + struct inode *inode = dentry->d_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs; + struct lu_fid *fid = &info->oti_fid; + int rc; + + LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); - id->oii_ino = obj->oo_inode->i_ino; - id->oii_gen = obj->oo_inode->i_generation; + rc = inode->i_op->getxattr(dentry, MDT_XATTR_NAME, (void *)mdt_attrs, + sizeof *mdt_attrs); - result = osd_oi_insert(info, &osd->od_oi, fid, id, th, - uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK); + if (rc > 0) { + fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid); + rc = 0; + } else if (rc == -ENODATA) { + osd_igif_get(env, dentry, fid); + rc = 0; } + if (rc == 0) + osd_fid_pack(env, fid, (struct lu_fid_pack*)rec); + + return rc; +} + +/** + * OSD layer object create function for interoperability mode (b11826). + * This is mostly similar to osd_object_create(). Only difference being, fid is + * inserted into inode ea here. 
+ * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *info = osd_oti_get(env); + int result; + int is_root = 0; + + ENTRY; + + LASSERT(osd_invariant(obj)); + LASSERT(!dt_object_exists(dt)); + LASSERT(osd_write_locked(env, obj)); + LASSERT(th != NULL); + + result = __osd_object_create(info, obj, attr, hint, dof, th); + + if (hint && hint->dah_parent) + is_root = osd_object_is_root(osd_dt_obj(hint->dah_parent)); + + /* objects under osd root shld have igif fid, so dont add fid EA */ + if (result == 0 && is_root == 0) + result = osd_ea_fid_set(env, dt, fid); + + if (result == 0) + result = __osd_oi_insert(env, obj, fid, th); + LASSERT(ergo(result == 0, dt_object_exists(dt))); LINVRNT(osd_invariant(obj)); RETURN(result); @@ -1538,7 +1879,7 @@ static int osd_xattr_get(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; + struct dentry *dentry = &info->oti_obj_dentry; LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL); @@ -1551,6 +1892,7 @@ static int osd_xattr_get(const struct lu_env *env, return inode->i_op->getxattr(dentry, name, buf->lb_buf, buf->lb_len); } + /* * Concurrency: @dt is write locked. 
*/ @@ -1558,39 +1900,12 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *handle, struct lustre_capa *capa) { - struct osd_object *obj = osd_dt_obj(dt); - struct inode *inode = obj->oo_inode; - struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; - struct timespec *t = &info->oti_time; - int fs_flags = 0, rc; - - LASSERT(dt_object_exists(dt)); - LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL); - LASSERT(osd_write_locked(env, obj)); LASSERT(handle != NULL); if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE)) return -EACCES; - if (fl & LU_XATTR_REPLACE) - fs_flags |= XATTR_REPLACE; - - if (fl & LU_XATTR_CREATE) - fs_flags |= XATTR_CREATE; - - dentry->d_inode = inode; - *t = inode->i_ctime; - rc = inode->i_op->setxattr(dentry, name, - buf->lb_buf, buf->lb_len, fs_flags); - if (likely(rc == 0)) { - /* ctime should not be updated with server-side time. 
*/ - spin_lock(&obj->oo_guard); - inode->i_ctime = *t; - spin_unlock(&obj->oo_guard); - mark_inode_dirty(inode); - } - return rc; + return __osd_xattr_set(env, dt, buf, name, fl); } /* @@ -1604,7 +1919,7 @@ static int osd_xattr_list(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; + struct dentry *dentry = &info->oti_obj_dentry; LASSERT(dt_object_exists(dt)); LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL); @@ -1629,7 +1944,7 @@ static int osd_xattr_del(const struct lu_env *env, struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; + struct dentry *dentry = &info->oti_obj_dentry; struct timespec *t = &info->oti_time; int rc; @@ -1747,7 +2062,7 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) struct osd_object *obj = osd_dt_obj(dt); struct inode *inode = obj->oo_inode; struct osd_thread_info *info = osd_oti_get(env); - struct dentry *dentry = &info->oti_dentry; + struct dentry *dentry = &info->oti_obj_dentry; struct file *file = &info->oti_file; ENTRY; @@ -1781,6 +2096,30 @@ static const struct dt_object_operations osd_obj_ops = { .do_object_sync = osd_object_sync, }; +/** + * dt_object_operations for interoperability mode + * (i.e. 
to run 2.0 mds on 1.8 disk) (b11826) + */ +static const struct dt_object_operations osd_obj_ea_ops = { + .do_read_lock = osd_object_read_lock, + .do_write_lock = osd_object_write_lock, + .do_read_unlock = osd_object_read_unlock, + .do_write_unlock = osd_object_write_unlock, + .do_attr_get = osd_attr_get, + .do_attr_set = osd_attr_set, + .do_ah_init = osd_ah_init, + .do_create = osd_object_ea_create, + .do_index_try = osd_index_try, + .do_ref_add = osd_object_ref_add, + .do_ref_del = osd_object_ref_del, + .do_xattr_get = osd_xattr_get, + .do_xattr_set = osd_xattr_set, + .do_xattr_del = osd_xattr_del, + .do_xattr_list = osd_xattr_list, + .do_capa_get = osd_capa_get, + .do_object_sync = osd_object_sync, +}; + /* * Body operations. */ @@ -1861,10 +2200,11 @@ static int osd_object_is_root(const struct osd_object *obj) return osd_sb(osd_obj2dev(obj))->s_root->d_inode == obj->oo_inode; } -static int osd_index_probe(const struct lu_env *env, struct osd_object *o, +static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o, const struct dt_index_features *feat) { struct iam_descr *descr; + struct dt_object *dt = &o->oo_dt; if (osd_object_is_root(o)) return feat == &dt_directory_features; @@ -1872,14 +2212,23 @@ static int osd_index_probe(const struct lu_env *env, struct osd_object *o, LASSERT(o->oo_dir != NULL); descr = o->oo_dir->od_container.ic_descr; - if (feat == &dt_directory_features) - return descr == &iam_htree_compat_param || - (descr->id_rec_size == sizeof(struct lu_fid_pack) && - 1 /* - * XXX check that index looks like directory. - */ - ); - else + if (feat == &dt_directory_features) { + if (descr->id_rec_size == sizeof(struct lu_fid_pack)) + return 1; + + if (descr == &iam_htree_compat_param) { + /* if it is a HTREE dir then there is good chance that, + * we dealing with ext3 directory here with no FIDs. 
*/ + + if (descr->id_rec_size == + sizeof ((struct ldiskfs_dir_entry_2 *)NULL)->inode) { + + dt->do_index_ops = &osd_index_ea_ops; + return 1; + } + } + return 0; + } else { return feat->dif_keysize_min <= descr->id_key_size && descr->id_key_size <= feat->dif_keysize_max && @@ -1890,11 +2239,12 @@ static int osd_index_probe(const struct lu_env *env, struct osd_object *o, ergo(feat->dif_flags & DT_IND_UPDATE, 1 /* XXX check that object (and file system) is * writable */); + } } -static int osd_container_init(const struct lu_env *env, - struct osd_object *obj, - struct osd_directory *dir) +static int osd_iam_container_init(const struct lu_env *env, + struct osd_object *obj, + struct osd_directory *dir) { int result; struct iam_container *bag; @@ -1904,7 +2254,7 @@ static int osd_container_init(const struct lu_env *env, if (result == 0) { result = iam_container_setup(bag); if (result == 0) - obj->oo_dt.do_index_ops = &osd_index_ops; + obj->oo_dt.do_index_ops = &osd_index_iam_ops; else iam_container_fini(bag); } @@ -1918,16 +2268,25 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat) { int result; + int ea_dir = 0; struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); LINVRNT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); if (osd_object_is_root(obj)) { - dt->do_index_ops = &osd_index_compat_ops; + dt->do_index_ops = &osd_index_ea_ops; result = 0; - } else if (!osd_has_index(obj)) { - struct osd_directory *dir; + } else if (feat == &dt_directory_features && osd->od_iop_mode) { + dt->do_index_ops = &osd_index_ea_ops; + if (S_ISDIR(obj->oo_inode->i_mode)) + result = 0; + else + result = -ENOTDIR; + ea_dir = 1; + } else if (!osd_has_index(obj)) { + struct osd_directory *dir; OBD_ALLOC_PTR(dir); if (dir != NULL) { @@ -1951,7 +2310,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, * recheck under lock. 
*/ if (!osd_has_index(obj)) - result = osd_container_init(env, obj, dir); + result = osd_iam_container_init(env, obj, dir); else result = 0; up(&obj->oo_dir->od_sem); @@ -1960,8 +2319,8 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, } else result = 0; - if (result == 0) { - if (!osd_index_probe(env, obj, feat)) + if (result == 0 && ea_dir == 0) { + if (!osd_iam_index_probe(env, obj, feat)) result = -ENOTDIR; } LINVRNT(osd_invariant(obj)); @@ -1969,9 +2328,21 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt, return result; } -static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *key, struct thandle *handle, - struct lustre_capa *capa) +/** + * delete a (key, value) pair from index \a dt specified by \a key + * + * \param dt_object osd index object + * \param key key for index + * \param rec record reference + * \param handle transaction handler + * + * \retval 0 success + * \retval -ve failure + */ + +static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *key, struct thandle *handle, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct osd_thandle *oh; @@ -1989,7 +2360,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) RETURN(-EACCES); - ipd = osd_ipd_get(env, bag); + ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) RETURN(-ENOMEM); @@ -2003,40 +2374,123 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) +/** + * Index delete function for interoperability mode (b11826). + * It will remove the directory entry added by osd_index_ea_insert(). + * This entry is needed to maintain name->fid mapping. 
+ * + * \param key, key i.e. file entry to be deleted + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *key, struct thandle *handle, + struct lustre_capa *capa) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct inode *dir = obj->oo_inode; + struct dentry *dentry; + struct osd_thandle *oh; + struct ldiskfs_dir_entry_2 *de; + struct buffer_head *bh; + + int rc; + + ENTRY; + + LINVRNT(osd_invariant(obj)); + LASSERT(dt_object_exists(dt)); + LASSERT(handle != NULL); + + oh = container_of(handle, struct osd_thandle, ot_super); + LASSERT(oh->ot_handle != NULL); + LASSERT(oh->ot_handle->h_transaction != NULL); + + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) + RETURN(-EACCES); + + dentry = osd_child_dentry_get(env, obj, + (char *)key, strlen((char *)key)); + bh = ldiskfs_find_entry(dentry, &de); + if (bh) { + rc = ldiskfs_delete_entry(oh->ot_handle, + dir, de, bh); + if (!rc) + mark_inode_dirty(dir); + brelse(bh); + } else + rc = -ENOENT; + + LASSERT(osd_invariant(obj)); + RETURN(rc); +} + +/** + * Lookup index for \a key and copy record to \a rec. 
+ * + * \param dt_object osd index object + * \param key key for index + * \param rec record reference + * + * \retval +ve success : exact mach + * \retval 0 return record with key not greater than \a key + * \retval -ve failure + */ +static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa) { struct osd_object *obj = osd_dt_obj(dt); struct iam_path_descr *ipd; struct iam_container *bag = &obj->oo_dir->od_container; + struct osd_thread_info *oti = osd_oti_get(env); + struct iam_iterator *it = &oti->oti_idx_it; int rc; - ENTRY; - LINVRNT(osd_invariant(obj)); + LASSERT(osd_invariant(obj)); LASSERT(dt_object_exists(dt)); LASSERT(bag->ic_object == obj->oo_inode); if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) - return -EACCES; + RETURN(-EACCES); - ipd = osd_ipd_get(env, bag); - if (unlikely(ipd == NULL)) + ipd = osd_idx_ipd_get(env, bag); + if (IS_ERR(ipd)) RETURN(-ENOMEM); - rc = iam_lookup(bag, (const struct iam_key *)key, - (struct iam_rec *)rec, ipd); + /* got ipd now we can start iterator. */ + iam_it_init(it, bag, 0, ipd); + + rc = iam_it_get(it, (struct iam_key *)key); + if (rc >= 0) + iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec); + + iam_it_put(it); + iam_it_fini(it); osd_ipd_put(env, bag, ipd); + LINVRNT(osd_invariant(obj)); RETURN(rc); } -static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, - const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, struct lustre_capa *capa, - int ignore_quota) +/** + * Inserts (key, value) pair in \a dt index object. 
+ * + * \param dt osd index object + * \param key key for index + * \param rec record reference + * \param th transaction handler + * + * \retval 0 success + * \retval -ve failure + */ +static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, const struct dt_key *key, + struct thandle *th, struct lustre_capa *capa, + int ignore_quota) { struct osd_object *obj = osd_dt_obj(dt); struct iam_path_descr *ipd; @@ -2057,7 +2511,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) return -EACCES; - ipd = osd_ipd_get(env, bag); + ipd = osd_idx_ipd_get(env, bag); if (unlikely(ipd == NULL)) RETURN(-ENOMEM); @@ -2080,375 +2534,775 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/* - * Iterator operations. +/** + * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries + * into the directory.Also sets flags into osd object to + * indicate dot and dotdot are created. This is required for + * interoperability mode (b11826) + * + * \param dir directory for dot and dotdot fixup. 
+ * \param obj child object for linking + * + * \retval 0, on success + * \retval -ve, on error */ -struct osd_it { - struct osd_object *oi_obj; - struct iam_path_descr *oi_ipd; - struct iam_iterator oi_it; -}; +static int osd_add_dot_dotdot(struct osd_thread_info *info, + struct osd_object *dir, + struct osd_object *obj, const char *name, + struct thandle *th) +{ + struct inode *parent_dir = obj->oo_inode; + struct inode *inode = dir->oo_inode; + struct osd_thandle *oth; + int result = 0; + + oth = container_of(th, struct osd_thandle, ot_super); + LASSERT(oth->ot_handle->h_transaction != NULL); + LASSERT(S_ISDIR(dir->oo_inode->i_mode)); + + if (strcmp(name, dot) == 0) { + if (dir->oo_compat_dot_created) { + result = -EEXIST; + } else { + LASSERT(obj == dir); + dir->oo_compat_dot_created = 1; + result = 0; + } + } else if(strcmp(name, dotdot) == 0) { + if (!dir->oo_compat_dot_created) + return -EINVAL; + if (dir->oo_compat_dotdot_created) + return __osd_ea_add_rec(info, dir, obj, name, th); + + result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode); + if (result == 0) + dir->oo_compat_dotdot_created = 1; + } + + return result; +} + +/** + * Calls ldiskfs_add_entry() to add directory entry + * into the directory. 
This is required for + * interoperability mode (b11826) + * + * \retval 0, on success + * \retval -ve, on error + */ +static int __osd_ea_add_rec(struct osd_thread_info *info, + struct osd_object *pobj, + struct osd_object *cobj, + const char *name, + struct thandle *th) +{ + struct dentry *child; + struct osd_thandle *oth; + struct inode *cinode = cobj->oo_inode; + int rc; + + oth = container_of(th, struct osd_thandle, ot_super); + LASSERT(oth->ot_handle != NULL); + LASSERT(oth->ot_handle->h_transaction != NULL); + + child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name)); + rc = ldiskfs_add_entry(oth->ot_handle, child, cinode); + + RETURN(rc); +} + +/** + * It will call the appropriate osd_add* function and return the + * value, return by respective functions. + */ +static int osd_ea_add_rec(const struct lu_env *env, + struct osd_object *pobj, + struct osd_object *cobj, + const char *name, + struct thandle *th) +{ + struct osd_thread_info *info = osd_oti_get(env); + int rc; -static struct dt_it *osd_it_init(const struct lu_env *env, - struct dt_object *dt, int writable, + if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' && + name[2] =='\0'))) + rc = osd_add_dot_dotdot(info, pobj, cobj, name, th); + else + rc = __osd_ea_add_rec(info, pobj, cobj, name, th); + + return rc; +} + +/** + * Calls ->lookup() to find dentry. From dentry get inode and + * read inode's ea to get fid. 
This is required for interoperability + * mode (b11826) + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, + struct dt_rec *rec, const struct dt_key *key) +{ + struct inode *dir = obj->oo_inode; + struct osd_thread_info *info = osd_oti_get(env); + struct dentry *dentry; + struct osd_device *dev = osd_dev(obj->oo_dt.do_lu.lo_dev); + struct osd_inode_id *id = &info->oti_id; + struct ldiskfs_dir_entry_2 *de; + struct buffer_head *bh; + struct inode *inode; + int ino; + int rc; + + LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL); + + dentry = osd_child_dentry_get(env, obj, + (char *)key, strlen((char *)key)); + bh = ldiskfs_find_entry(dentry, &de); + if (bh) { + ino = le32_to_cpu(de->inode); + brelse(bh); + id->oii_ino = ino; + id->oii_gen = OSD_OII_NOGEN; + + inode = osd_iget(info, dev, id); + if (!IS_ERR(inode)) { + dentry->d_inode = inode; + + rc = osd_ea_fid_get(env, dentry, rec); + iput(inode); + } else + rc = -ENOENT; + } else + rc = -ENOENT; + + RETURN (rc); +} + +/** + * Find the osd object for given fid. 
+ * + * \param fid, need to find the osd object having this fid + * + * \retval osd_object, on success + * \retval -ve, on error + */ +struct osd_object *osd_object_find(const struct lu_env *env, + struct dt_object *dt, + const struct lu_fid *fid) +{ + struct lu_device *ludev = dt->do_lu.lo_dev; + struct osd_object *child = NULL; + struct lu_object *luch; + struct lu_object *lo; + + luch = lu_object_find(env, ludev, fid, NULL); + if (!IS_ERR(luch)) { + if (lu_object_exists(luch)) { + lo = lu_object_locate(luch->lo_header, ludev->ld_type); + if (lo != NULL) + child = osd_obj(lo); + else + LU_OBJECT_DEBUG(D_ERROR, env, luch, + "lu_object can't be located" + ""DFID"\n", PFID(fid)); + + if (child == NULL) { + lu_object_put(env, luch); + CERROR("Unable to get osd_object\n"); + child = ERR_PTR(-ENOENT); + } + } else { + LU_OBJECT_DEBUG(D_ERROR, env, luch, + "lu_object does not exists "DFID"\n", + PFID(fid)); + child = ERR_PTR(-ENOENT); + } + } else + child = (void *)luch; + + return child; +} + +/** + * Put the osd object once done with it. + * + * \param obj, osd object that needs to be put + */ +static inline void osd_object_put(const struct lu_env *env, + struct osd_object *obj) +{ + lu_object_put(env, &obj->oo_dt.do_lu); +} + +/** + * Index add function for interoperability mode (b11826). + * It will add the directory entry.This entry is needed to + * maintain name->fid mapping. + * + * \param key, it is key i.e. file entry to be inserted + * \param rec, it is value of given key i.e. 
fid + * + * \retval 0, on success + * \retval -ve, on error + */ +static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th, + struct lustre_capa *capa, int ignore_quota) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + const struct lu_fid_pack *pack = (const struct lu_fid_pack *)rec; + const char *name = (const char *)key; + struct osd_object *child; + + int rc; + + ENTRY; + + LASSERT(osd_invariant(obj)); + LASSERT(dt_object_exists(dt)); + LASSERT(th != NULL); + + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) + RETURN(-EACCES); + + rc = fid_unpack(pack, fid); + if (rc != 0) + RETURN(rc); + child = osd_object_find(env, dt, fid); + if (!IS_ERR(child)) { + rc = osd_ea_add_rec(env, obj, child, name, th); + osd_object_put(env, child); + } else { + rc = PTR_ERR(child); + } + + LASSERT(osd_invariant(obj)); + RETURN(rc); +} + +/** + * Initialize osd Iterator for given osd index object. + * + * \param dt osd index object + */ + +static struct dt_it *osd_it_iam_init(const struct lu_env *env, + struct dt_object *dt, struct lustre_capa *capa) { - struct osd_it *it; + struct osd_it_iam *it; + struct osd_thread_info *oti = osd_oti_get(env); struct osd_object *obj = osd_dt_obj(dt); struct lu_object *lo = &dt->do_lu; struct iam_path_descr *ipd; struct iam_container *bag = &obj->oo_dir->od_container; - __u32 flags; LASSERT(lu_object_exists(lo)); - if (osd_object_auth(env, dt, capa, writable ? CAPA_OPC_BODY_WRITE : - CAPA_OPC_BODY_READ)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ)) return ERR_PTR(-EACCES); - flags = writable ? IAM_IT_MOVE|IAM_IT_WRITE : IAM_IT_MOVE; - OBD_ALLOC_PTR(it); - if (it != NULL) { - /* - * XXX: as ipd is allocated within osd_thread_info, assignment - * below implies that iterator usage is confined within single - * environment. 
- */ - ipd = osd_ipd_get(env, bag); - if (likely(ipd != NULL)) { - it->oi_obj = obj; - it->oi_ipd = ipd; - lu_object_get(lo); - iam_it_init(&it->oi_it, bag, flags, ipd); - return (struct dt_it *)it; - } else - OBD_FREE_PTR(it); + it = &oti->oti_it; + ipd = osd_it_ipd_get(env, bag); + if (likely(ipd != NULL)) { + it->oi_obj = obj; + it->oi_ipd = ipd; + lu_object_get(lo); + iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd); + return (struct dt_it *)it; } return ERR_PTR(-ENOMEM); } -static void osd_it_fini(const struct lu_env *env, struct dt_it *di) +/** + * free given Iterator. + */ + +static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; struct osd_object *obj = it->oi_obj; iam_it_fini(&it->oi_it); osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd); lu_object_put(env, &obj->oo_dt.do_lu); - OBD_FREE_PTR(it); } -static int osd_it_get(const struct lu_env *env, +/** + * Move Iterator to record specified by \a key + * + * \param di osd iterator + * \param key key for index + * + * \retval +ve di points to record with least key not larger than key + * \retval 0 di points to exact matched key + * \retval -ve failure + */ + +static int osd_it_iam_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_get(&it->oi_it, (const struct iam_key *)key); } -static void osd_it_put(const struct lu_env *env, struct dt_it *di) +/** + * Release Iterator + * + * \param di osd iterator + */ + +static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; iam_it_put(&it->oi_it); } -static int osd_it_next(const struct lu_env *env, struct dt_it *di) +/** + * Move iterator by one record + * + * \param di osd iterator + * + * \retval +1 end of 
container reached + * \retval 0 success + * \retval -ve failure + */ + +static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_next(&it->oi_it); } -static int osd_it_del(const struct lu_env *env, struct dt_it *di, - struct thandle *th) -{ - struct osd_it *it = (struct osd_it *)di; - struct osd_thandle *oh; - - LASSERT(th != NULL); - - oh = container_of0(th, struct osd_thandle, ot_super); - LASSERT(oh->ot_handle != NULL); - LASSERT(oh->ot_handle->h_transaction != NULL); - - return iam_it_rec_delete(oh->ot_handle, &it->oi_it); -} +/** + * Return pointer to the key under iterator. + */ -static struct dt_key *osd_it_key(const struct lu_env *env, +static struct dt_key *osd_it_iam_key(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return (struct dt_key *)iam_it_key_get(&it->oi_it); } -static int osd_it_key_size(const struct lu_env *env, const struct dt_it *di) +/** + * Return size of key under iterator (in bytes) + */ + +static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_key_size(&it->oi_it); } -static struct dt_rec *osd_it_rec(const struct lu_env *env, +/** + * Return pointer to the record under iterator. + */ +static struct dt_rec *osd_it_iam_rec(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return (struct dt_rec *)iam_it_rec_get(&it->oi_it); } -static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di) +/** + * Returns cookie for current Iterator position. 
+ */ +static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_store(&it->oi_it); } -static int osd_it_load(const struct lu_env *env, +/** + * Restore iterator from cookie. + * + * \param di osd iterator + * \param hash Iterator location cookie + * + * \retval +ve di points to record with least key not larger than key. + * \retval 0 di points to exact matched key + * \retval -ve failure + */ + +static int osd_it_iam_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) { - struct osd_it *it = (struct osd_it *)di; + struct osd_it_iam *it = (struct osd_it_iam *)di; return iam_it_load(&it->oi_it, hash); } -static const struct dt_index_operations osd_index_ops = { - .dio_lookup = osd_index_lookup, - .dio_insert = osd_index_insert, - .dio_delete = osd_index_delete, +static const struct dt_index_operations osd_index_iam_ops = { + .dio_lookup = osd_index_iam_lookup, + .dio_insert = osd_index_iam_insert, + .dio_delete = osd_index_iam_delete, .dio_it = { - .init = osd_it_init, - .fini = osd_it_fini, - .get = osd_it_get, - .put = osd_it_put, - .del = osd_it_del, - .next = osd_it_next, - .key = osd_it_key, - .key_size = osd_it_key_size, - .rec = osd_it_rec, - .store = osd_it_store, - .load = osd_it_load + .init = osd_it_iam_init, + .fini = osd_it_iam_fini, + .get = osd_it_iam_get, + .put = osd_it_iam_put, + .next = osd_it_iam_next, + .key = osd_it_iam_key, + .key_size = osd_it_iam_key_size, + .rec = osd_it_iam_rec, + .store = osd_it_iam_store, + .load = osd_it_iam_load } }; -static int osd_index_compat_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *handle, - struct lustre_capa *capa) +/** + * Creates or initializes iterator context. 
+ * + * \retval struct osd_it_ea, iterator structure on success + * + */ +static struct dt_it *osd_it_ea_init(const struct lu_env *env, + struct dt_object *dt, + struct lustre_capa *capa) +{ + struct osd_object *obj = osd_dt_obj(dt); + struct osd_thread_info *info = osd_oti_get(env); + struct osd_it_ea *it = &info->oti_it_ea; + struct lu_object *lo = &dt->do_lu; + struct dentry *obj_dentry = &info->oti_obj_dentry; + ENTRY; + LASSERT(lu_object_exists(lo)); + + obj_dentry->d_inode = obj->oo_inode; + obj_dentry->d_sb = osd_sb(osd_obj2dev(obj)); + obj_dentry->d_name.hash = 0; + + it->oie_namelen = 0; + it->oie_curr_pos = 0; + it->oie_next_pos = 0; + it->oie_obj = obj; + it->oie_file.f_dentry = obj_dentry; + it->oie_file.f_mapping = obj->oo_inode->i_mapping; + it->oie_file.f_op = obj->oo_inode->i_fop; + it->oie_file.private_data = NULL; + lu_object_get(lo); + + RETURN((struct dt_it*) it); +} + +/** + * Destroy or finishes iterator context. + * + * \param di, struct osd_it_ea, iterator structure to be destroyed + */ +static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + - LASSERT(handle != NULL); - LASSERT(S_ISDIR(obj->oo_inode->i_mode)); ENTRY; + lu_object_put(env, &obj->oo_dt.do_lu); + EXIT; +} -#if 0 - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE)) - RETURN(-EACCES); -#endif +/** + * It position the iterator at given key, so that next lookup continues from + * that key Or it is similar to dio_it->load() but based on a key, + * rather than file position. + * + * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator + * to the beginning. + * + * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty(). 
+ */ +static int osd_it_ea_get(const struct lu_env *env, + struct dt_it *di, const struct dt_key *key) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; - RETURN(-EOPNOTSUPP); + ENTRY; + LASSERT(((const char *)key)[0] == '\0'); + it->oie_namelen = 0; + it->oie_curr_pos = 0; + it->oie_next_pos = 0; + + RETURN(+1); } -/* - * Compatibility index operations. +/** + * Does nothing */ +static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di) +{ +} - -static void osd_build_pack(const struct lu_env *env, struct osd_device *osd, - struct dentry *dentry, struct lu_fid_pack *pack) +/** + * It is called internally by ->readdir(). It fills the + * iterator's in-memory data structure with required + * information i.e. name, namelen, rec_size etc. + * + * \param buf, in which information to be filled in. + * \param name, name of the file in given dir + * + * \retval 0, on success + * \retval 1, on buffer full + */ +static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen, + loff_t offset, ino_t ino, + unsigned int d_type) { - struct inode *inode = dentry->d_inode; - struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + struct osd_it_ea *it = (struct osd_it_ea *)buf; + struct dirent64 *dirent = &it->oie_dirent64; + int reclen = LDISKFS_DIR_REC_LEN(namelen); - lu_igif_build(fid, inode->i_ino, inode->i_generation); - fid_cpu_to_be(fid, fid); - pack->fp_len = sizeof *fid + 1; - memcpy(pack->fp_area, fid, sizeof *fid); + + ENTRY; + if (it->oie_namelen) + RETURN(-ENOENT); + + if (namelen == 0 || namelen > LDISKFS_NAME_LEN) + RETURN(-EIO); + + strncpy(dirent->d_name, name, LDISKFS_NAME_LEN); + dirent->d_name[namelen] = 0; + dirent->d_ino = ino; + dirent->d_off = offset; + dirent->d_reclen = reclen; + it->oie_namelen = namelen; + it->oie_curr_pos = offset; + + RETURN(0); } -static int osd_index_compat_lookup(const struct lu_env *env, - struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) +/** + * Calls ->readdir() to 
load a directory entry at a time + * and stored it in iterator's in-memory data structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval 0, on success + * \retval -ve, on error + */ +int osd_ldiskfs_it_fill(const struct dt_it *di) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + struct inode *inode = obj->oo_inode; + int result = 0; - struct osd_device *osd = osd_obj2dev(obj); - struct osd_thread_info *info = osd_oti_get(env); - struct inode *dir; + ENTRY; + it->oie_namelen = 0; + it->oie_file.f_pos = it->oie_curr_pos; - int result; + result = inode->i_fop->readdir(&it->oie_file, it, + (filldir_t) osd_ldiskfs_filldir); - /* - * XXX temporary solution. - */ - struct dentry *dentry; - struct dentry *parent; + it->oie_next_pos = it->oie_file.f_pos; - LINVRNT(osd_invariant(obj)); - LASSERT(S_ISDIR(obj->oo_inode->i_mode)); - LASSERT(osd_has_index(obj)); + if(!result && it->oie_namelen == 0) + result = -EIO; - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) - return -EACCES; + RETURN(result); +} - info->oti_str.name = (const char *)key; - info->oti_str.len = strlen((const char *)key); +/** + * It calls osd_ldiskfs_it_fill() which will use ->readdir() + * to load a directory entry at a time and stored it in + * iterator's in-memory data structure. 
+ * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval +ve, iterator reached to end + * \retval 0, iterator not reached to end + * \retval -ve, on error + */ +static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + int rc; - dir = obj->oo_inode; - LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL); + ENTRY; + it->oie_curr_pos = it->oie_next_pos; - parent = d_alloc_root(dir); - if (parent == NULL) - return -ENOMEM; - igrab(dir); - dentry = d_alloc(parent, &info->oti_str); - if (dentry != NULL) { - struct dentry *d; + if (it->oie_curr_pos == LDISKFS_HTREE_EOF) + rc = +1; + else + rc = osd_ldiskfs_it_fill(di); - /* - * XXX passing NULL for nameidata should work for - * ext3/ldiskfs. - */ - d = dir->i_op->lookup(dir, dentry, NULL); - if (d == NULL) { - /* - * normal case, result is in @dentry. - */ - if (dentry->d_inode != NULL) { - osd_build_pack(env, osd, dentry, - (struct lu_fid_pack *)rec); - result = 0; - } else - result = -ENOENT; - } else { - /* What? Disconnected alias? Ppheeeww... */ - CERROR("Aliasing where not expected\n"); - result = -EIO; - dput(d); - } - dput(dentry); - } else - result = -ENOMEM; - dput(parent); - LINVRNT(osd_invariant(obj)); - return result; + RETURN(rc); +} + +/** + * Returns the key at current position from iterator's in memory structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval key i.e. struct dt_key on success + */ +static struct dt_key *osd_it_ea_key(const struct lu_env *env, + const struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + ENTRY; + RETURN((struct dt_key *)it->oie_dirent64.d_name); } -static int osd_add_rec(struct osd_thread_info *info, struct osd_device *dev, - struct inode *dir, struct inode *inode, const char *name) +/** + * Returns the key's size at current position from iterator's in memory structure. 
+ * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval size of the key (entry name length) at the current position + */ +static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di) { - struct dentry *old; - struct dentry *new; - struct dentry *parent; + struct osd_it_ea *it = (struct osd_it_ea *)di; + ENTRY; + RETURN(it->oie_namelen); +} - int result; +/** + * Returns the value (i.e. fid/igif) at current position from iterator's + * in memory structure. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval value i.e. struct dt_rec on success + */ +static struct dt_rec *osd_it_ea_rec(const struct lu_env *env, + const struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + struct osd_object *obj = it->oie_obj; + struct osd_thread_info *info = osd_oti_get(env); + struct osd_inode_id *id = &info->oti_id; + struct lu_fid_pack *rec = &info->oti_pack; + struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev; + struct dentry *dentry = &info->oti_child_dentry; + struct osd_device *dev; + struct inode *inode; + int rc; - info->oti_str.name = name; - info->oti_str.len = strlen(name); - - LASSERT(atomic_read(&dir->i_count) > 0); - result = -ENOMEM; - old = d_alloc(dev->od_obj_area, &info->oti_str); - if (old != NULL) { - d_instantiate(old, inode); - igrab(inode); - LASSERT(atomic_read(&dir->i_count) > 0); - parent = d_alloc_root(dir); - if (parent != NULL) { - igrab(dir); - LASSERT(atomic_read(&dir->i_count) > 1); - new = d_alloc(parent, &info->oti_str); - LASSERT(atomic_read(&dir->i_count) > 1); - if (new != NULL) { - LASSERT(atomic_read(&dir->i_count) > 1); - result = dir->i_op->link(old, dir, new); - LASSERT(atomic_read(&dir->i_count) > 1); - dput(new); - LASSERT(atomic_read(&dir->i_count) > 1); - } - LASSERT(atomic_read(&dir->i_count) > 1); - dput(parent); - LASSERT(atomic_read(&dir->i_count) > 0); - } - dput(old); + ENTRY; + dev = osd_dev(ldev); + id->oii_ino = it->oie_dirent64.d_ino; + id->oii_gen = 
OSD_OII_NOGEN; + inode = osd_iget(info, dev, id); + if (!IS_ERR(inode)) { + dentry->d_inode = inode; + LASSERT(dentry->d_inode->i_sb == osd_sb(dev)); + } else { + CERROR("Error getting inode for ino =%d", id->oii_ino); + RETURN((struct dt_rec *) PTR_ERR(inode)); } - LASSERT(atomic_read(&dir->i_count) > 0); - return result; + + rc = osd_ea_fid_get(env, dentry, (struct dt_rec*) rec); + + iput(inode); + RETURN((struct dt_rec *)rec); + } +/** + * Returns a cookie for current position of the iterator head, so that + * user can use this cookie to load/start the iterator next time. + * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval cookie for current position, on success + */ +static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di) +{ + struct osd_it_ea *it = (struct osd_it_ea *)di; + ENTRY; + RETURN(it->oie_curr_pos); +} -/* - * XXX Temporary stuff. +/** + * It calls osd_ldiskfs_it_fill() which will use ->readdir() + * to load a directory entry at a time and store it + * in the iterator's in-memory data structure. 
+ * + * \param di, struct osd_it_ea, iterator's in memory structure + * + * \retval +ve, on success + * \retval -ve, on error */ -static int osd_index_compat_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa, - int ignore_quota) +static int osd_it_ea_load(const struct lu_env *env, + const struct dt_it *di, __u64 hash) { - struct osd_object *obj = osd_dt_obj(dt); + struct osd_it_ea *it = (struct osd_it_ea *)di; + int rc; - const char *name = (const char *)key; + ENTRY; + it->oie_curr_pos = it->oie_next_pos = hash; - struct lu_device *ludev = dt->do_lu.lo_dev; - struct lu_object *luch; + rc = osd_ldiskfs_it_fill(di); + if (rc == 0) + rc = +1; - struct osd_thread_info *info = osd_oti_get(env); - const struct lu_fid_pack *pack = (const struct lu_fid_pack *)rec; - struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + RETURN(rc); +} +/** + * Index and Iterator operations for interoperability + * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826) + */ +static const struct dt_index_operations osd_index_ea_ops = { + .dio_lookup = osd_index_ea_lookup, + .dio_insert = osd_index_ea_insert, + .dio_delete = osd_index_ea_delete, + .dio_it = { + .init = osd_it_ea_init, + .fini = osd_it_ea_fini, + .get = osd_it_ea_get, + .put = osd_it_ea_put, + .next = osd_it_ea_next, + .key = osd_it_ea_key, + .key_size = osd_it_ea_key_size, + .rec = osd_it_ea_rec, + .store = osd_it_ea_store, + .load = osd_it_ea_load + } +}; - int result; +/** + * Index lookup function for interoperability mode (b11826). + * + * \param key, key i.e. 
file name to be searched + * + * \retval +ve, on success + * \retval -ve, on error + */ +static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key, + struct lustre_capa *capa) +{ + struct osd_object *obj = osd_dt_obj(dt); + int rc = 0; + + ENTRY; LASSERT(S_ISDIR(obj->oo_inode->i_mode)); LINVRNT(osd_invariant(obj)); - LASSERT(th != NULL); - if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT)) + if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP)) return -EACCES; - result = fid_unpack(pack, fid); - if (result != 0) - return result; + rc = osd_ea_lookup_rec(env, obj, rec, key); - luch = lu_object_find(env, ludev, fid, NULL); - if (!IS_ERR(luch)) { - if (lu_object_exists(luch)) { - struct osd_object *child; - - child = osd_obj(lu_object_locate(luch->lo_header, - ludev->ld_type)); - if (child != NULL) - result = osd_add_rec(info, osd_obj2dev(obj), - obj->oo_inode, - child->oo_inode, name); - else { - CERROR("No osd slice.\n"); - result = -ENOENT; - } - LINVRNT(osd_invariant(obj)); - LINVRNT(osd_invariant(child)); - } else { - CERROR("Sorry.\n"); - result = -ENOENT; - } - lu_object_put(env, luch); - } else - result = PTR_ERR(luch); - LINVRNT(osd_invariant(obj)); - return result; + if (rc == 0) + rc = +1; + RETURN(rc); } -static const struct dt_index_operations osd_index_compat_ops = { - .dio_lookup = osd_index_compat_lookup, - .dio_insert = osd_index_compat_insert, - .dio_delete = osd_index_compat_delete -}; - /* type constructor/destructor: osd_type_init, osd_type_fini */ LU_TYPE_INIT_FINI(osd, &osd_key); @@ -2506,7 +3360,7 @@ static int osd_shutdown(const struct lu_env *env, struct osd_device *o) struct osd_thread_info *info = osd_oti_get(env); ENTRY; if (o->od_obj_area != NULL) { - dput(o->od_obj_area); + lu_object_put(env, &o->od_obj_area->do_lu); o->od_obj_area = NULL; } osd_oi_fini(info, &o->od_oi); @@ -2519,8 +3373,8 @@ static int osd_mount(const struct lu_env *env, { struct 
lustre_mount_info *lmi; const char *dev = lustre_cfg_string(cfg, 0); - struct osd_thread_info *info = osd_oti_get(env); - int result; + struct lustre_disk_data *ldd; + struct lustre_sb_info *lsi; ENTRY; @@ -2540,20 +3394,17 @@ static int osd_mount(const struct lu_env *env, /* save lustre_mount_info in dt_device */ o->od_mount = lmi; - result = osd_oi_init(info, &o->od_oi, &o->od_dt_dev); - if (result == 0) { - struct dentry *d; + lsi = s2lsi(lmi->lmi_sb); + ldd = lsi->lsi_ldd; - d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*", - 0777, 1); - if (!IS_ERR(d)) { - o->od_obj_area = d; - } else - result = PTR_ERR(d); - } - if (result != 0) - osd_shutdown(env, o); - RETURN(result); + if (ldd->ldd_flags & LDD_F_IAM_DIR) { + o->od_iop_mode = 0; + LCONSOLE_WARN("OSD: IAM mode enabled\n"); + } else + o->od_iop_mode = 1; + + o->od_obj_area = NULL; + RETURN(0); } static struct lu_device *osd_device_fini(const struct lu_env *env, @@ -2640,11 +3491,12 @@ static int osd_process_config(const struct lu_env *env, err = osd_shutdown(env, o); break; default: - err = -ENOTTY; + err = -ENOSYS; } RETURN(err); } + extern void ldiskfs_orphan_cleanup (struct super_block * sb, struct ldiskfs_super_block * es); @@ -2658,6 +3510,49 @@ static int osd_recovery_complete(const struct lu_env *env, RETURN(0); } +static int osd_prepare(const struct lu_env *env, + struct lu_device *pdev, + struct lu_device *dev) +{ + struct osd_device *osd = osd_dev(dev); + struct lustre_sb_info *lsi; + struct lustre_disk_data *ldd; + struct lustre_mount_info *lmi; + struct osd_thread_info *oti = osd_oti_get(env); + struct dt_object *d; + int result; + + ENTRY; + /* 1. initialize oi before any file create or file open */ + result = osd_oi_init(oti, &osd->od_oi, + &osd->od_dt_dev, lu2md_dev(pdev)); + if (result != 0) + RETURN(result); + + lmi = osd->od_mount; + lsi = s2lsi(lmi->lmi_sb); + ldd = lsi->lsi_ldd; + + /* 2. 
setup local objects */ + result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev)); + if (result) + goto out; + + /* 3. open remote object dir */ + d = dt_store_open(env, lu2dt_dev(dev), "", + remote_obj_dir, &oti->oti_fid); + if (!IS_ERR(d)) { + osd->od_obj_area = d; + result = 0; + } else { + result = PTR_ERR(d); + osd->od_obj_area = NULL; + } + +out: + RETURN(result); +} + static struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev, const struct osd_inode_id *id) @@ -2672,7 +3567,8 @@ static struct inode *osd_iget(struct osd_thread_info *info, CERROR("bad inode\n"); iput(inode); inode = ERR_PTR(-ENOENT); - } else if (inode->i_generation != id->oii_gen) { + } else if (id->oii_gen != OSD_OII_NOGEN && + inode->i_generation != id->oii_gen) { CERROR("stale inode\n"); iput(inode); inode = ERR_PTR(-ESTALE); @@ -2719,6 +3615,10 @@ static int osd_fid_lookup(const struct lu_env *env, if (!IS_ERR(inode)) { obj->oo_inode = inode; LASSERT(obj->oo_inode->i_sb == osd_sb(dev)); + if (dev->od_iop_mode) { + obj->oo_compat_dot_created = 1; + obj->oo_compat_dotdot_created = 1; + } result = 0; } else /* @@ -2732,6 +3632,7 @@ static int osd_fid_lookup(const struct lu_env *env, } else if (result == -ENOENT) result = 0; LINVRNT(osd_invariant(obj)); + RETURN(result); } @@ -2831,7 +3732,8 @@ static const struct lu_object_operations osd_lu_obj_ops = { static const struct lu_device_operations osd_lu_ops = { .ldo_object_alloc = osd_object_alloc, .ldo_process_config = osd_process_config, - .ldo_recovery_complete = osd_recovery_complete + .ldo_recovery_complete = osd_recovery_complete, + .ldo_prepare = osd_prepare, }; static const struct lu_device_type_operations osd_device_type_ops = { @@ -2862,10 +3764,19 @@ static struct obd_ops osd_obd_device_ops = { .o_owner = THIS_MODULE }; +static struct lu_local_obj_desc llod_osd_rem_obj_dir = { + .llod_name = remote_obj_dir, + .llod_oid = OSD_REM_OBJ_DIR_OID, + .llod_is_index = 1, + .llod_feat = 
&dt_directory_features, +}; + static int __init osd_mod_init(void) { struct lprocfs_static_vars lvars; + osd_oi_mod_init(); + llo_local_obj_register(&llod_osd_rem_obj_dir); lprocfs_osd_init_vars(&lvars); return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars, LUSTRE_OSD_NAME, &osd_device_type); diff --git a/lustre/osd/osd_internal.h b/lustre/osd/osd_internal.h index e187323..952754a 100644 --- a/lustre/osd/osd_internal.h +++ b/lustre/osd/osd_internal.h @@ -54,6 +54,8 @@ /* struct dentry */ #include #include +/* struct dirent64 */ +#include /* LUSTRE_OSD_NAME */ #include @@ -66,6 +68,7 @@ struct inode; +#define OSD_OII_NOGEN (0) #define OSD_COUNTERS (0) #ifdef HAVE_QUOTA_SUPPORT @@ -90,7 +93,7 @@ struct osd_device { * XXX temporary stuff for object index: directory where every object * is named by its fid. */ - struct dentry *od_obj_area; + struct dt_object *od_obj_area; /* Environment for transaction commit callback. * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD @@ -117,23 +120,59 @@ struct osd_device { cfs_time_t od_osfs_age; struct kstatfs od_kstatfs; spinlock_t od_osfs_lock; + + /** + * The following flag indicates, if it is interop mode or not. + * It will be initialized, using mount param. + */ + __u32 od_iop_mode; }; +/** + * This is iterator's in-memory data structure in interoperability + * mode (i.e. iterator over ldiskfs style directory) + */ +struct osd_it_ea { + struct osd_object *oie_obj; + /** used in ldiskfs iterator, to stored file pointer */ + struct file oie_file; + /** used in ldiskfs iterator, to store directory entry */ + struct dirent64 oie_dirent64; + /** current file position */ + __u64 oie_curr_pos; + /** next file position */ + __u64 oie_next_pos; + /** namelen of the file */ + __u8 oie_namelen; + +}; + +/** + * Iterator's in-memory data structure for IAM mode. 
+ */ +struct osd_it_iam { + struct osd_object *oi_obj; + struct iam_path_descr *oi_ipd; + struct iam_iterator oi_it; +}; struct osd_thread_info { const struct lu_env *oti_env; + /** + * used for index operations. + */ + struct dentry oti_obj_dentry; + struct dentry oti_child_dentry; struct lu_fid oti_fid; struct osd_inode_id oti_id; /* * XXX temporary: for ->i_op calls. */ - struct qstr oti_str; struct txn_param oti_txn; /* * XXX temporary: fake dentry used by xattr calls. */ - struct dentry oti_dentry; struct timespec oti_time; /* * XXX temporary: fake struct file for osd_object_sync @@ -147,14 +186,43 @@ struct osd_thread_info { struct lu_fid_pack oti_pack; - /* union to guarantee that ->oti_ipd[] has proper alignment. */ + /** + * following ipd and it structures are used for osd_index_iam_lookup() + * these are defined separately as we might do index operation + * in open iterator session. + */ + + /** osd iterator context used for iterator session */ + + union { + struct osd_it_iam oti_it; + /** ldiskfs iterator data structure, see osd_it_ea_{init, fini} */ + struct osd_it_ea oti_it_ea; + }; + + + /** IAM iterator for index operation. */ + struct iam_iterator oti_idx_it; + + /** union to guarantee that ->oti_ipd[] has proper alignment. 
*/ union { - char oti_ipd[DX_IPD_MAX_SIZE]; + char oti_it_ipd[DX_IPD_MAX_SIZE]; long long oti_alignment_lieutenant; }; + + union { + char oti_idx_ipd[DX_IPD_MAX_SIZE]; + long long oti_alignment_lieutenant_colonel; + }; + + int oti_r_locks; int oti_w_locks; int oti_txns; + /** used in osd_fid_set() to put xattr */ + struct lu_buf oti_buf; + /** used in osd_ea_fid_set() to set fid into common ea */ + struct lustre_mdt_attrs oti_mdt_attrs; #ifdef HAVE_QUOTA_SUPPORT struct osd_ctxt oti_ctxt; #endif diff --git a/lustre/osd/osd_oi.c b/lustre/osd/osd_oi.c index 79d4082..ad03c3c 100644 --- a/lustre/osd/osd_oi.c +++ b/lustre/osd/osd_oi.c @@ -77,50 +77,89 @@ struct oi_descr { int fid_size; char *name; + __u32 oid; +}; + +/** to serialize concurrent OI index initialization */ +static struct mutex oi_init_lock; + +static struct dt_index_features oi_feat = { + .dif_flags = DT_IND_UPDATE, + .dif_recsize_min = sizeof(struct osd_inode_id), + .dif_recsize_max = sizeof(struct osd_inode_id), + .dif_ptrsize = 4 }; static const struct oi_descr oi_descr[OSD_OI_FID_NR] = { [OSD_OI_FID_SMALL] = { .fid_size = 5, - .name = "oi.5" + .name = "oi.5", + .oid = OSD_OI_FID_SMALL_OID }, [OSD_OI_FID_OTHER] = { .fid_size = sizeof(struct lu_fid), - .name = "oi.16" + .name = "oi.16", + .oid = OSD_OI_FID_OTHER_OID } }; +static int osd_oi_index_create(struct osd_thread_info *info, + struct dt_device *dev, + struct md_device *mdev) +{ + const struct lu_env *env; + struct lu_fid *oi_fid = &info->oti_fid; + struct md_object *mdo; + int i; + int rc; + + env = info->oti_env; + + for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) { + char *name; + name = oi_descr[i].name; + lu_local_obj_fid(oi_fid, oi_descr[i].oid); + oi_feat.dif_keysize_min = oi_descr[i].fid_size, + oi_feat.dif_keysize_max = oi_descr[i].fid_size, + + mdo = llo_store_create_index(env, mdev, dev, + "/", name, + oi_fid, &oi_feat); + + if (IS_ERR(mdo)) + RETURN(PTR_ERR(mdo)); + + lu_object_put(env, &mdo->mo_lu); + } + return 0; +} + int 
osd_oi_init(struct osd_thread_info *info, - struct osd_oi *oi, struct dt_device *dev) + struct osd_oi *oi, + struct dt_device *dev, + struct md_device *mdev) { + const struct lu_env *env; int rc; int i; - const struct lu_env *env; CLASSERT(ARRAY_SIZE(oi->oi_dir) == ARRAY_SIZE(oi_descr)); env = info->oti_env; - + mutex_lock(&oi_init_lock); memset(oi, 0, sizeof *oi); - - for (i = rc = 0; i < ARRAY_SIZE(oi->oi_dir) && rc == 0; ++i) { +retry: + for (i = rc = 0; i < OSD_OI_FID_NR && rc == 0; ++i) { const char *name; - /* - * Allocate on stack---this is initialization. - */ - const struct dt_index_features feat = { - .dif_flags = DT_IND_UPDATE, - .dif_keysize_min = oi_descr[i].fid_size, - .dif_keysize_max = oi_descr[i].fid_size, - .dif_recsize_min = sizeof(struct osd_inode_id), - .dif_recsize_max = sizeof(struct osd_inode_id) - }; struct dt_object *obj; name = oi_descr[i].name; - obj = dt_store_open(env, dev, name, &info->oti_fid); + oi_feat.dif_keysize_min = oi_descr[i].fid_size, + oi_feat.dif_keysize_max = oi_descr[i].fid_size, + + obj = dt_store_open(env, dev, "", name, &info->oti_fid); if (!IS_ERR(obj)) { - rc = obj->do_ops->do_index_try(env, obj, &feat); + rc = obj->do_ops->do_index_try(env, obj, &oi_feat); if (rc == 0) { LASSERT(obj->do_index_ops != NULL); oi->oi_dir[i] = obj; @@ -130,17 +169,25 @@ int osd_oi_init(struct osd_thread_info *info, } } else { rc = PTR_ERR(obj); + if (rc == -ENOENT) { + rc = osd_oi_index_create(info, dev, mdev); + if (!rc) + goto retry; + } CERROR("Cannot open \"%s\": %d\n", name, rc); } } if (rc != 0) osd_oi_fini(info, oi); + + mutex_unlock(&oi_init_lock); return rc; } void osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi) { int i; + for (i = 0; i < ARRAY_SIZE(oi->oi_dir); ++i) { if (oi->oi_dir[i] != NULL) { lu_object_put(info->oti_env, &oi->oi_dir[i]->do_lu); @@ -171,6 +218,16 @@ static const struct dt_key *oi_fid_key(struct osd_thread_info *info, return NULL; } +static inline int fid_is_oi_fid(const struct lu_fid *fid) +{ + 
/* We need to filter-out oi obj's fid. As we can not store it, while + * oi-index create operation. + */ + return (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE && + (fid_oid(fid) == OSD_OI_FID_SMALL_OID || + fid_oid(fid) == OSD_OI_FID_OTHER_OID))); +} + int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi, const struct lu_fid *fid, struct osd_inode_id *id) { @@ -183,12 +240,19 @@ int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi, struct dt_object *idx; const struct dt_key *key; + if (fid_is_oi_fid(fid)) + return -ENOENT; + key = oi_fid_key(info, oi, fid, &idx); rc = idx->do_index_ops->dio_lookup(info->oti_env, idx, (struct dt_rec *)id, key, BYPASS_CAPA); - id->oii_ino = be32_to_cpu(id->oii_ino); - id->oii_gen = be32_to_cpu(id->oii_gen); + if (rc > 0) { + id->oii_ino = be32_to_cpu(id->oii_ino); + id->oii_gen = be32_to_cpu(id->oii_gen); + rc = 0; + } else if (rc == 0) + rc = -ENOENT; } return rc; } @@ -204,6 +268,9 @@ int osd_oi_insert(struct osd_thread_info *info, struct osd_oi *oi, if (fid_is_igif(fid)) return 0; + if (fid_is_oi_fid(fid)) + return 0; + key = oi_fid_key(info, oi, fid, &idx); id = &info->oti_id; id->oii_ino = cpu_to_be32(id0->oii_ino); @@ -228,3 +295,9 @@ int osd_oi_delete(struct osd_thread_info *info, return idx->do_index_ops->dio_delete(info->oti_env, idx, key, th, BYPASS_CAPA); } + +int osd_oi_mod_init() +{ + mutex_init(&oi_init_lock); + return 0; +} diff --git a/lustre/osd/osd_oi.h b/lustre/osd/osd_oi.h index 8e02eb2..fe87768 100644 --- a/lustre/osd/osd_oi.h +++ b/lustre/osd/osd_oi.h @@ -54,6 +54,7 @@ /* struct rw_semaphore */ #include #include +#include struct lu_fid; struct osd_thread_info; @@ -90,8 +91,11 @@ struct osd_inode_id { __u32 oii_gen; /* inode generation */ }; -int osd_oi_init(struct osd_thread_info *info, - struct osd_oi *oi, struct dt_device *dev); +int osd_oi_mod_init(void); +int osd_oi_init(struct osd_thread_info *info, + struct osd_oi *oi, + struct dt_device *dev, + struct md_device *mdev); void 
osd_oi_fini(struct osd_thread_info *info, struct osd_oi *oi); int osd_oi_lookup(struct osd_thread_info *info, struct osd_oi *oi, diff --git a/lustre/tests/cfg/lmv.sh b/lustre/tests/cfg/lmv.sh index 6358789..3b573bc 100644 --- a/lustre/tests/cfg/lmv.sh +++ b/lustre/tests/cfg/lmv.sh @@ -29,6 +29,7 @@ TMP=${TMP:-/tmp} MDSDEV=${MDSDEV:-$TMP/${FSNAME}-mdt1} MDSCOUNT=${MDSCOUNT:-3} test $MDSCOUNT -gt 4 && MDSCOUNT=4 +MDSCOUNT=1 MDSDEVBASE=${MDSDEVBASE:-$TMP/${FSNAME}-mdt} MDSSIZE=${MDSSIZE:-100000} diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index 3cf4e80..f4754f4 100644 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -1022,26 +1022,27 @@ test_32b() { [ -z "$TUNEFS" ] && skip "No tunefs" && return local DISK1_8=$LUSTRE/tests/disk1_8.tgz [ ! -r $DISK1_8 ] && skip "Cannot find $DISK1_8" && return 0 - mkdir -p $TMP/$tdir - tar xjvf $DISK1_8 -C $TMP/$tdir || \ + local tmpdir=$TMP/$tdir + mkdir -p $tmpdir + tar xjvf $DISK1_8 -C $tmpdir || \ { skip "Cannot untar $DISK1_8" && return ; } load_modules lctl set_param debug=$PTLDEBUG - NEWNAME=sofia + NEWNAME=lustre # writeconf will cause servers to register with their current nids $TUNEFS --writeconf --fsname=$NEWNAME $tmpdir/mds || error "tunefs failed" - start mds $tmpdir/mds "-o loop" || return 3 + start mds1 $tmpdir/mds "-o loop" || return 3 local UUID=$(lctl get_param -n mdt.${NEWNAME}-MDT0000.uuid) echo MDS uuid $UUID - [ "$UUID" == "mdsA_UUID" ] || error "UUID is wrong: $UUID" + [ "$UUID" == "${NEWNAME}-MDT0000_UUID" ] || error "UUID is wrong: $UUID" - $TUNEFS --mgsnode=`hostname` --fsname=$NEWNAME --writeconf $tmpdir/ost1 || error "tunefs failed" + $TUNEFS --mgsnode=`hostname` --writeconf --fsname=$NEWNAME $tmpdir/ost1 || error "tunefs failed" start ost1 $tmpdir/ost1 "-o loop" || return 5 UUID=$(lctl get_param -n obdfilter.${NEWNAME}-OST0000.uuid) echo OST uuid $UUID - [ "$UUID" == "ost1_UUID" ] || error "UUID is wrong: $UUID" + [ "$UUID" == "${NEWNAME}-OST0000_UUID" ] || 
error "UUID is wrong: $UUID" echo "OSC changes should succeed:" $LCTL conf_param ${NEWNAME}-OST0000.osc.max_dirty_mb=15 || return 7 @@ -1059,7 +1060,7 @@ test_32b() { mount_client $MOUNT FSNAME=$OLDFS set_and_check client "lctl get_param -n mdc.*.max_rpcs_in_flight" "${NEWNAME}-MDT0000.mdc.max_rpcs_in_flight" || return 11 - [ "$(cksum $MOUNT/passwd | cut -d' ' -f 1,2)" == "2479747619 779" ] || return 12 + [ "$(cksum $MOUNT/passwd | cut -d' ' -f 1,2)" == "94306271 1478" ] || return 12 echo "ok." cleanup diff --git a/lustre/tests/disk1_8.tgz b/lustre/tests/disk1_8.tgz new file mode 100644 index 0000000000000000000000000000000000000000..1657c1e948c54abc39259d9f2a1f34ddbfa62c77 GIT binary patch literal 10506 zcma)iWn5d$_AV{7g%aGgg+P$v1lQv379e;i65Oo?iaWvG-6^icy~W*%J4K2V+TQd% z_kYg0U+%ptlbL5`p7pFXvuA(Vzmd?l65xCb0ck=$-2*e9c@zKnf7&Kk%=dsaeOUYH z*A`NyrG|M_MlLLd`6U#TCvxRw%;k)A;%amUB(VWQwNK(uP|#Q-ym%l4-~cr&0E)I< zVw_r>0SqmiNE{dADI+2Yg^_^NVp3ZH5fWND(g>cU&YdJj`HxK^;qX6Q)a6|8NO4p; z82mYnjQYd5w)R_kpnCD!JPuG);(BqMw}^Uzq|!%a6mnE?@QjGsviRhTq|I}G95pV~ zx5Y~0SXd}iabOgbNh=gonSi+Gff_|(@28WPt*oq%{swrGmy3e(+#iXokpF)h*Iz`Q zgg(qk{ydsApg`OL5Tf+jiVF}Dt7e7;#ySe^)Vk&Ypw9r&KOE(Ii*u2_HY2yd0=yo! 
zV<;;CMTuKkSXj-VJY%AxMM=Oy>+j{3FD@xiMitNYPq(^eIPwQ$gY=N(zh3`eQXy(*PG%6e`vMDDkhv6z7kTUJ%2nDv^tXQVi~KNjfJ7&zi^4R0fvB;q=@h%n zf%0}-2(urG+1O$S7io-KTDRCE+!Hn!{q?o>#O_S5Jz}howW1W;dn~thwMoB%m>F6@ zq!U$|)fVy$Gsby@$_Z$)mEPP|sl8SEDYKO`qFkHft10NxOeHOR!YM97OVzEb%?>sM zY&vXRzMFoZvorQ~3ZcOyUgkYH`MGwOt5;y)})Y&K>iV>;7De+I+vbKs#ZrT4cQ2`G!Gq?67tI!by^BJ zI7L(@GsQ0sGf51=+5U=`$yrH4>8G>5S2p?v@5L5ktEkbkDTdy(%Z|u9N&kwxWWJs9 zxz6l2ej*EQl?7%Er;t#*pllf!(j~vVLQrZn4sU&{R+WD{Sx?oDsiQJE#==TC>c0I( zJ5-0O@M6zrg(4uaPDj}Bv%W^0YO>aqU(>Fg0fvB4*>#O3i#Pv;#`#RnN3OC2+nM|J zZ<+LSduLFPi5n&f|FHW!A-_TZ6-=^F>#Oo~$KeAMT=}N-&=|^Vtj{x;ahYM#3@Ed& zr+4|zj+#&r856cldvKsB@llUZR@R6uIN8@hqQYT3$D00*x!GJb|{9M6Y<#Iogte(uK$ZqbirEL_rl7Z^eL<$Fl&`r@(%&qT6~^K)Qf*jn)z&q8PX>!y zl-n_W>2}m86YNmx6Rxgd3c_Y6=yF}Ygq~B_e-}bf;$yySk%)dBe|K+M1?O zUJ#FvDybTIDGbukRr$)l>Bm{jtuuP`Nt@7ly@zTJM#_|{q(#RRZRPA8`3 ze)=6Uz<~GP|o* zslFZtDhl-Ov26R4pEFf<=Tj8SuWVxttnmd=Y;RKZV9is_v)T5q{8;P~-(YyUN;k$n67hDYDdzB$tNrDL07aBGFFvi99l zrQNK##2FCh6Fk)}=Fi6u#yO!$j>GcJ#L@7}w%iB_mBg7si4$nJHU3O;WqsRum5Dp& z3gxZYr|f2{fganUz(Z&XY{?y(I>TZCn92}B<$tGiF-uZn<}NGY{tm@)Pi;627|n~g z?qpZ)D1z_W%-O8WmdN8vjo3F`|cd%JmpGn;=^p5;$`@ummO%;30~X)8!y(@c&p|`(5Op= zxmtM8#ssu1ugrzg1j~UgGWXE>V}%&V$i9*>cz_+hp0Vo^K-QJhDFpZN6?P^L*HtdP z@y5ML&~W*hy}0HZPDnYUclU|SDdqQ9rL&sqij#}V=WOM5%4-`06mcN|z5V+_gmYnL zpdYSfg$myjh_+nxbd9Qb*u|FjR^D)Z0apko=u|6|yHRx7x_t!jkMs^U${Tr4PnWpv zEX!)7G3gvH=qE*{CX~9~dN?$%VPgADwejv%YnNavkm-yuLtu?bBhaKh)?1jvPMR)e z@b1~tOjbg$`&_0feMR-Id*%1&Ap)A@R6tH# zPMnkw>bGVp#DtZ@o4uQx-)c*q`RDwS8=>1wua^ADMJ)>yg*B zs9wjR=Z^1QU@<$audi=($~{d^EXrjJYfWq@V*N}gof)eR)Gai&92^`RpP;j8YHPGv z;O>ksjwS~We(aRi2kS(nzfstBU9D*H4$^Ky+TM$orq&7uy{Y;dDHSFTS3cbOV5wd)-}u8${_Zfg6V|f&Tgww1kE`2`tNRg39DSI7QQOILQqvH?|MdQu{X6`xa$*`n3g=FS zz!K1Cp5q45pi3Yt3}mYm#nJl@r5zth`CEcg?P<{A7RV{&wB)F@S)7 z9QFnW`;iJRSi@m)$QE6Kg01au5F`hw*G}+H)B5xucLHQMq@MUcJOX6sfK+7oVF}#D zrN6q1eE<&RJ|OFM77k?H2Vnar>pEmI;dH5{Atb!KymU0^auT?I{r;+$1}{L}k=c^` zlh|K@@g8(Z^M9G?rnr=p(-S-bmZk+nj=@oMGMyo 
zwJIeak)zTGQ74U4`Z&jxZzKPnaJWL9<+bmXhy2D*F~v0x=+_Go2J_7}TUh2;y7aVG z{ozR?g!%BfL`-^GKwP*Nt%m8*&jR7ovkyMkzEz*@I`XIJGIr6mV8jr;VZrqjGRx4w zB?EZtPQ^6IWJ7#jpdys{ACTTxqJ>*T@K)C82pyS} zu#8r)ZE@OnNPW%6w=^@Z2_lNCxnhA`$YINz3zw>2j8O20`bhbEl<%R!4xY(8t{yUD-$y!rja3O-lwO5-5H>M? zuMV!J)(dm9sW`*!_)ln*T7OYQNyCSN+CM^OZ!au72D(5S_Zz zZ6^`O@i|}8u3YLY!)ZW}Pimzw4LqW0iS70&b6?<#?rUdShCo^cso$zWQc_aV7%!#N zsl%_Ysndd~tKqXV73C3Rf*6Op)ZH)=+b=mqvjyuisneERnq(KZ%P55?;wS|EbpH7M z7~fy<`{n%z*|dLA@_>~*cFJqO-txejvIcK5_N!m=yKRSeQ(W>IC)Gcmrl8-(V11{= zycILJ<7YtIG^7arKyvCwa_JaD*4Ld8_CfZC6Z^^U^Y2?ZySJ1EQBL+eL;Sa|w{s4M zTNxhgUtNr}$};|`d0cH)9wSX%$OF3J-%OvcVAuCUy=Vpt4yc0$>WUFsCAaVxQwf0{)_ON)`ZIwTHI$-UBt`mG* zq!KbgO9E~3bP4tddAh&F4ecMWF#WIY-bAKH!99x(y^RVWaRLpZ|0E z-#llgzZzK8Z(Oe3Y9{Bcre;^j`5%cb(!zc{R&aa*a+)){6)U8isRvxV8;)jDs&Cax z*yTG{F3>csU0ZELI)kqF!kd&&k*L|_`d^{4ME)IZ8UNp*{y%|o?|+E=h5tKud)Eo@ z8t+fu44FO^>8gT)79_bS?JuVbt~ zf_#QAbNs(-kB#})tD1YIkw&9Fl2;cCP)Qiq zjUS+4`$;3in?D$78t!`ua*tHMFHvT$)*d4O$=9H%Pj79iU}cREf>19<#pk!It6oA*VV^j^21g+#iL^N`6KOLKX`ZyZLs?eNITVb zMvYt&yx2f&ZH!#3i^XnxF5BcJ<{Yb-8TS7~?Lcfnsr7QUz>D=AQ}4|tBCUQ`nmyR* z4Kxa(Re>UA+{c(B2WvK_%-wKZ{EGkB46|Q?x!Lb+? zo-b!@^;ArGnr2}4p+udG&9MI$zJhm^D4MBb%PwU^Kp0bmf1R0Y^@RB*)tG%<`>%D$ zJ>C*}2Yq!_rBVH!@%BKADN~%yN?GAzcReXb!~{se%!YaR7MZ;yph+BO)Hu? 
zh+iao!V`_|DLt8=uYdRw`I1;wv)NWoI|mC1>i{AK@gc#oQLXN01tNB&KE*sTfXX_H zQMQt-&6+AyqQ|w+Jb_y@HKlcl&m5G*oy*^-u>7(HQAKY)b`RYNQW3t0jTo|~Z(&!-~Y)9HnPahR=tvkDnis9{0`53WoA!Y>8=7J&ZhxF;EKYx+~^=K0Q&@ z6}$YEfB7EmF*tOJX^miHwO-Xzku`I8XPO^3?JEQB4ftpRyQX5gVq6gV!bK7*N4{{+ zB*k8j7v5({cXb)+4-ld#u&qfZi)FJ~>(6=ayVdvM1B5psr*wI9HWnn<8hT*LR|I^= zt@xW*mOHIRMPPv-rLB`*FT08l!f6@m-J+jyboawMWzt>Db-M-o+84~jKuf(4N z@?yn1@p2`fm=kL@lYSuHr4YDQQlcu1d|fGrE-=ufR+;-+g?;z%E{ z7Rclp2h}Uo=jRwY@vnv)oz(pnrM7=ujjx5X@mpj-BAiZ=)fJf36%xx7GTk656LVwL zD~6bSq&`)*TMiRp^U8|RdP&-(2*(z_l0=($N&=brL5k`xgGb;5iLIHEj7Sx_gq!?J#x4?8ge@`Ot|SX;jNXQ@9F{aEQiU- zXwf_#{5Grhf>qIRQ{WWBhJ@fAR^gK|K26&-GkA#Rn}RHMx`Qvhmi#66sC{(cRQsl8 zny}uTUFYVKPtLGx8z<0$IZhna<%=A?Liei3vF>d@{i~7_MxP9wLspqen<3x#lYP(U zdhLSCQ=WtP9qp9U_2$5J;JL=)ktiiIb0DeSNL1=b2x%XwISVW*b&xY^&>R4PB}2Fd z^~ORH{CeQLqI_i;{ zG+q%l3fS~MCBnb?$@}3im>T9EH5z5g?9#+;azZI4$oWeJo!TH~q3Le4u{1)Ce)N+q zMAF|89~uUUjjedY-}gii@l~oi`(294*LnA$rl-2AVoQvOQf5vb2Y>r>?s^3qoQ{3Z zq9`q@RIzSh6n7QUyt3|o>teC&mTGM@m0zo*@>~07=$gS;`dRcN!5~%KsigR?AFlxo3LGSDD#Ic^6Ijxnjj0y+O{v-)$Dv zt?wEks_B;MNy(<(Ay#fr;%+fQSZaR?eXE*T+4A%J6Cuj+AX3<`B}~>T5_A;L62st| z-pXcj<8tUjwtn>jq5iJEJ!5$_wELFNzM`AQb7o<-=u0!1X)+lppKpx58|y9ecll+u zESK{3C$rk~I7=$64*>m+*L!14Q86}pr_b1=4&+Y}ht1VpYnU4JLn_Y-)#M(lBRy(C z|ecVrcrV6M_xc0qwBbpS>kI6}ef1 z-yVQR>)5BsA<`*$vll;si)<3V=QV?OK-IRh2Ub0!5Yi7vUJZAl3Zk$9?WqzHxBzC) znsPEr2T-T7-W;LdXb#eIwwE`=|L|7QZ^S^>-XE~oC z1F>WRmfi=h4is4{_rr`;Oiq7RjLU{4af(dLDeh6W#z2 zU5!H{h5NN``qz|e=;x7cA3^gd{t=#64%5`)UJBbf_pyqEsbPdm$ZiOMy|XpXR}53iwOO!W<830 z(Knkb?YrQsz=R2gXEDTdi7ZkvM58Kb`=Em(@60H!NJ$ILA%vO8JE)*@f?h&wIX-wP zPq6=|i9^imeeGcul}3&WD$e_AXQqz#P+;J0B|~VfaM~C-J6@6?uyNHvPta0Rt86GS zd<^S!gUvfHjbZz;MRRDIaso@5?mPy*d%+NBNuD=Zd#F0Vf^dK=hr4D;xD0v|XDKV| z=yL(N!~s2sURS&+Kqnpk<(zwQI|1O z1=PPxU$j7bA-Kt)Wtyx3QwY}sakMGP=~~`E#xS@-ui;z#UL!V_kwpZd^I6ZyLC2%yHKTUmKJ)XI>O5-o<#h;&L_!M$#UIhV*PZ$HD+tOXST7ktHJcTqoRQ#z_#+Xkd6qEyJoIv2p{_$UGpy8BD?jpl< z)sA$jJ^{PJDWcRg%k8H*ug8_DIdYiZ9gr~$gH5awPmepm^l572UU;PR`$ZrvOu!&U 
z@W9q3izwPbNS{p;82lUF6CNv3lapF&QD5J^0OwQwQLz?6lF96GrPI^a@aWx%YZ;p9 ziM~8QCh2J6P<8t-_Z>D;!93r`JSWMeKV3Wr*C}6Z*8W4Q*O}i_o(tF|j`Un-opn-`sL`tq`gvc!2)Bb&b@e3+*hJHs zkWAAbD={V~lF)ClMUIdUv5BY^mTK#wWzZapyuPXT`|)x3`rP`J$oiJ^*`MVXpPE*( zk}cWfTFZt+4Lcy6Ha~)EyZ;5(g=k^>GoACGsDRl(m2_K zWUyCoHb64e%bq3FF8yO|3i-H0P1kV)X>*ZQu8G)p@Oa8cTMotBf_)d08i9KQ&y9*_g4^{!Ux%7n3nP z?we3KumJ--AAA9lC)1%l(o#8~kNFvBA`aF;l(DBz~E{p-a6z|2Khls?{Kd8 zx_EFvbkMXVS~;E50E;3dsR-_>J^#j!9HOa?F#XX}MZ4U18@S9HURPwTAIj0|JybTQ z;shbcZj0@&%e_fH;-I>|H7H%vtuNcVfMu{T-YNQB67M)SDTT&GE$JuE*49`XncFO0 zl>FpNqGwoOcu&qyrM4QUzKtUz{4HP&P~i8T{bzss?E^aH3yUT69PV{*j9@{&m0`={GfxIOO9TNS~|J#Ep$d)n#~Nt(Z2a`!I^ zE&mu0k+@4qZTrbvWgKPg^xl$rVL1wo9!tcG;iouJxs@nu-V3(}U z-IsxHkH)^g=&OAj+%Dz^R$`x>GxTR=-o~DfTyy}xtKWp(T!N*Ky|yxOZp4BfN_xH6 zzjpz@wBZOJiu42tHn73v6Qv3mN8#GD)fh06oPB^Wftn50;uPjGQBPoZ#mnBV6oz`2 z5SFc|vS|K3oi3hRR)1-I2`2QP4jjdV#ZI8q8M?-*U3vv&|61ZFbNBv+0nQ*HX>~1~ z*XPjV(gW^}&iy9elKpPGKRSP$A83r&t{T}i?+Q{dg!lcvnd`?Q?v1bcEL3i1^NZhdu7w2J+^oj;nF0hn(z5)YzF*WS3+@XL(B`A1abe%+3L ztcyd4+y5!ITx{0#FbX(6gY{RFZtL33LfLFV@BWOd>BuL}!}oBH#PK$a|9to%K*W#o z=Qk$AV~YLZ&mAr<3bLR)MWEoKa7YlM9M7ICQa~4N0_EaBn!TwYB|H)F2_PuSA7wea zIGPM>G0equ$Pke|hvK90KbMYBm4Xy<@?|n6pNa512|_^u1G3||$g!SGkb}X!Pm0lI z#L2J#;^dsEr^B>Hz>;|R61UQR}SL?`?Pd5!W`3$q0p7Z`v-^5kh?ey;0Rok{;+y;$D3 zT)1Z9@7N@hUx~|pF4V&UU@E?i;T3u+sq3L;i-#HC-I=$u6j{|R$g**`QGYw6QVq;e zyjF6X*j|XkCgCdJv4o)sakIMwy*|o~Wg?AA2sc4VcGf@y&3w~8$X1+IPY2P*gd?oD zh%Mf!Gkr|kd?xVptw|o?M+w0!w9MN0QdWO)8=X7`RL#l66J2!0vf|@fl4(J<*V6e$ z6v3k2@Q(t7j`SJIPqlq+8;m5=lWXLEq8qT;v$c~rzk7%jN~Z%+m{5q9>#q!OKTEsO$WinT^bS?blC3G z(=RQ4jeE;FjjRjejo5OKcf5IR&%#Pj8(WedyuqNdbjTLsD0{@o>wta0>l9*np3-Gula0J`FxW4!SCRR0n#gd}S5{!S z1V+)D+$FN@M3{|536qm>ZY|UmEVb&B3Wp9x)8hdo<8X@;pW#}Ml;VT-GJejqZ8D22 z+2LfMg1L*yaf@3&0}S!0(Uynf5N#2xhS3BXm--QCtBQF_(y5hL=oP41FDt;W_|WC( z10gpq4C-N%%I)dndsBgH{hvxYy%87;O=dBBwFYTY{`ASrNhw-jaO0bPGXH| b4bApKQa1(SL2(Gw-^dgl^Jq$EF<$>KU6Ay3 literal 0 HcmV?d00001 diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index 
f4b7a48..dce9601 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -15,6 +15,7 @@ export GSS=false export GSS_KRB5=false export GSS_PIPEFS=false export IDENTITY_UPCALL=default + #export PDSH="pdsh -S -Rssh -w" # eg, assert_env LUSTRE MDSNODES OSTNODES CLIENTS @@ -1080,6 +1081,11 @@ mdsmkfsopts() } formatall() { + if [ "$IAMDIR" == "yes" ]; then + MDS_MKFS_OPTS="$MDS_MKFS_OPTS --iam-dir" + MDSn_MKFS_OPTS="$MDSn_MKFS_OPTS --iam-dir" + fi + [ "$FSTYPE" ] && FSTYPE_OPT="--backfstype $FSTYPE" if [ ! -z $SEC ]; then diff --git a/lustre/utils/mkfs_lustre.c b/lustre/utils/mkfs_lustre.c index 0bd83b7..8f54f8b 100644 --- a/lustre/utils/mkfs_lustre.c +++ b/lustre/utils/mkfs_lustre.c @@ -98,6 +98,7 @@ char *progname; int verbose = 1; static int print_only = 0; static int failover = 0; +static int upgrade_to_18 = 0; void usage(FILE *out) { @@ -130,6 +131,7 @@ void usage(FILE *out) "\t\t--mkfsoptions= : format options\n" "\t\t--reformat: overwrite an existing disk\n" "\t\t--stripe-count-hint=#N : used for optimizing MDT inode size\n" + "\t\t--iam-dir: make use of IAM directory format on backfs, incompatible with ext3.\n" #else "\t\t--erase-params : erase all old parameter settings\n" "\t\t--nomgs: turn off MGS service on this MDT\n" @@ -716,7 +718,7 @@ void print_ldd(char *str, struct lustre_disk_data *ldd) printf("Lustre FS: %s\n", ldd->ldd_fsname); printf("Mount type: %s\n", MT_STR(ldd)); printf("Flags: %#x\n", ldd->ldd_flags); - printf(" (%s%s%s%s%s%s%s%s)\n", + printf(" (%s%s%s%s%s%s%s%s%s)\n", IS_MDT(ldd) ? "MDT ":"", IS_OST(ldd) ? "OST ":"", IS_MGS(ldd) ? "MGS ":"", @@ -724,6 +726,7 @@ void print_ldd(char *str, struct lustre_disk_data *ldd) ldd->ldd_flags & LDD_F_VIRGIN ? "first_time ":"", ldd->ldd_flags & LDD_F_UPDATE ? "update ":"", ldd->ldd_flags & LDD_F_WRITECONF ? "writeconf ":"", + ldd->ldd_flags & LDD_F_IAM_DIR ? "IAM_dir_format ":"", ldd->ldd_flags & LDD_F_UPGRADE14 ? 
"upgrade1.4 ":""); printf("Persistent mount opts: %s\n", ldd->ldd_mount_opts); printf("Parameters:%s\n", ldd->ldd_params); @@ -732,6 +735,67 @@ void print_ldd(char *str, struct lustre_disk_data *ldd) printf("\n"); } +static int touch_file(char *filename) +{ + int fd; + + if (filename == NULL) { + return 1; + } + + fd = open(filename, O_CREAT | O_TRUNC, 0600); + if (fd < 0) { + return 1; + } else { + close(fd); + return 0; + } +} + +/* keep it less than LL_FID_NAMELEN */ +#define DUMMY_FILE_NAME_LEN 25 +#define EXT3_DIRENT_SIZE DUMMY_FILE_NAME_LEN + +/* Need to add these many entries to this directory to make HTREE dir. */ +#define MIN_ENTRIES_REQ_FOR_HTREE ((L_BLOCK_SIZE / EXT3_DIRENT_SIZE)) + +static int add_dummy_files(char *dir) +{ + char fpname[PATH_MAX]; + int i; + int rc; + + for (i = 0; i < MIN_ENTRIES_REQ_FOR_HTREE; i++) { + snprintf(fpname, PATH_MAX, "%s/%0*d", dir, + DUMMY_FILE_NAME_LEN, i); + + rc = touch_file(fpname); + if (rc && rc != -EEXIST) { + fprintf(stderr, + "%s: Can't create dummy file %s: %s\n", + progname, fpname , strerror(errno)); + return rc; + } + } + return 0; +} + +static int __l_mkdir(char * filepnm, int mode , struct mkfs_opts *mop) +{ + int ret; + + ret = mkdir(filepnm, mode); + if (ret && ret != -EEXIST) + return ret; + + /* IAM mode supports ext3 directories of HTREE type only. So add dummy + * entries to new directory to create htree type of container for + * this directory. 
*/ + if (mop->mo_ldd.ldd_flags & LDD_F_IAM_DIR) + return add_dummy_files(filepnm); + return 0; +} + /* Write the server config files */ int write_local_files(struct mkfs_opts *mop) { @@ -766,7 +830,7 @@ int write_local_files(struct mkfs_opts *mop) /* Set up initial directories */ sprintf(filepnm, "%s/%s", mntpt, MOUNT_CONFIGS_DIR); - ret = mkdir(filepnm, 0777); + ret = __l_mkdir(filepnm, 0777, mop); if ((ret != 0) && (errno != EEXIST)) { fprintf(stderr, "%s: Can't make configs dir %s (%s)\n", progname, filepnm, strerror(errno)); @@ -775,16 +839,6 @@ int write_local_files(struct mkfs_opts *mop) ret = 0; } - sprintf(filepnm, "%s/%s", mntpt, "ROOT"); - ret = mkdir(filepnm, 0777); - if ((ret != 0) && (errno != EEXIST)) { - fprintf(stderr, "%s: Can't make ROOT dir %s (%s)\n", - progname, filepnm, strerror(errno)); - goto out_umnt; - } else if (errno == EEXIST) { - ret = 0; - } - /* Save the persistent mount data into a file. Lustre must pre-read this file to get the real mount options. */ vprint("Writing %s\n", MOUNT_DATA_FILE); @@ -797,7 +851,6 @@ int write_local_files(struct mkfs_opts *mop) } fwrite(&mop->mo_ldd, sizeof(mop->mo_ldd), 1, filep); fclose(filep); - /* COMPAT_146 */ #ifdef TUNEFS /* Check for upgrade */ @@ -859,7 +912,6 @@ int write_local_files(struct mkfs_opts *mop) #endif /* end COMPAT_146 */ - out_umnt: umount(mntpt); out_rmdir: @@ -1102,6 +1154,7 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop, char **mountopts) { static struct option long_opt[] = { + {"iam-dir", 0, 0, 'a'}, {"backfstype", 1, 0, 'b'}, {"stripe-count-hint", 1, 0, 'c'}, {"comment", 1, 0, 'u'}, @@ -1129,6 +1182,7 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop, {"reformat", 0, 0, 'r'}, {"verbose", 0, 0, 'v'}, {"writeconf", 0, 0, 'w'}, + {"upgrade_to_18", 0, 0, 'U'}, {0, 0, 0, 0} }; char *optstring = "b:c:C:d:ef:Ghi:k:L:m:MnNo:Op:Pqru:vw"; @@ -1138,6 +1192,11 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop, while ((opt = 
getopt_long(argc, argv, optstring, long_opt, &longidx)) != EOF) { switch (opt) { + case 'a': { + if (IS_MDT(&mop->mo_ldd)) + mop->mo_ldd.ldd_flags |= LDD_F_IAM_DIR; + break; + } case 'b': { int i = 0; while (i < LDD_MT_LAST) { @@ -1289,6 +1348,9 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop, case 'w': mop->mo_ldd.ldd_flags |= LDD_F_WRITECONF; break; + case 'U': + upgrade_to_18 = 1; + break; default: if (opt != '?') { fatal(); @@ -1308,227 +1370,6 @@ int parse_opts(int argc, char *const argv[], struct mkfs_opts *mop, return 0; } -#include - -#define LDISKFS_IOC_GETVERSION _IOR('f', 3, long) - -#ifndef TUNEFS /* mkfs.lustre */ -static int mkfs_iam_insert(int key_need_convert, char *keybuf, - int rec_need_convert, char *recbuf, char *filename) -{ - int fd; - int ret; - struct iam_uapi_info ua; - - fd = iam_open(filename, &ua); - if (fd < 0) { - fprintf(stderr, "failed to iam_open %s\n", filename); - return 1; - } - - ret = iam_insert(fd, &ua, - key_need_convert, keybuf, - rec_need_convert, recbuf); - iam_close(fd); - if (ret) { - fprintf(stderr, "failed to iam_insert %s\n", filename); - return 1; - } else { - return 0; - } -} - -static int touch_file(char *filename) -{ - int fd; - - if (filename == NULL) { - return 1; - } - - fd = open(filename, O_CREAT | O_TRUNC, 0600); - if (fd < 0) { - return 1; - } else { - close(fd); - return 0; - } -} - -static int get_generation(char *filename, unsigned long *result) -{ - int fd; - int ret; - - if (filename == NULL) { - return 1; - } - - fd = open(filename, O_RDONLY); - if (fd < 0) { - fprintf(stderr, "%s: failed to open %s\n", - __FUNCTION__, filename); - return 1; - } - - ret = ioctl(fd, LDISKFS_IOC_GETVERSION, result); - close(fd); - - return ((ret < 0) ? 
ret : 0); -} - -static int mkfs_mdt(struct mkfs_opts *mop) -{ - char mntpt[] = "/tmp/mntXXXXXX"; - char fstype[] = "ldiskfs"; - char filepnm[128]; - char recbuf[64]; - char *source; - int ret; - unsigned long generation; - struct stat st; - - source = mop->mo_device; - if (mop->mo_flags & MO_IS_LOOP) { - source = mop->mo_loopdev; - } - - if ((source == NULL) || (*source == 0)) { - return 1; - } - - if (!mkdtemp(mntpt)) { - fprintf(stderr, "%s: failed to mkdtemp %s\n", - __FUNCTION__, mntpt); - return errno; - } - - ret = mount(source, mntpt, fstype, 0, NULL); - if (ret) { - goto out_rmdir; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_ctl"); - ret = touch_file(filepnm); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_srv"); - ret = touch_file(filepnm); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "last_received"); - ret = touch_file(filepnm); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "lov_objid"); - ret = touch_file(filepnm); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "root"); - ret = iam_creat(filepnm, FMT_LVAR, L_BLOCK_SIZE, 4, 17, 4); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "fld"); - ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 8, 8, 4); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "orphans"); - ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 20, 8, 4); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "oi.16"); - ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 16, 8, 4); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "oi.5"); - ret = iam_creat(filepnm, FMT_LFIX, L_BLOCK_SIZE, 5, 8, 4); - if (ret) { - goto out_umount; - } - - 
snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, CAPA_KEYS); - ret = touch_file(filepnm); - if (ret) { - goto out_umount; - } - - umount(mntpt); - ret = mount(source, mntpt, fstype, 0, NULL); - if (ret) { - goto out_rmdir; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "root"); - ret = iam_polymorph(filepnm, 040755); - if (ret) { - perror("IAM_IOC_POLYMORPH"); - goto out_umount; - } - - umount(mntpt); - ret = mount(source, mntpt, fstype, 0, NULL); - if (ret) { - goto out_rmdir; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "fld"); - ret = mkfs_iam_insert(1, "0000000000000002", 1, "0000000000000000", filepnm); - if (ret) { - goto out_umount; - } - - ret = mkfs_iam_insert(1, "0000000000000001", 1, "0000000000000000", filepnm); - if (ret) { - goto out_umount; - } - - snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "root"); - ret = stat(filepnm, &st); - if (ret) { - goto out_umount; - } - - ret = get_generation(filepnm, &generation); - if (ret) { - goto out_umount; - } - - snprintf(recbuf, sizeof(recbuf) - 1, "110000000000000001%8.8x%8.8x", - (unsigned int)st.st_ino, (unsigned int)generation); - ret = mkfs_iam_insert(0, ".", 1, recbuf, filepnm); - if (ret) { - goto out_umount; - } - - ret = mkfs_iam_insert(0, "..", 1, recbuf, filepnm); - if (ret) { - goto out_umount; - } - -out_umount: - umount(mntpt); -out_rmdir: - rmdir(mntpt); - return ret; -} -#endif - int main(int argc, char *const argv[]) { struct mkfs_opts mop; @@ -1758,16 +1599,6 @@ int main(int argc, char *const argv[]) goto out; } -#ifndef TUNEFS /* mkfs.lustre */ - if (IS_MDT(ldd)) { - ret = mkfs_mdt(&mop); - if (ret != 0) { - fprintf(stderr, "failed to mkfs_mdt\n"); - goto out; - } - } -#endif - out: loop_cleanup(&mop); -- 1.8.3.1