From 5165cdd4b063d523e5ae261f47818b5ba2bbc7cc Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Fri, 21 Sep 2012 23:45:32 +0400 Subject: [PATCH] LU-1303 mds: integration lod/osp into the stack - config handler recognizes MDS config and replaces LOV/OSC devices with LOD/OSP - the devices in MDS stack are set up from bottom to top using obd_connect() (namely MDT->MDD->LOD->OSD; when new OSPs arrive, they connect to the OSD and LOD connects to every OSP). all the devices are referenced by exports, preventing them from early release. - once the whole configuration is processed, prepare() method of the top device is called, which in turn calls ->prepare() down through the stack, etc. at this point the stack is considered ready to access storage, start recovery and do regular processing. - at umount ->ldo_process_config(LCFG_CLEANUP) is called down through the stack asking devices to release the resources (like objects referenced for internal purposes), then the series of calls to obd_disconnect() starting from the top MDT causes all the devices to become unreferenced and subject to release by zombie thread. - obd_notify() mechanism is not used by MDT/MDD/LOD/OSD, it's still used by OSP to be aware of current state: - to learn when the first connection to OST happens and let clients connect to MDS, MDS uses obd_health_check() and caches positive result. - during object creation LOD uses dt_statfs() and data returned by OSP tells LOD whether a specific OST is active/inactive, space available, number of precreated objects, etc. - LOD takes care of striping: it maintains the list of available OSTs represented locally by OSP devices. on create, LOD finds suitable OSPs, creates objects by means of OSD API and stores striping info into a local object. similarly LOD handles object destroy and UID/GID changes: it loads the striping and applies the operation to the striped objects. - llog is used by OSP as a local library, cookies are not visible outside of OSP.
thus neither MDT nor MDD needs to care about cookies, prepare buffers. MDD doesn't need to send RPCs to destroy objects on behalf of evicted clients. all is hidden behind OSP's object destroy method. - OSP generates llog records for OST object being destroyed and, once corresponded local transaction is committed, send OST_DESTROY RPC. once OST_DESTROY is reported to be committed, corresponded llog record is cancelled. - the same logic is used for UID/GID changes. Signed-off-by: Alex Zhuravlev Change-Id: Ifb282ace94f583ffa86020b763f22922e5d0b032 Reviewed-on: http://review.whamcloud.com/4087 Reviewed-by: Andreas Dilger Tested-by: Hudson Tested-by: Maloo Reviewed-by: Mike Pershin --- lustre-iokit/mds-survey/mds-survey | 2 +- lustre/mdd/mdd_device.c | 282 ++++++++----- lustre/mdd/mdd_dir.c | 189 ++++----- lustre/mdd/mdd_internal.h | 6 +- lustre/mdd/mdd_lfsck.c | 4 +- lustre/mdd/mdd_lov.c | 809 ------------------------------------- lustre/mdd/mdd_lproc.c | 5 +- lustre/mdd/mdd_object.c | 560 +++---------------------- lustre/mdd/mdd_orphans.c | 79 ++-- lustre/mdd/mdd_trans.c | 10 - lustre/mdt/mdt_handler.c | 467 ++++++++++----------- lustre/mdt/mdt_lib.c | 57 ++- lustre/mdt/mdt_open.c | 59 +-- lustre/mdt/mdt_recovery.c | 24 +- lustre/mdt/mdt_reint.c | 43 +- lustre/obdclass/llog_cat.c | 1 + lustre/obdclass/obd_config.c | 27 ++ lustre/obdclass/obd_mount.c | 36 +- lustre/obdecho/echo_client.c | 1 + lustre/osd-ldiskfs/osd_compat.c | 4 +- lustre/osd-ldiskfs/osd_handler.c | 22 +- lustre/osp/osp_dev.c | 2 +- lustre/osp/osp_object.c | 12 +- lustre/osp/osp_precreate.c | 106 ++++- lustre/tests/lustre-rsync-test.sh | 3 + lustre/tests/recovery-small.sh | 4 +- lustre/tests/sanity.sh | 5 + lustre/tests/test-framework.sh | 2 +- 28 files changed, 803 insertions(+), 2018 deletions(-) diff --git a/lustre-iokit/mds-survey/mds-survey b/lustre-iokit/mds-survey/mds-survey index 9c529bf..ffa062e 100755 --- a/lustre-iokit/mds-survey/mds-survey +++ b/lustre-iokit/mds-survey/mds-survey @@ -187,7 
+187,7 @@ if (( $stripe_count > 0 )); then for ((i=0; i < $ndevs; i++)); do host=${host_names[$i]} obd=$(remote_shell $host $lctl device_list | awk "{if (\$2 == \"UP\" && - \$3 == \"osc\") { print \$4 } }") + (\$3 == \"osc\" || \$3 == \"osp\")) { print \$4 } }") if [ -z "$obd" ]; then echo "Need obdfilter to test stripe_count" exit 1 diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 4ef9265..9d33861 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -72,27 +72,73 @@ static struct lu_kmem_descr mdd_caches[] = { } }; -static int mdd_device_init(const struct lu_env *env, struct lu_device *d, - const char *name, struct lu_device *next) +static int mdd_connect_to_next(const struct lu_env *env, struct mdd_device *m, + const char *nextdev) { - struct mdd_device *mdd = lu2mdd_dev(d); - int rc; - ENTRY; + struct obd_connect_data *data = NULL; + struct lu_device *lud = mdd2lu_dev(m); + struct obd_device *obd; + int rc; + ENTRY; + + LASSERT(m->mdd_child_exp == NULL); + + OBD_ALLOC(data, sizeof(*data)); + if (data == NULL) + GOTO(out, rc = -ENOMEM); + + obd = class_name2obd(nextdev); + if (obd == NULL) { + CERROR("can't locate next device: %s\n", nextdev); + GOTO(out, rc = -ENOTCONN); + } + + data->ocd_connect_flags = OBD_CONNECT_VERSION; + data->ocd_version = LUSTRE_VERSION_CODE; + + rc = obd_connect(NULL, &m->mdd_child_exp, obd, &obd->obd_uuid, data, NULL); + if (rc) { + CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc); + GOTO(out, rc); + } + + lud->ld_site = m->mdd_child_exp->exp_obd->obd_lu_dev->ld_site; + LASSERT(lud->ld_site); + m->mdd_child = lu2dt_dev(m->mdd_child_exp->exp_obd->obd_lu_dev); + lu_dev_add_linkage(lud->ld_site, lud); - mdd->mdd_child = lu2dt_dev(next); +out: + if (data) + OBD_FREE(data, sizeof(*data)); + RETURN(rc); +} + +static int mdd_init0(const struct lu_env *env, struct mdd_device *mdd, + struct lu_device_type *t, struct lustre_cfg *lcfg) +{ + int rc; + ENTRY; + + mdd->mdd_md_dev.md_lu_dev.ld_ops = 
&mdd_lu_ops; + mdd->mdd_md_dev.md_ops = &mdd_ops; - /* Prepare transactions callbacks. */ - mdd->mdd_txn_cb.dtc_txn_start = NULL; - mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb; - mdd->mdd_txn_cb.dtc_txn_commit = NULL; - mdd->mdd_txn_cb.dtc_cookie = mdd; - mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD; - CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage); - mdd->mdd_atime_diff = MAX_ATIME_DIFF; + rc = mdd_connect_to_next(env, mdd, lustre_cfg_string(lcfg, 3)); + if (rc) + RETURN(rc); + + mdd->mdd_atime_diff = MAX_ATIME_DIFF; /* sync permission changes */ mdd->mdd_sync_permission = 1; - rc = mdd_procfs_init(mdd, name); + dt_conf_get(env, mdd->mdd_child, &mdd->mdd_dt_conf); + + /* we are using service name but not mdd obd name + * for compatibility reasons. + * It is passed from MDT in lustre_cfg[2] buffer */ + rc = mdd_procfs_init(mdd, lustre_cfg_string(lcfg, 2)); + if (rc < 0) + obd_disconnect(mdd->mdd_child_exp); + RETURN(rc); } @@ -102,6 +148,9 @@ static struct lu_device *mdd_device_fini(const struct lu_env *env, struct mdd_device *mdd = lu2mdd_dev(d); int rc; + if (d->ld_site) + lu_dev_del_linkage(d->ld_site, d); + rc = mdd_procfs_fini(mdd); if (rc) { CERROR("proc fini error %d \n", rc); @@ -119,20 +168,22 @@ static void mdd_device_shutdown(const struct lu_env *env, ENTRY; mdd_lfsck_cleanup(env, m); mdd_changelog_fini(env, m); - dt_txn_callback_del(m->mdd_child, &m->mdd_txn_cb); if (m->mdd_dot_lustre_objs.mdd_obf) mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf); if (m->mdd_dot_lustre) mdd_object_put(env, m->mdd_dot_lustre); - if (m->mdd_obd_dev) - mdd_fini_obd(env, m, cfg); orph_index_fini(env, m); if (m->mdd_capa != NULL) { lu_object_put(env, &m->mdd_capa->do_lu); m->mdd_capa = NULL; } + lu_site_purge(env, m->mdd_md_dev.md_lu_dev.ld_site, -1); /* remove upcall device*/ md_upcall_fini(&m->mdd_md_dev); + + if (m->mdd_child_exp) + obd_disconnect(m->mdd_child_exp); + EXIT; } @@ -759,17 +810,25 @@ static int obf_xattr_get(const struct lu_env *env, struct md_object 
*obj, struct lu_buf *buf, const char *name) { + struct mdd_device *mdd = mdo2mdd(obj); + struct mdd_object *root; + struct lu_fid rootfid; int rc = 0; - /* XXX: a temp. solution till LOD/OSP is landed */ + /* + * .lustre returns default striping which is 'stored' + * in the root + */ if (strcmp(name, XATTR_NAME_LOV) == 0) { - if (buf->lb_buf == NULL) { - rc = sizeof(struct lov_user_md); - } else if (buf->lb_len >= sizeof(struct lov_user_md)) { - rc = mdd_get_default_md(md2mdd_obj(obj), buf->lb_buf); - } else { - rc = -ERANGE; - } + rc = dt_root_get(env, mdd->mdd_child, &rootfid); + if (rc) + return rc; + root = mdd_object_find(env, mdd, &rootfid); + if (IS_ERR(root)) + return PTR_ERR(root); + rc = mdo_xattr_get(env, root, buf, name, + mdd_object_capa(env, md2mdd_obj(obj))); + mdd_object_put(env, root); } return rc; @@ -859,14 +918,14 @@ static int obf_lookup(const struct lu_env *env, struct md_object *p, sscanf(name, SFID, RFID(f)); if (!fid_is_sane(f)) { CWARN("%s: bad FID format [%s], should be "DFID"\n", - mdd->mdd_obd_dev->obd_name, lname->ln_name, + mdd2obd_dev(mdd)->obd_name, lname->ln_name, (__u64)FID_SEQ_NORMAL, 1, 0); GOTO(out, rc = -EINVAL); } if (!fid_is_norm(f)) { CWARN("%s: "DFID" is invalid, sequence should be " - ">= "LPX64"\n", mdd->mdd_obd_dev->obd_name, PFID(f), + ">= "LPX64"\n", mdd2obd_dev(mdd)->obd_name, PFID(f), (__u64)FID_SEQ_NORMAL); GOTO(out, rc = -EINVAL); } @@ -1012,17 +1071,13 @@ static int mdd_process_config(const struct lu_env *env, GOTO(out, rc); dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf); - rc = mdd_init_obd(env, m, cfg); - if (rc) { - CERROR("lov init error %d\n", rc); - GOTO(out, rc); - } - mdd_changelog_init(env, m); break; case LCFG_CLEANUP: + rc = next->ld_ops->ldo_process_config(env, next, cfg); lu_dev_del_linkage(d->ld_site, d); mdd_device_shutdown(env, m, cfg); + break; default: rc = next->ld_ops->ldo_process_config(env, next, cfg); break; @@ -1031,67 +1086,15 @@ out: RETURN(rc); } -#if 0 -static int 
mdd_lov_set_nextid(const struct lu_env *env, - struct mdd_device *mdd) -{ - struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds; - int rc; - ENTRY; - - LASSERT(mds->mds_lov_objids != NULL); - rc = obd_set_info_async(mds->mds_lov_exp, strlen(KEY_NEXT_ID), - KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count, - mds->mds_lov_objids, NULL); - - RETURN(rc); -} - -static int mdd_cleanup_unlink_llog(const struct lu_env *env, - struct mdd_device *mdd) -{ - /* XXX: to be implemented! */ - return 0; -} -#endif - static int mdd_recovery_complete(const struct lu_env *env, struct lu_device *d) { struct mdd_device *mdd = lu2mdd_dev(d); struct lu_device *next = &mdd->mdd_child->dd_lu_dev; - struct obd_device *obd = mdd2obd_dev(mdd); int rc; ENTRY; LASSERT(mdd != NULL); - LASSERT(obd != NULL); -#if 0 - /* XXX: Do we need this in new stack? */ - rc = mdd_lov_set_nextid(env, mdd); - if (rc) { - CERROR("mdd_lov_set_nextid() failed %d\n", - rc); - RETURN(rc); - } - - /* XXX: cleanup unlink. */ - rc = mdd_cleanup_unlink_llog(env, mdd); - if (rc) { - CERROR("mdd_cleanup_unlink_llog() failed %d\n", - rc); - RETURN(rc); - } -#endif - /* Call that with obd_recovering = 1 just to update objids */ - obd_notify(obd->u.mds.mds_lov_obd, NULL, (obd->obd_async_recov ? - OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC), NULL); - - /* Drop obd_recovering to 0 and call o_postrecov to recover mds_lov */ - cfs_spin_lock(&obd->obd_dev_lock); - obd->obd_recovering = 0; - cfs_spin_unlock(&obd->obd_dev_lock); - obd->obd_type->typ_dt_ops->o_postrecov(obd); /* XXX: orphans handling. 
*/ __mdd_orphan_cleanup(env, mdd); @@ -1115,7 +1118,6 @@ static int mdd_prepare(const struct lu_env *env, if (rc) GOTO(out, rc); - dt_txn_callback_add(mdd->mdd_child, &mdd->mdd_txn_cb); root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name, &mdd->mdd_root_fid); if (!IS_ERR(root)) { @@ -1223,15 +1225,8 @@ static int mdd_update_capa_key(const struct lu_env *env, struct md_device *m, struct lustre_capa_key *key) { - struct mds_capa_info info = { .uuid = NULL, .capa = key }; - struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev); - struct obd_export *lov_exp = mdd2obd_dev(mdd)->u.mds.mds_lov_exp; - int rc; - ENTRY; - - rc = obd_set_info_async(env, lov_exp, sizeof(KEY_CAPA_KEY), - KEY_CAPA_KEY, sizeof(info), &info, NULL); - RETURN(rc); + /* we do not support capabilities ... */ + return -EINVAL; } static int mdd_llog_ctxt_get(const struct lu_env *env, struct md_device *m, @@ -1243,6 +1238,18 @@ static int mdd_llog_ctxt_get(const struct lu_env *env, struct md_device *m, return (*h == NULL ? 
-ENOENT : 0); } +static struct lu_device *mdd_device_free(const struct lu_env *env, + struct lu_device *lu) +{ + struct mdd_device *m = lu2mdd_dev(lu); + ENTRY; + + LASSERT(cfs_atomic_read(&lu->ld_ref) == 0); + md_device_fini(&m->mdd_md_dev); + OBD_FREE_PTR(m); + RETURN(NULL); +} + static struct lu_device *mdd_device_alloc(const struct lu_env *env, struct lu_device_type *t, struct lustre_cfg *lcfg) @@ -1254,30 +1261,86 @@ static struct lu_device *mdd_device_alloc(const struct lu_env *env, if (m == NULL) { l = ERR_PTR(-ENOMEM); } else { - md_device_init(&m->mdd_md_dev, t); + int rc; + l = mdd2lu_dev(m); - l->ld_ops = &mdd_lu_ops; - m->mdd_md_dev.md_ops = &mdd_ops; - md_upcall_init(&m->mdd_md_dev, NULL); + md_device_init(&m->mdd_md_dev, t); + rc = mdd_init0(env, m, t, lcfg); + if (rc != 0) { + mdd_device_free(env, l); + l = ERR_PTR(rc); + } } return l; } -static struct lu_device *mdd_device_free(const struct lu_env *env, - struct lu_device *lu) +/* + * we use exports to track all mdd users + */ +static int mdd_obd_connect(const struct lu_env *env, struct obd_export **exp, + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data, void *localdata) { - struct mdd_device *m = lu2mdd_dev(lu); - ENTRY; + struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev); + struct lustre_handle conn; + int rc; + ENTRY; - LASSERT(cfs_atomic_read(&lu->ld_ref) == 0); - md_device_fini(&m->mdd_md_dev); - OBD_FREE_PTR(m); - RETURN(NULL); + CDEBUG(D_CONFIG, "connect #%d\n", mdd->mdd_connects); + + rc = class_connect(&conn, obd, cluuid); + if (rc) + RETURN(rc); + + *exp = class_conn2export(&conn); + + /* Why should there ever be more than 1 connect? 
*/ + LASSERT(mdd->mdd_connects == 0); + mdd->mdd_connects++; + + RETURN(0); +} + +/* + * once last export (we don't count self-export) disappeared + * mdd can be released + */ +static int mdd_obd_disconnect(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev); + int rc, release = 0; + ENTRY; + + mdd->mdd_connects--; + if (mdd->mdd_connects == 0) + release = 1; + + rc = class_disconnect(exp); + + if (rc == 0 && release) + class_manual_cleanup(obd); + RETURN(rc); +} + +static int mdd_obd_health_check(const struct lu_env *env, + struct obd_device *obd) +{ + struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev); + int rc; + ENTRY; + + LASSERT(mdd); + rc = obd_health_check(env, mdd->mdd_child_exp->exp_obd); + RETURN(rc); } static struct obd_ops mdd_obd_device_ops = { - .o_owner = THIS_MODULE + .o_owner = THIS_MODULE, + .o_connect = mdd_obd_connect, + .o_disconnect = mdd_obd_disconnect, + .o_health_check = mdd_obd_health_check }; /* context key constructor/destructor: mdd_ucred_key_init, mdd_ucred_key_fini */ @@ -1639,7 +1702,6 @@ static struct lu_device_type_operations mdd_device_type_ops = { .ldto_device_alloc = mdd_device_alloc, .ldto_device_free = mdd_device_free, - .ldto_device_init = mdd_device_init, .ldto_device_fini = mdd_device_fini }; diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index ba8d76a..1fb0ffd 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -912,7 +912,7 @@ int mdd_declare_finish_unlink(const struct lu_env *env, if (rc) return rc; - return mdd_declare_object_kill(env, obj, ma, handle); + return mdo_declare_destroy(env, obj, handle); } /* caller should take a lock before calling */ @@ -921,7 +921,6 @@ int mdd_finish_unlink(const struct lu_env *env, struct thandle *th) { int rc = 0; - int reset = 1; int is_dir = S_ISDIR(ma->ma_attr.la_mode); ENTRY; @@ -945,13 +944,9 @@ int mdd_finish_unlink(const struct lu_env *env, PFID(mdd_object_fid(obj)), obj->mod_count); 
} else { - rc = mdd_object_kill(env, obj, ma, th); - if (rc == 0) - reset = 0; + rc = mdo_destroy(env, obj, th); } } - if (reset) - ma->ma_valid &= ~(MA_LOV | MA_COOKIE); RETURN(rc); } @@ -1140,15 +1135,6 @@ cleanup: stop: mdd_trans_stop(env, mdd, rc, handle); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0) - if (rc == 0 && ma->ma_valid & MA_COOKIE && ma->ma_valid & MA_LOV && - ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_UNLINK_DESTROY) - /* Since echo client is incapable of destorying ost object, - * it will destory the object here. */ - rc = mdd_lovobj_unlink(env, mdd, mdd_cobj, la, ma, 1); -#else -#warning "please remove this after 2.4 (LOD/OSP)." -#endif return rc; } @@ -1169,27 +1155,6 @@ static int mdd_cd_sanity_check(const struct lu_env *env, } -static int mdd_declare_create_data(const struct lu_env *env, - struct mdd_device *mdd, - struct mdd_object *obj, - int lmm_size, - struct thandle *handle) -{ - struct lu_buf *buf = &mdd_env_info(env)->mti_buf; - int rc; - - buf->lb_buf = NULL; - buf->lb_len = lmm_size; - rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV, - 0, handle); - if (rc) - return rc; - - rc = mdd_declare_lov_objid_update(env, mdd, handle); - - return rc; -} - static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, const struct md_op_spec *spec, struct md_attr *ma) @@ -1197,25 +1162,30 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct mdd_device *mdd = mdo2mdd(cobj); struct mdd_object *mdd_pobj = md2mdd_obj(pobj); struct mdd_object *son = md2mdd_obj(cobj); - struct lov_mds_md *lmm = NULL; - int lmm_size = 0; struct thandle *handle; - struct lu_attr *attr = &mdd_env_info(env)->mti_la_for_fix; + const struct lu_buf *buf; + struct lu_attr *attr = &mdd_env_info(env)->mti_cattr; int rc; ENTRY; + /* do not let users to create stripes via .lustre/ + * mdd_obf_setup() sets IMMUTE_OBJ on this directory */ + if (pobj && mdd_pobj->mod_flags & 
IMMUTE_OBJ) + RETURN(-ENOENT); + rc = mdd_cd_sanity_check(env, son); if (rc) RETURN(rc); if (!md_should_create(spec->sp_cr_flags)) RETURN(0); - lmm_size = ma->ma_lmm_size; - - rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, ma); - if (rc) - RETURN(rc); + /* + * there are following use cases for this function: + * 1) late striping - file was created with MDS_OPEN_DELAY_CREATE + * striping can be specified or not + * 2) CMD? + */ rc = mdd_la_get(env, son, attr, mdd_object_capa(env, son)); if (rc) RETURN(rc); @@ -1227,44 +1197,46 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, if (IS_ERR(handle)) GOTO(out_free, rc = PTR_ERR(handle)); - rc = mdd_declare_create_data(env, mdd, son, lmm_size, handle); - if (rc) - GOTO(stop, rc); - - rc = mdd_trans_start(env, mdd, handle); - if (rc) - GOTO(stop, rc); - /* * XXX: Setting the lov ea is not locked but setting the attr is locked? * Should this be fixed? */ + CDEBUG(D_OTHER, "ea %p/%u, cr_flags %Lo, no_create %u\n", + spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen, + spec->sp_cr_flags, spec->no_create); + + if (spec->no_create) { + /* replay case */ + buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, + spec->u.sp_ea.eadatalen); + } else if (!(spec->sp_cr_flags & MDS_OPEN_HAS_OBJS)) { + if (spec->sp_cr_flags & MDS_OPEN_HAS_EA) { + /* lfs setstripe */ + buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, + spec->u.sp_ea.eadatalen); + } else { + buf = &LU_BUF_NULL; + } + } else { + /* MDS_OPEN_HAS_OBJS is not used anymore ? 
*/ + LBUG(); + } - /* Replay creates has objects already */ -#if 0 - if (spec->no_create) { - CDEBUG(D_INFO, "we already have lov ea\n"); - rc = mdd_lov_set_md(env, mdd_pobj, son, - (struct lov_mds_md *)spec->u.sp_ea.eadata, - spec->u.sp_ea.eadatalen, handle, 0); - } else -#endif - /* No need mdd_lsm_sanity_check here */ - rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, - lmm_size, handle, 0); + rc = dt_declare_xattr_set(env, mdd_object_child(son), buf, + XATTR_NAME_LOV, 0, handle); + if (rc) + GOTO(stop, rc); - /* update lov_objid data, must be before transaction stop! */ - if (rc == 0) - mdd_lov_objid_update(mdd, lmm); + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(stop, rc); + rc = dt_xattr_set(env, mdd_object_child(son), buf, XATTR_NAME_LOV, + 0, handle, mdd_object_capa(env, son)); stop: - mdd_trans_stop(env, mdd, rc, handle); + mdd_trans_stop(env, mdd, rc, handle); out_free: - /* Finish mdd_lov_create() stuff. */ - /* if no_create == 0 (not replay), we free lmm allocated by - * mdd_lov_create() */ - mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); - RETURN(rc); + RETURN(rc); } /* Get fid from name and parent */ @@ -1459,13 +1431,12 @@ static int mdd_create_sanity_check(const struct lu_env *env, static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *p, struct mdd_object *c, const struct lu_name *name, - struct lu_attr *attr, int lmm_size, + struct lu_attr *attr, int got_def_acl, struct thandle *handle, const struct md_op_spec *spec) { struct mdd_thread_info *info = mdd_env_info(env); - struct lu_buf *buf = &mdd_env_info(env)->mti_buf; int rc = 0; rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec); @@ -1512,10 +1483,17 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, if (rc) GOTO(out, rc); - rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, - 0, handle); - if (rc) - GOTO(out, rc); + /* replay case, create LOV EA from client data */ + if 
(spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) { + const struct lu_buf *buf; + + buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, + spec->u.sp_ea.eadatalen); + rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, + 0, handle); + if (rc) + GOTO(out, rc); + } if (S_ISLNK(attr->la_mode)) { rc = dt_declare_record_write(env, mdd_object_child(c), @@ -1533,8 +1511,6 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, if (rc) return rc; - rc = mdd_declare_lov_objid_update(env, mdd, handle); - out: return rc; } @@ -1553,12 +1529,11 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, struct mdd_object *son = md2mdd_obj(child); struct mdd_device *mdd = mdo2mdd(pobj); struct lu_attr *attr = &ma->ma_attr; - struct lov_mds_md *lmm = NULL; struct thandle *handle; struct lu_attr *pattr = &info->mti_pattr; struct dynlock_handle *dlh; const char *name = lname->ln_name; - int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0; + int rc, created = 0, initialized = 0, inserted = 0; int got_def_acl = 0; ENTRY; @@ -1608,19 +1583,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET)) - GOTO(out_pending, rc = -EINPROGRESS); - - /* - * No RPC inside the transaction, so OST objects should be created at - * first. 
- */ - if (S_ISREG(attr->la_mode)) { - lmm_size = ma->ma_lmm_size; - rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, - spec, ma); - if (rc) - GOTO(out_pending, rc); - } + GOTO(out_free, rc = -EINPROGRESS); if (!S_ISLNK(attr->la_mode)) { struct lu_buf *acl_buf; @@ -1644,7 +1607,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, GOTO(out_free, rc = PTR_ERR(handle)); rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr, - got_def_acl, lmm_size, handle, spec); + got_def_acl, handle, spec); if (rc) GOTO(out_stop, rc); @@ -1680,6 +1643,22 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname, son, attr, handle, spec); + + /* + * in case of replay we just set LOVEA provided by the client + * XXX: I think it would be interesting to try "old" way where + * MDT calls this xattr_set(LOV) in a different transaction. + * probably this way we code can be made better. + */ + if (rc == 0 && + (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) { + const struct lu_buf *buf; + + buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata, + spec->u.sp_ea.eadatalen); + rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle, + BYPASS_CAPA); + } mdd_write_unlock(env, son); if (rc) /* @@ -1698,13 +1677,6 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, inserted = 1; - /* No need mdd_lsm_sanity_check here */ - rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0); - if (rc) { - CERROR("error on stripe info copy %d \n", rc); - GOTO(cleanup, rc); - } - if (S_ISLNK(attr->la_mode)) { struct md_ucred *uc = md_ucred(env); struct dt_object *dt = mdd_object_child(son); @@ -1765,10 +1737,6 @@ cleanup: mdd_write_unlock(env, son); } - /* update lov_objid data, must be before transaction stop! 
*/ - if (rc == 0) - mdd_lov_objid_update(mdd, lmm); - mdd_pdo_write_unlock(env, mdd_pobj, dlh); out_trans: if (rc == 0) @@ -1780,9 +1748,6 @@ out_trans: out_stop: mdd_trans_stop(env, mdd, rc, handle); out_free: - /* finish lov_create stuff, free all temporary data */ - mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec); -out_pending: /* The child object shouldn't be cached anymore */ if (rc) cfs_set_bit(LU_OBJECT_HEARD_BANSHEE, diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 9ed853c..d8eb869 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -110,13 +110,12 @@ struct md_lfsck { struct mdd_device { struct md_device mdd_md_dev; + struct obd_export *mdd_child_exp; struct dt_device *mdd_child; - struct obd_device *mdd_obd_dev; struct lu_fid mdd_root_fid; struct dt_device_param mdd_dt_conf; struct dt_object *mdd_orphans; /* PENDING directory */ struct dt_object *mdd_capa; - struct dt_txn_callback mdd_txn_cb; cfs_proc_dir_entry_t *mdd_proc_entry; struct lprocfs_stats *mdd_stats; struct mdd_changelog mdd_cl; @@ -125,6 +124,7 @@ struct mdd_device { struct mdd_dot_lustre_objs mdd_dot_lustre_objs; struct md_lfsck mdd_lfsck; unsigned int mdd_sync_permission; + int mdd_connects; }; enum mod_flags { @@ -529,7 +529,7 @@ static inline struct dt_object* mdd_object_child(struct mdd_object *o) static inline struct obd_device *mdd2obd_dev(struct mdd_device *mdd) { - return mdd->mdd_obd_dev; + return (mdd->mdd_md_dev.md_lu_dev.ld_obd); } static inline struct mdd_device *mdd_obj2mdd_dev(struct mdd_object *obj) diff --git a/lustre/mdd/mdd_lfsck.c b/lustre/mdd/mdd_lfsck.c index e3b2d3e..73281ef 100644 --- a/lustre/mdd/mdd_lfsck.c +++ b/lustre/mdd/mdd_lfsck.c @@ -48,7 +48,7 @@ static inline char *mdd_lfsck2name(struct md_lfsck *lfsck) struct mdd_device *mdd; mdd = container_of0(lfsck, struct mdd_device, mdd_lfsck); - return mdd->mdd_obd_dev->obd_name; + return mdd2obd_dev(mdd)->obd_name; } void mdd_lfsck_set_speed(struct md_lfsck *lfsck, 
__u32 limit) @@ -320,7 +320,7 @@ int mdd_lfsck_setup(const struct lu_env *env, struct mdd_device *mdd) lu_object_put(env, &obj->do_lu); if (rc == -ENOTSUPP) { CERROR("%s: Lustre LFSCK unsupported on this device.\n", - mdd->mdd_obd_dev->obd_name); + mdd2obd_dev(mdd)->obd_name); rc = 0; } return rc; diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index 76b046f..b6a0015 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -56,145 +56,6 @@ #include "mdd_internal.h" -static int mdd_notify(struct obd_device *host, struct obd_device *watched, - enum obd_notify_event ev, void *owner, void *data) -{ - struct mdd_device *mdd = owner; - int rc = 0; - ENTRY; - - LASSERT(owner != NULL); - switch (ev) - { - case OBD_NOTIFY_ACTIVE: - case OBD_NOTIFY_SYNC: - case OBD_NOTIFY_SYNC_NONBLOCK: - rc = md_do_upcall(NULL, &mdd->mdd_md_dev, - MD_LOV_SYNC, data); - break; - case OBD_NOTIFY_CONFIG: - rc = md_do_upcall(NULL, &mdd->mdd_md_dev, - MD_LOV_CONFIG, data); - break; - default: - CDEBUG(D_INFO, "Unhandled notification %#x\n", ev); - } - - RETURN(rc); -} - -/* The obd is created for handling data stack for mdd */ -int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd, - struct lustre_cfg *cfg) -{ - char *dev = lustre_cfg_string(cfg, 0); - int rc, name_size, uuid_size; - char *name, *uuid; - __u32 mds_id; - struct lustre_cfg_bufs *bufs; - struct lustre_cfg *lcfg; - struct obd_device *obd; - ENTRY; - - mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id; - name_size = strlen(MDD_OBD_NAME) + 35; - uuid_size = strlen(MDD_OBD_UUID) + 35; - - OBD_ALLOC(name, name_size); - OBD_ALLOC(uuid, uuid_size); - if (name == NULL || uuid == NULL) - GOTO(cleanup_mem, rc = -ENOMEM); - - OBD_ALLOC_PTR(bufs); - if (!bufs) - GOTO(cleanup_mem, rc = -ENOMEM); - - snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s", - MDD_OBD_NAME, dev); - - snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s", - MDD_OBD_UUID, dev); - - lustre_cfg_bufs_reset(bufs, name); - 
lustre_cfg_bufs_set_string(bufs, 1, MDD_OBD_TYPE); - lustre_cfg_bufs_set_string(bufs, 2, uuid); - lustre_cfg_bufs_set_string(bufs, 3, (char*)dev/* MDD_OBD_PROFILE */); - lustre_cfg_bufs_set_string(bufs, 4, (char*)dev); - - lcfg = lustre_cfg_new(LCFG_ATTACH, bufs); - OBD_FREE_PTR(bufs); - if (!lcfg) - GOTO(cleanup_mem, rc = -ENOMEM); - - rc = class_attach(lcfg); - if (rc) - GOTO(lcfg_cleanup, rc); - - obd = class_name2obd(name); - if (!obd) { - CERROR("Can not find obd %s\n", MDD_OBD_NAME); - LBUG(); - } - - cfs_spin_lock(&obd->obd_dev_lock); - obd->obd_recovering = 1; - cfs_spin_unlock(&obd->obd_dev_lock); - obd->u.mds.mds_id = mds_id; - obd->u.obt.obt_osd_properties.osd_max_ea_size = - mdd->mdd_dt_conf.ddp_max_ea_size; - - rc = class_setup(obd, lcfg); - if (rc) - GOTO(class_detach, rc); - - /* - * Add here for obd notify mechanism, when adding a new ost, the mds - * will notify this mdd. - */ - obd->obd_upcall.onu_upcall = mdd_notify; - obd->obd_upcall.onu_owner = mdd; - mdd->mdd_obd_dev = obd; - - EXIT; -class_detach: - if (rc) - class_detach(obd, lcfg); -lcfg_cleanup: - lustre_cfg_free(lcfg); -cleanup_mem: - if (name) - OBD_FREE(name, name_size); - if (uuid) - OBD_FREE(uuid, uuid_size); - return rc; -} - -int mdd_fini_obd(const struct lu_env *env, struct mdd_device *mdd, - struct lustre_cfg *lcfg) -{ - struct obd_device *obd; - int rc; - ENTRY; - - obd = mdd2obd_dev(mdd); - LASSERT(obd); - - rc = class_cleanup(obd, lcfg); - if (rc) - GOTO(lcfg_cleanup, rc); - - obd->obd_upcall.onu_upcall = NULL; - obd->obd_upcall.onu_owner = NULL; - rc = class_detach(obd, lcfg); - if (rc) - GOTO(lcfg_cleanup, rc); - mdd->mdd_obd_dev = NULL; - - EXIT; -lcfg_cleanup: - return rc; -} - int mdd_get_md(const struct lu_env *env, struct mdd_object *obj, void *md, int *md_size, const char *name) { @@ -221,673 +82,3 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj, RETURN(rc); } -int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj, - void *md, int 
*md_size, const char *name) -{ - int rc = 0; - mdd_read_lock(env, obj, MOR_TGT_CHILD); - rc = mdd_get_md(env, obj, md, md_size, name); - mdd_read_unlock(env, obj); - return rc; -} - -static int mdd_lov_set_stripe_md(const struct lu_env *env, - struct mdd_object *obj, struct lu_buf *buf, - struct thandle *handle) -{ - struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - struct obd_device *obd = mdd2obd_dev(mdd); - struct obd_export *lov_exp = obd->u.mds.mds_lov_exp; - struct lov_stripe_md *lsm = NULL; - int rc; - ENTRY; - - LASSERT(S_ISDIR(mdd_object_type(obj)) || S_ISREG(mdd_object_type(obj))); - rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, 0, - &lsm, buf->lb_buf); - if (rc) - RETURN(rc); - obd_free_memmd(lov_exp, &lsm); - - rc = mdd_xattr_set_txn(env, obj, buf, XATTR_NAME_LOV, 0, handle); - - CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc); - RETURN(rc); -} - -/* - * Permission check is done before call it, - * no need check again. - */ -static int mdd_lov_set_dir_md(const struct lu_env *env, - struct mdd_object *obj, struct lu_buf *buf, - struct thandle *handle) -{ - struct lov_user_md *lum = NULL; - int rc = 0; - ENTRY; - - LASSERT(S_ISDIR(mdd_object_type(obj))); - lum = (struct lov_user_md*)buf->lb_buf; - - /* if { size, offset, count } = { 0, -1, 0 } and no pool - * (i.e. all default values specified) then delete default - * striping from dir. 
*/ - if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count, - lum->lmm_stripe_offset) && - lum->lmm_magic != LOV_USER_MAGIC_V3) { - rc = mdd_xattr_set_txn(env, obj, &LU_BUF_NULL, - XATTR_NAME_LOV, 0, handle); - if (rc == -ENODATA) - rc = 0; - CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n", - PFID(mdo2fid(obj)), rc); - } else { - rc = mdd_lov_set_stripe_md(env, obj, buf, handle); - } - RETURN(rc); -} - -int mdd_lsm_sanity_check(const struct lu_env *env, struct mdd_object *obj) -{ - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - struct md_ucred *uc = md_ucred(env); - int rc; - ENTRY; - - rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA); - if (rc) - RETURN(rc); - - if ((uc->mu_fsuid != tmp_la->la_uid) && - !mdd_capable(uc, CFS_CAP_FOWNER)) - rc = mdd_permission_internal_locked(env, obj, tmp_la, - MAY_WRITE, MOR_TGT_CHILD); - - RETURN(rc); -} - -int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj, - struct mdd_object *child, struct lov_mds_md *lmmp, - int lmm_size, struct thandle *handle, int set_stripe) -{ - struct lu_buf *buf; - cfs_umode_t mode; - int rc = 0; - ENTRY; - - buf = mdd_buf_get(env, lmmp, lmm_size); - mode = mdd_object_type(child); - if (S_ISREG(mode) && lmm_size > 0) { - if (set_stripe) { - rc = mdd_lov_set_stripe_md(env, child, buf, handle); - } else { - rc = mdd_xattr_set_txn(env, child, buf, - XATTR_NAME_LOV, 0, handle); - } - } else if (S_ISDIR(mode)) { - if (lmmp == NULL && lmm_size == 0) { - struct mdd_device *mdd = mdd_obj2mdd_dev(child); - struct lov_mds_md *lmm = mdd_max_lmm_get(env, mdd); - int size = sizeof(struct lov_mds_md_v3); - - /* Get parent dir stripe and set */ - if (pobj != NULL) - rc = mdd_get_md_locked(env, pobj, lmm, &size, - XATTR_NAME_LOV); - if (rc > 0) { - buf = mdd_buf_get(env, lmm, size); - rc = mdd_xattr_set_txn(env, child, buf, - XATTR_NAME_LOV, 0, - handle); - if (rc) - CERROR("error on copy stripe info: rc " - "= %d\n", rc); - } - } else { - LASSERT(lmmp != NULL && lmm_size > 0); - 
rc = mdd_lov_set_dir_md(env, child, buf, handle); - } - } - CDEBUG(D_INFO, "Set lov md %p size %d for fid "DFID" rc %d\n", - lmmp, lmm_size, PFID(mdo2fid(child)), rc); - RETURN(rc); -} - -int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm) -{ - /* copy mds_lov code is using wrong layer */ - return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm); -} - -int mdd_declare_lov_objid_update(const struct lu_env *env, - struct mdd_device *mdd, - struct thandle *handle) -{ - struct obd_device *obd = mdd2obd_dev(mdd); - int size; - - /* in prepare we create local files */ - if (unlikely(mdd->mdd_capa == NULL)) - return 0; - - /* XXX: this is a temporary solution to declare llog changes - * will be fixed in 2.3 with new llog implementation */ - - size = obd->u.mds.mds_lov_desc.ld_tgt_count * sizeof(obd_id); - return dt_declare_record_write(env, mdd->mdd_capa, size, 0, handle); -} - -void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm) -{ - /* copy mds_lov code is using wrong layer */ - mds_lov_update_objids(mdd->mdd_obd_dev, lmm); -} - -void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd, - struct lov_mds_md *lmm, int lmm_size, - const struct md_op_spec *spec) -{ - if (lmm && !spec->no_create) - OBD_FREE_LARGE(lmm, lmm_size); -} - -int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, - struct mdd_object *parent, struct mdd_object *child, - struct lov_mds_md **lmm, int *lmm_size, - const struct md_op_spec *spec, struct md_attr *ma) -{ - struct obd_device *obd = mdd2obd_dev(mdd); - struct obd_export *lov_exp = obd->u.mds.mds_lov_exp; - struct lu_site *site = mdd2lu_dev(mdd)->ld_site; - struct obdo *oa; - struct lov_stripe_md *lsm = NULL; - const void *eadata = spec->u.sp_ea.eadata; - __u64 create_flags = spec->sp_cr_flags; - struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti; - struct lu_attr *la = &ma->ma_attr; - int rc = 0; - ENTRY; - - if (!md_should_create(create_flags)) { - *lmm_size = 0; 
- RETURN(0); - } - oti_init(oti, NULL); - - /* replay case, has objects already, only get lov from eadata */ - if (spec->no_create != 0) { - *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata; - *lmm_size = spec->u.sp_ea.eadatalen; - if (*lmm_size == lov_mds_md_size((*lmm)->lmm_stripe_count, - (*lmm)->lmm_magic)) { - RETURN(0); - } else { - CERROR("incorrect lsm received during recovery\n"); - RETURN(-EPROTO); - } - } - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) - GOTO(out_ids, rc = -ENOMEM); - - LASSERT(lov_exp != NULL); - oa = &mdd_env_info(env)->mti_oa; - - oa->o_uid = 0; /* must have 0 uid / gid on OST */ - oa->o_gid = 0; - oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id); - oa->o_mode = S_IFREG | 0600; - oa->o_id = fid_ver_oid(mdd_object_fid(child)); - oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS | - OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP; - oa->o_size = 0; - - if (!(create_flags & MDS_OPEN_HAS_OBJS)) { - if (create_flags & MDS_OPEN_HAS_EA) { - LASSERT(eadata != NULL); - rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, - 0, &lsm, (void*)eadata); - if (rc) - GOTO(out_oti, rc); - } else { - /* get lov ea from parent and set to lov */ - struct lov_mds_md *_lmm; - int _lmm_size = mdd_lov_mdsize(env, mdd); - - LASSERT(parent != NULL); - - /* - * can not create child's lov_mds_md by access it - * thru .lustre path - */ - if (mdd_object_obf(parent)) - GOTO(out_oti, rc = -EBADFD); - - _lmm = mdd_max_lmm_get(env, mdd); - if (_lmm == NULL) - GOTO(out_oti, rc = -ENOMEM); - - rc = mdd_get_md_locked(env, parent, _lmm, - &_lmm_size, - XATTR_NAME_LOV); - if (rc > 0) { - _lmm_size = mdd_lov_mdsize(env, mdd); - rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, - lov_exp, _lmm_size, - &lsm, _lmm); - } - if (rc) - GOTO(out_oti, rc); - } - - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OPEN_WAIT_CREATE, 10); - rc = obd_create(env, lov_exp, oa, &lsm, oti); - if (rc) { - if (rc > 0) { - CERROR("Create error for "DFID": %d\n", - PFID(mdo2fid(child)), rc); 
- rc = -EIO; - } - GOTO(out_oti, rc); - } - - if (ma->ma_valid & MA_LAY_GEN) - /* If we already have a lsm, the file is not new and we - * are about to change the layout, so we have to bump - * the generation. It is worth noting that old versions - * will be confused by a non-zero gen, that's why - * OBD_INCOMPAT_LMM_VER has been introduced */ - lsm->lsm_layout_gen = ma->ma_layout_gen + 1; - else - /* Start with a null generation for backward - * compatiblity with old versions */ - lsm->lsm_layout_gen = 0; - - LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq); - } else { - LASSERT(eadata != NULL); - rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm, - (void*)eadata); - if (rc) - GOTO(out_oti, rc); - - if (ma->ma_valid & MA_LAY_GEN) - lsm->lsm_layout_gen = ma->ma_layout_gen; - else - lsm->lsm_layout_gen = 0; - } - - lsm->lsm_object_id = fid_ver_oid(mdd_object_fid(child)); - lsm->lsm_object_seq = fid_seq(mdd_object_fid(child)); - /* - * Sometimes, we may truncate some object(without lsm) then open it - * (with write flags), so creating lsm above. The Nonzero(truncated) - * size should tell ost, since size attr is in charge by OST. - */ - if (la->la_size && la->la_valid & LA_SIZE) { - struct obd_info *oinfo = &mdd_env_info(env)->mti_oi; - - memset(oinfo, 0, sizeof(*oinfo)); - - /* When setting attr to ost, FLBKSZ is not needed. */ - oa->o_valid &= ~OBD_MD_FLBLKSZ; - obdo_from_la(oa, la, LA_TYPE | LA_ATIME | LA_MTIME | - LA_CTIME | LA_SIZE); - /* - * XXX: Pack lustre id to OST, in OST, it will be packed by - * filter_fid, but can not see what is the usages. So just pack - * o_seq o_ver here, maybe fix it after this cycle. 
- */ - obdo_set_parent_fid(oa, mdd_object_fid(child)); - oinfo->oi_oa = oa; - oinfo->oi_md = lsm; - oinfo->oi_capa = NULL; - oinfo->oi_policy.l_extent.start = la->la_size; - oinfo->oi_policy.l_extent.end = OBD_OBJECT_EOF; - - rc = obd_punch_rqset(lov_exp, oinfo, oti); - if (rc) { - CERROR("Error setting attrs for "DFID": rc %d\n", - PFID(mdo2fid(child)), rc); - if (rc > 0) { - CERROR("obd_setattr for "DFID" rc %d\n", - PFID(mdo2fid(child)), rc); - rc = -EIO; - } - GOTO(out_oti, rc); - } - } - /* blksize should be changed after create data object */ - la->la_valid |= LA_BLKSIZE; - la->la_blksize = oa->o_blksize; - *lmm = NULL; - rc = obd_packmd(lov_exp, lmm, lsm); - if (rc < 0) { - CERROR("Cannot pack lsm, err = %d\n", rc); - GOTO(out_oti, rc); - } - if (mdd_lov_objid_prepare(mdd, *lmm) != 0) { - CERROR("Not have memory for update objid\n"); - OBD_FREE(*lmm, rc); - *lmm = NULL; - GOTO(out_oti, rc = -ENOMEM); - } - *lmm_size = rc; - rc = 0; - EXIT; -out_oti: - oti_free_cookies(oti); -out_ids: - if (lsm) - obd_free_memmd(lov_exp, &lsm); - - return rc; -} - -/* - * used when destroying orphans and from mds_reint_unlink() when MDS wants to - * destroy objects on OSS. 
- */ -int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd, - struct mdd_object *obj, struct lu_attr *la, - struct md_attr *ma, int log_unlink) -{ - struct obd_device *obd = mdd2obd_dev(mdd); - struct obd_export *lov_exp = obd->u.mds.mds_lov_exp; - struct lov_stripe_md *lsm = NULL; - struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti; - struct obdo *oa = &mdd_env_info(env)->mti_oa; - struct lu_site *site = mdd2lu_dev(mdd)->ld_site; - struct lov_mds_md *lmm = ma->ma_lmm; - int lmm_size = ma->ma_lmm_size; - struct llog_cookie *logcookies = ma->ma_cookie; - int rc; - ENTRY; - - if (lmm_size == 0) - RETURN(0); - - rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size); - if (rc < 0) { - CERROR("Error unpack md %p\n", lmm); - RETURN(rc); - } else { - LASSERT(rc >= sizeof(*lsm)); - rc = 0; - } - - oa->o_id = lsm->lsm_object_id; - oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id); - oa->o_mode = la->la_mode & S_IFMT; - oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP; - - oti_init(oti, NULL); - if (log_unlink && logcookies) { - oa->o_valid |= OBD_MD_FLCOOKIE; - oti->oti_logcookies = logcookies; - } - - if (!(ma->ma_attr_flags & MDS_UNLINK_DESTROY)) - oa->o_flags = OBD_FL_DELORPHAN; - - CDEBUG(D_INFO, "destroying OSS object "LPU64":"LPU64"\n", oa->o_seq, - oa->o_id); - - rc = obd_destroy(env, lov_exp, oa, lsm, oti, NULL, NULL); - - obd_free_memmd(lov_exp, &lsm); - RETURN(rc); -} - -/* - * called with obj locked. 
- */ -int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, - struct mdd_object *obj, struct lu_attr *la) -{ - struct md_attr *ma = &mdd_env_info(env)->mti_ma; - int rc; - ENTRY; - - LASSERT(mdd_write_locked(env, obj) != 0); - - if (unlikely(!S_ISREG(mdd_object_type(obj)))) - RETURN(0); - - if (unlikely(la->la_nlink != 0)) { - CWARN("Attempt to destroy OSS object when nlink == %d\n", - la->la_nlink); - RETURN(0); - } - - ma->ma_lmm_size = mdd_lov_mdsize(env, mdd); - ma->ma_lmm = mdd_max_lmm_get(env, mdd); - ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd); - ma->ma_cookie = mdd_max_cookie_get(env, mdd); - if (ma->ma_lmm == NULL || ma->ma_cookie == NULL) - RETURN(rc = -ENOMEM); - - /* get lov ea */ - - rc = mdd_get_md(env, obj, ma->ma_lmm, &ma->ma_lmm_size, - XATTR_NAME_LOV); - - if (rc <= 0) { - CWARN("Get lov ea failed for "DFID" rc = %d\n", - PFID(mdo2fid(obj)), rc); - if (rc == 0) - rc = -ENOENT; - RETURN(rc); - } - - ma->ma_valid = MA_LOV; - - rc = mdd_unlink_log(env, mdd, obj, ma); - if (rc) { - CWARN("mds unlink log for "DFID" failed: %d\n", - PFID(mdo2fid(obj)), rc); - RETURN(rc); - } - - if (ma->ma_valid & MA_COOKIE) - rc = mdd_lovobj_unlink(env, mdd, obj, la, ma, 1); - - RETURN(rc); -} - -int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj, - struct md_attr *ma, struct thandle *handle) -{ - struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - int rc, i; - __u16 stripe; - - LASSERT(obj); - LASSERT(ma); - - if (!S_ISREG(lu_object_attr(&obj->mod_obj.mo_lu))) - return 0; - - rc = mdd_lmm_get_locked(env, obj, ma); - if (rc || !(ma->ma_valid & MA_LOV)) - return rc; - - LASSERT(ma->ma_lmm); - if (le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V1 && - le32_to_cpu(ma->ma_lmm->lmm_magic) != LOV_MAGIC_V3) { - CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n", - mdd->mdd_obd_dev->obd_name, - le32_to_cpu(ma->ma_lmm->lmm_magic), - PFID(lu_object_fid(&obj->mod_obj.mo_lu))); - return -EINVAL; - } - - stripe = 
le16_to_cpu(ma->ma_lmm->lmm_stripe_count); - if (stripe == LOV_ALL_STRIPES); - stripe = mdd2obd_dev(mdd)->u.mds.mds_lov_desc.ld_tgt_count; - - for (i = 0; i < stripe; i++) { - rc = mdd_declare_llog_record(env, mdd, - sizeof(struct llog_unlink_rec), - handle); - if (rc) - return rc; - } - - return rc; -} - -int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, - struct mdd_object *mdd_cobj, struct md_attr *ma) -{ - LASSERT(ma->ma_valid & MA_LOV); - - if ((ma->ma_cookie_size > 0) && - (mds_log_op_unlink(mdd2obd_dev(mdd), ma->ma_lmm, ma->ma_lmm_size, - ma->ma_cookie, ma->ma_cookie_size) > 0)) { - CDEBUG(D_HA, "DEBUG: unlink log is added for object "DFID"\n", - PFID(mdd_object_fid(mdd_cobj))); - ma->ma_valid |= MA_COOKIE; - } - return 0; -} - -int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size) -{ - struct mds_obd *mds = &obd->u.mds; - struct lov_stripe_md *lsm = NULL; - struct llog_setattr64_rec *lsr; - struct llog_ctxt *ctxt; - int rc; - ENTRY; - - if (IS_ERR(mds->mds_lov_obd)) - RETURN(PTR_ERR(mds->mds_lov_obd)); - - rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size); - if (rc < 0) - RETURN(rc); - - OBD_ALLOC(lsr, sizeof(*lsr)); - if (!lsr) - GOTO(out, rc = -ENOMEM); - - /* prepare setattr log record */ - lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr); - lsr->lsr_hdr.lrh_type = MDS_SETATTR64_REC; - lsr->lsr_uid = uid; - lsr->lsr_gid = gid; - - /* write setattr log */ - ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); - rc = llog_obd_add(NULL, ctxt, &lsr->lsr_hdr, lsm, logcookies, - cookies_size / sizeof(struct llog_cookie)); - - llog_ctxt_put(ctxt); - - OBD_FREE(lsr, sizeof(*lsr)); - out: - obd_free_memmd(mds->mds_lov_exp, &lsm); - RETURN(rc); -} - -int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd, - const struct md_attr *ma, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, 
int cookies_size) -{ - struct obd_device *obd = mdd2obd_dev(mdd); - - /* journal chown/chgrp in llog, just like unlink */ - if (lmm_size > 0) { - CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n", - (unsigned long)ma->ma_attr.la_uid, - (unsigned long)ma->ma_attr.la_gid); - return mdd_log_op_setattr(obd, ma->ma_attr.la_uid, - ma->ma_attr.la_gid, lmm, - lmm_size, logcookies, - cookies_size); - } else - return 0; -} - -static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, const struct lu_fid *parent, - struct obd_capa *oc) -{ - struct mds_obd *mds = &obd->u.mds; - struct obd_trans_info oti = { 0 }; - struct obd_info oinfo = { { { 0 } } }; - int rc; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR)) - RETURN(0); - - /* first get memory EA */ - OBDO_ALLOC(oinfo.oi_oa); - if (!oinfo.oi_oa) - RETURN(-ENOMEM); - - LASSERT(lmm); - - rc = obd_unpackmd(mds->mds_lov_exp, &oinfo.oi_md, lmm, lmm_size); - if (rc < 0) { - CERROR("Error unpack md %p for obj "DFID"\n", lmm, - PFID(parent)); - GOTO(out, rc); - } - - /* then fill oa */ - oinfo.oi_oa->o_uid = uid; - oinfo.oi_oa->o_gid = gid; - oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id; - oinfo.oi_oa->o_seq = oinfo.oi_md->lsm_object_seq; - oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP | - OBD_MD_FLUID | OBD_MD_FLGID; - if (logcookies) { - oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE; - oti.oti_logcookies = logcookies; - } - - obdo_set_parent_fid(oinfo.oi_oa, parent); - oinfo.oi_capa = oc; - - /* do async setattr from mds to ost not waiting for responses. 
*/ - rc = obd_setattr_async(mds->mds_lov_exp, &oinfo, &oti, NULL); - if (rc) - CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64 - " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc); -out: - if (oinfo.oi_md) - obd_free_memmd(mds->mds_lov_exp, &oinfo.oi_md); - OBDO_FREE(oinfo.oi_oa); - RETURN(rc); -} - -int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies) -{ - struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - struct obd_device *obd = mdd2obd_dev(mdd); - struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; - const struct lu_fid *fid = mdd_object_fid(obj); - int rc = 0; - ENTRY; - - mdd_read_lock(env, obj, MOR_TGT_CHILD); - rc = mdo_attr_get(env, obj, tmp_la, mdd_object_capa(env, obj)); - mdd_read_unlock(env, obj); - if (rc) - RETURN(rc); - - rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm, - lmm_size, logcookies, fid, NULL); - RETURN(rc); -} - diff --git a/lustre/mdd/mdd_lproc.c b/lustre/mdd/mdd_lproc.c index 47560b8..1ca8cfc 100644 --- a/lustre/mdd/mdd_lproc.c +++ b/lustre/mdd/mdd_lproc.c @@ -55,12 +55,13 @@ static const char *mdd_counter_names[LPROC_MDD_NR] = { int mdd_procfs_init(struct mdd_device *mdd, const char *name) { struct lprocfs_static_vars lvars; - struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev; struct obd_type *type; int rc; ENTRY; - type = ld->ld_type->ldt_obd_type; + /* at the moment there is no linkage between lu_type + * and obd_type, so we lookup obd_type this way */ + type = class_search_type(LUSTRE_MDD_NAME); LASSERT(name != NULL); LASSERT(type != NULL); diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index ba4b81a..f9596ee 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -180,61 +180,6 @@ int mdd_buf_grow(const struct lu_env *env, ssize_t len) return 0; } -struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env, - struct mdd_device *mdd) -{ - struct mdd_thread_info *mti = 
mdd_env_info(env); - int max_cookie_size; - - max_cookie_size = mdd_lov_cookiesize(env, mdd); - if (unlikely(mti->mti_max_cookie_size < max_cookie_size)) { - if (mti->mti_max_cookie) - OBD_FREE_LARGE(mti->mti_max_cookie, - mti->mti_max_cookie_size); - mti->mti_max_cookie = NULL; - mti->mti_max_cookie_size = 0; - } - if (unlikely(mti->mti_max_cookie == NULL)) { - OBD_ALLOC_LARGE(mti->mti_max_cookie, max_cookie_size); - if (likely(mti->mti_max_cookie != NULL)) - mti->mti_max_cookie_size = max_cookie_size; - } - if (likely(mti->mti_max_cookie != NULL)) - memset(mti->mti_max_cookie, 0, mti->mti_max_cookie_size); - return mti->mti_max_cookie; -} - -struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size) -{ - struct mdd_thread_info *mti = mdd_env_info(env); - - if (unlikely(mti->mti_max_lmm_size < size)) { - int rsize = size_roundup_power2(size); - - if (mti->mti_max_lmm_size > 0) { - LASSERT(mti->mti_max_lmm); - OBD_FREE_LARGE(mti->mti_max_lmm, - mti->mti_max_lmm_size); - mti->mti_max_lmm = NULL; - mti->mti_max_lmm_size = 0; - } - - OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize); - if (likely(mti->mti_max_lmm != NULL)) - mti->mti_max_lmm_size = rsize; - } - return mti->mti_max_lmm; -} - -struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env, - struct mdd_device *mdd) -{ - int max_lmm_size; - - max_lmm_size = mdd_lov_mdsize(env, mdd); - return mdd_max_lmm_buffer(env, max_lmm_size); -} - struct lu_object *mdd_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d) @@ -571,145 +516,17 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj) RETURN(rc); } -/* get only inode attributes */ -int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, - struct md_attr *ma) -{ - int rc = 0; - ENTRY; - - if (ma->ma_valid & MA_INODE) - RETURN(0); - - rc = mdd_la_get(env, mdd_obj, &ma->ma_attr, - mdd_object_capa(env, mdd_obj)); - if (rc == 0) - ma->ma_valid |= MA_INODE; - RETURN(rc); -} - -int 
mdd_get_default_md(struct mdd_object *mdd_obj, struct lov_mds_md *lmm) -{ - struct lov_desc *ldesc; - struct mdd_device *mdd = mdo2mdd(&mdd_obj->mod_obj); - struct lov_user_md *lum = (struct lov_user_md*)lmm; - ENTRY; - - if (!lum) - RETURN(0); - - ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc; - LASSERT(ldesc != NULL); - - lum->lmm_magic = LOV_MAGIC_V1; - lum->lmm_object_seq = FID_SEQ_LOV_DEFAULT; - lum->lmm_pattern = ldesc->ld_pattern; - lum->lmm_stripe_size = ldesc->ld_default_stripe_size; - lum->lmm_stripe_count = ldesc->ld_default_stripe_count; - lum->lmm_stripe_offset = ldesc->ld_default_stripe_offset; - - RETURN(sizeof(*lum)); -} - -static int is_rootdir(struct mdd_object *mdd_obj) -{ - const struct mdd_device *mdd_dev = mdd_obj2mdd_dev(mdd_obj); - const struct lu_fid *fid = mdo2fid(mdd_obj); - - return lu_fid_eq(&mdd_dev->mdd_root_fid, fid); -} - -int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj, - struct md_attr *ma) -{ - struct mdd_thread_info *info = mdd_env_info(env); - int size; - int rc = -EINVAL; - ENTRY; - - LASSERT(info != NULL); - LASSERT(ma->ma_big_lmm_used == 0); - - if (ma->ma_lmm_size == 0) { - CERROR("No buffer to hold %s xattr of object "DFID"\n", - XATTR_NAME_LOV, PFID(mdd_object_fid(obj))); - RETURN(rc); - } - - rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV, - mdd_object_capa(env, obj)); - if (rc < 0) - RETURN(rc); - - /* big_lmm may need to grow */ - size = rc; - mdd_max_lmm_buffer(env, size); - if (info->mti_max_lmm == NULL) - RETURN(-ENOMEM); - - LASSERT(info->mti_max_lmm_size >= size); - rc = mdd_get_md(env, obj, info->mti_max_lmm, &size, - XATTR_NAME_LOV); - if (rc < 0) - RETURN(rc); - - ma->ma_big_lmm_used = 1; - ma->ma_valid |= MA_LOV; - ma->ma_lmm = info->mti_max_lmm; - ma->ma_lmm_size = size; - LASSERT(size == rc); - RETURN(rc); -} - -/* get lov EA only */ -static int __mdd_lmm_get(const struct lu_env *env, - struct mdd_object *mdd_obj, struct md_attr *ma) -{ - int rc; - ENTRY; - - if (ma->ma_valid 
& MA_LOV) - RETURN(0); - - rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size, - XATTR_NAME_LOV); - if (rc == -ERANGE) - rc = mdd_big_lmm_get(env, mdd_obj, ma); - else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj)) - rc = mdd_get_default_md(mdd_obj, ma->ma_lmm); - - if (rc > 0) { - ma->ma_lmm_size = rc; - ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen; - ma->ma_valid |= MA_LOV | MA_LAY_GEN; - rc = 0; - } - RETURN(rc); -} - -int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj, - struct md_attr *ma) -{ - int rc; - ENTRY; - - mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD); - rc = __mdd_lmm_get(env, mdd_obj, ma); - mdd_read_unlock(env, mdd_obj); - RETURN(rc); -} - /* * No permission check is needed. */ int mdd_attr_get(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { - struct mdd_object *mdd_obj = md2mdd_obj(obj); - int rc; - + int rc; ENTRY; - rc = mdd_iattr_get(env, mdd_obj, ma); + + return mdd_la_get(env, md2mdd_obj(obj), &ma->ma_attr, + mdd_object_capa(env, md2mdd_obj(obj))); RETURN(rc); } @@ -721,9 +538,6 @@ static int mdd_xattr_get(const struct lu_env *env, const char *name) { struct mdd_object *mdd_obj = md2mdd_obj(obj); - struct mdd_device *mdd = mdo2mdd(obj); - struct lu_fid rootfid; - int is_root; int rc; ENTRY; @@ -739,20 +553,6 @@ static int mdd_xattr_get(const struct lu_env *env, mdd_object_capa(env, mdd_obj)); mdd_read_unlock(env, mdd_obj); - dt_root_get(env, mdd->mdd_child, &rootfid); - is_root = lu_fid_eq(mdd_object_fid(mdd_obj), &rootfid); - - /* XXX: a temp. 
solution till LOD/OSP is landed */ - if (rc == -ENODATA && strcmp(name, XATTR_NAME_LOV) == 0 && is_root) { - if (buf->lb_buf == NULL) { - rc = sizeof(struct lov_user_md); - } else if (buf->lb_len >= sizeof(struct lov_user_md)) { - rc = mdd_get_default_md(mdd_obj, buf->lb_buf); - } else { - rc = -ERANGE; - } - } - RETURN(rc); } @@ -813,12 +613,22 @@ int mdd_declare_object_create_internal(const struct lu_env *env, int rc; ENTRY; - if (feat != &dt_directory_features && feat != NULL) + if (feat != &dt_directory_features && feat != NULL) { dof->dof_type = DFT_INDEX; - else - dof->dof_type = dt_mode_to_dft(attr->la_mode); + dof->u.dof_idx.di_feat = feat; - dof->u.dof_idx.di_feat = feat; + } else { + dof->dof_type = dt_mode_to_dft(attr->la_mode); + if (dof->dof_type == DFT_REGULAR) { + dof->u.dof_reg.striped = + md_should_create(spec->sp_cr_flags); + if (spec->sp_cr_flags & MDS_OPEN_HAS_EA) + dof->u.dof_reg.striped = 0; + /* is this replay? */ + if (spec->no_create) + dof->u.dof_reg.striped = 0; + } + } rc = mdo_declare_create_obj(env, c, attr, NULL, dof, handle); @@ -832,27 +642,16 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, { struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint; struct dt_object_format *dof = &mdd_env_info(env)->mti_dof; - const struct dt_index_features *feat = spec->sp_feat; int rc; ENTRY; - if (!mdd_object_exists(c)) { - struct dt_object *next = mdd_object_child(c); - LASSERT(next); - - if (feat != &dt_directory_features && feat != NULL) - dof->dof_type = DFT_INDEX; - else - dof->dof_type = dt_mode_to_dft(attr->la_mode); + LASSERT(!mdd_object_exists(c)); - dof->u.dof_idx.di_feat = feat; + rc = mdo_create_obj(env, c, attr, hint, dof, handle); - rc = mdo_create_obj(env, c, attr, hint, dof, handle); - LASSERT(ergo(rc == 0, mdd_object_exists(c))); - } else - rc = -EEXIST; + LASSERT(ergo(rc == 0, mdd_object_exists(c))); - RETURN(rc); + RETURN(rc); } /** @@ -911,40 +710,6 @@ int 
mdd_attr_check_set_internal(const struct lu_env *env, RETURN(rc); } -int mdd_attr_check_set_internal_locked(const struct lu_env *env, - struct mdd_object *obj, - struct lu_attr *attr, - struct thandle *handle, - int needacl) -{ - int rc; - ENTRY; - - needacl = needacl && (attr->la_valid & LA_MODE); - if (needacl) - mdd_write_lock(env, obj, MOR_TGT_CHILD); - rc = mdd_attr_check_set_internal(env, obj, attr, handle, needacl); - if (needacl) - mdd_write_unlock(env, obj); - RETURN(rc); -} - -int __mdd_xattr_set(const struct lu_env *env, struct mdd_object *obj, - const struct lu_buf *buf, const char *name, - int fl, struct thandle *handle) -{ - struct lustre_capa *capa = mdd_object_capa(env, obj); - int rc = -EINVAL; - ENTRY; - - if (buf->lb_buf && buf->lb_len > 0) - rc = mdo_xattr_set(env, obj, buf, name, 0, handle, capa); - else if (buf->lb_buf == NULL && buf->lb_len == 0) - rc = mdo_xattr_del(env, obj, name, handle, capa); - - RETURN(rc); -} - /* * This gives the same functionality as the code between * sys_chmod and inode_setattr @@ -1270,82 +1035,12 @@ stop: } /** - * Should be called with write lock held. - * - * \see mdd_lma_set_locked(). 
- */ -static int __mdd_lma_set(const struct lu_env *env, struct mdd_object *mdd_obj, - const struct md_attr *ma, struct thandle *handle) -{ - struct mdd_thread_info *info = mdd_env_info(env); - struct lu_buf *buf; - struct lustre_mdt_attrs *lma = - (struct lustre_mdt_attrs *) info->mti_xattr_buf; - int lmasize = sizeof(struct lustre_mdt_attrs); - int rc = 0; - - ENTRY; - - /* Either HSM or SOM part is not valid, we need to read it before */ - if ((!ma->ma_valid) & (MA_HSM | MA_SOM)) { - rc = mdd_get_md(env, mdd_obj, lma, &lmasize, XATTR_NAME_LMA); - if (rc <= 0) - RETURN(rc); - - lustre_lma_swab(lma); - } else { - memset(lma, 0, lmasize); - } - - /* Copy HSM data */ - if (ma->ma_valid & MA_HSM) { - lma->lma_flags |= ma->ma_hsm.mh_flags & HSM_FLAGS_MASK; - lma->lma_compat |= LMAC_HSM; - } - - /* Copy SOM data */ - if (ma->ma_valid & MA_SOM) { - LASSERT(ma->ma_som != NULL); - if (ma->ma_som->msd_ioepoch == IOEPOCH_INVAL) { - lma->lma_compat &= ~LMAC_SOM; - } else { - lma->lma_compat |= LMAC_SOM; - lma->lma_ioepoch = ma->ma_som->msd_ioepoch; - lma->lma_som_size = ma->ma_som->msd_size; - lma->lma_som_blocks = ma->ma_som->msd_blocks; - lma->lma_som_mountid = ma->ma_som->msd_mountid; - } - } - - /* Copy FID */ - memcpy(&lma->lma_self_fid, mdo2fid(mdd_obj), sizeof(lma->lma_self_fid)); - - lustre_lma_swab(lma); - buf = mdd_buf_get(env, lma, lmasize); - rc = __mdd_xattr_set(env, mdd_obj, buf, XATTR_NAME_LMA, 0, handle); - - RETURN(rc); -} - -/** * Save LMA extended attributes with data from \a ma. * * HSM and Size-On-MDS data will be extracted from \ma if they are valid, if * not, LMA EA will be first read from disk, modified and write back. 
* */ -static int mdd_lma_set_locked(const struct lu_env *env, - struct mdd_object *mdd_obj, - const struct md_attr *ma, struct thandle *handle) -{ - int rc; - - mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); - rc = __mdd_lma_set(env, mdd_obj, ma, handle); - mdd_write_unlock(env, mdd_obj); - return rc; -} - /* Precedence for choosing record type when multiple * attributes change: setattr > mtime > ctime > atime * (ctime changes when mtime does, plus chmod/chown. @@ -1379,13 +1074,10 @@ static int mdd_attr_set_changelog(const struct lu_env *env, static int mdd_declare_attr_set(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *obj, - const struct md_attr *ma, - struct lov_mds_md *lmm, + const struct lu_attr *attr, struct thandle *handle) { - struct lu_buf *buf = &mdd_env_info(env)->mti_buf; - struct lu_attr *attr = (struct lu_attr *) &ma->ma_attr; - int rc, i; + int rc; rc = mdo_declare_attr_set(env, obj, attr, handle); if (rc) @@ -1395,24 +1087,6 @@ static int mdd_declare_attr_set(const struct lu_env *env, if (rc) return rc; - if (ma->ma_valid & MA_LOV) { - buf->lb_buf = NULL; - buf->lb_len = ma->ma_lmm_size; - rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LOV, - 0, handle); - if (rc) - return rc; - } - - if (ma->ma_valid & (MA_HSM | MA_SOM)) { - buf->lb_buf = NULL; - buf->lb_len = sizeof(struct lustre_mdt_attrs); - rc = mdo_declare_xattr_set(env, obj, buf, XATTR_NAME_LMA, - 0, handle); - if (rc) - return rc; - } - #ifdef CONFIG_FS_POSIX_ACL if (attr->la_valid & LA_MODE) { mdd_read_lock(env, obj, MOR_TGT_CHILD); @@ -1435,37 +1109,6 @@ static int mdd_declare_attr_set(const struct lu_env *env, } #endif - /* basically the log is the same as in unlink case */ - if (lmm) { - __u16 stripe; - - if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V1 && - le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_V3) { - CERROR("%s: invalid LOV_MAGIC %08x on object "DFID"\n", - mdd->mdd_obd_dev->obd_name, - le32_to_cpu(lmm->lmm_magic), - 
PFID(lu_object_fid(&obj->mod_obj.mo_lu))); - return -EINVAL; - } - - stripe = le16_to_cpu(lmm->lmm_stripe_count); - if (stripe == LOV_ALL_STRIPES) { - struct lov_desc *ldesc; - - ldesc = &mdd->mdd_obd_dev->u.mds.mds_lov_desc; - LASSERT(ldesc != NULL); - stripe = ldesc->ld_tgt_count; - } - - for (i = 0; i < stripe; i++) { - rc = mdd_declare_llog_record(env, mdd, - sizeof(struct llog_unlink_rec), - handle); - if (rc) - return rc; - } - } - return rc; } @@ -1476,13 +1119,16 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj, struct mdd_object *mdd_obj = md2mdd_obj(obj); struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; - struct lov_mds_md *lmm = NULL; - struct llog_cookie *logcookies = NULL; - int rc, lmm_size = 0, cookie_size = 0; struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; const struct lu_attr *la = &ma->ma_attr; + int rc; ENTRY; + /* we do not use ->attr_set() for LOV/SOM/HSM EA any more */ + LASSERT((ma->ma_valid & MA_LOV) == 0); + LASSERT((ma->ma_valid & MA_HSM) == 0); + LASSERT((ma->ma_valid & MA_SOM) == 0); + *la_copy = ma->ma_attr; rc = mdd_fix_attr(env, mdd_obj, la_copy, ma->ma_attr_flags); if (rc) @@ -1492,26 +1138,11 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj, if (la->la_valid == LA_ATIME && la_copy->la_valid == 0) RETURN(0); - if (S_ISREG(mdd_object_type(mdd_obj)) && - ma->ma_attr.la_valid & (LA_UID | LA_GID)) { - lmm_size = mdd_lov_mdsize(env, mdd); - lmm = mdd_max_lmm_get(env, mdd); - if (lmm == NULL) - RETURN(-ENOMEM); - - rc = mdd_get_md_locked(env, mdd_obj, lmm, &lmm_size, - XATTR_NAME_LOV); - - if (rc < 0) - RETURN(rc); - } - handle = mdd_trans_create(env, mdd); if (IS_ERR(handle)) RETURN(PTR_ERR(handle)); - rc = mdd_declare_attr_set(env, mdd, mdd_obj, ma, - lmm_size > 0 ? 
lmm : NULL, handle); + rc = mdd_declare_attr_set(env, mdd, mdd_obj, la, handle); if (rc) GOTO(stop, rc); @@ -1533,66 +1164,13 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj, mdd_flags_xlate(mdd_obj, la_copy->la_flags); } else if (la_copy->la_valid) { /* setattr */ rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1); - /* journal chown/chgrp in llog, just like unlink */ - if (rc == 0 && lmm_size){ - cookie_size = mdd_lov_cookiesize(env, mdd); - logcookies = mdd_max_cookie_get(env, mdd); - if (logcookies == NULL) - GOTO(cleanup, rc = -ENOMEM); - - if (mdd_setattr_log(env, mdd, ma, lmm, lmm_size, - logcookies, cookie_size) <= 0) - logcookies = NULL; - } } - if (rc == 0 && ma->ma_valid & MA_LOV) { - cfs_umode_t mode; - - mode = mdd_object_type(mdd_obj); - if (S_ISREG(mode) || S_ISDIR(mode)) { - rc = mdd_lsm_sanity_check(env, mdd_obj); - if (rc) - GOTO(cleanup, rc); - - rc = mdd_lov_set_md(env, NULL, mdd_obj, ma->ma_lmm, - ma->ma_lmm_size, handle, 1); - } - - } - if (rc == 0 && ma->ma_valid & (MA_HSM | MA_SOM)) { - cfs_umode_t mode; - - mode = mdd_object_type(mdd_obj); - if (S_ISREG(mode)) - rc = mdd_lma_set_locked(env, mdd_obj, ma, handle); - - } -cleanup: if (rc == 0) rc = mdd_attr_set_changelog(env, obj, handle, - ma->ma_attr.la_valid); + la->la_valid); stop: mdd_trans_stop(env, mdd, rc, handle); - if (rc == 0 && (lmm != NULL && lmm_size > 0 )) { - /*set obd attr, if needed*/ - rc = mdd_lov_setattr_async(env, mdd_obj, lmm, lmm_size, - logcookies); - } - RETURN(rc); -} - -int mdd_xattr_set_txn(const struct lu_env *env, struct mdd_object *obj, - const struct lu_buf *buf, const char *name, int fl, - struct thandle *handle) -{ - int rc; - ENTRY; - - mdd_write_lock(env, obj, MOR_TGT_CHILD); - rc = __mdd_xattr_set(env, obj, buf, name, fl, handle); - mdd_write_unlock(env, obj); - RETURN(rc); } @@ -1889,12 +1467,6 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj, int mdd_declare_object_kill(const struct lu_env *env, struct 
mdd_object *obj, struct md_attr *ma, struct thandle *handle) { - int rc; - - rc = mdd_declare_unlink_log(env, obj, ma, handle); - if (rc) - return rc; - return mdo_declare_destroy(env, obj, handle); } @@ -1903,20 +1475,10 @@ int mdd_declare_object_kill(const struct lu_env *env, struct mdd_object *obj, int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, struct md_attr *ma, struct thandle *handle) { - int rc = 0; + int rc; ENTRY; - if (S_ISREG(mdd_object_type(obj))) { - /* Return LOV & COOKIES unconditionally here. We clean evth up. - * Caller must be ready for that. */ - rc = __mdd_lmm_get(env, obj, ma); - if ((ma->ma_valid & MA_LOV)) - rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj), - obj, ma); - } - - if (rc == 0) - rc = mdo_destroy(env, obj, handle); + rc = mdo_destroy(env, obj, handle); RETURN(rc); } @@ -1932,7 +1494,7 @@ static int mdd_declare_close(const struct lu_env *env, if (rc) return rc; - return mdd_declare_object_kill(env, obj, ma, handle); + return mdo_declare_destroy(env, obj, handle); } /* @@ -1944,8 +1506,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, struct mdd_object *mdd_obj = md2mdd_obj(obj); struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle = NULL; - int rc; - int is_orphan = 0, reset = 1; + int rc, is_orphan = 0; ENTRY; if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) { @@ -2014,35 +1575,26 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, /* Object maybe not in orphan list originally, it is rare case for * mdd_finish_unlink() failure. */ if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) { - /* MDS_CLOSE_CLEANUP means destroy OSS objects by MDS. 
*/ - if (ma->ma_valid & MA_FLAGS && - ma->ma_attr_flags & MDS_CLOSE_CLEANUP) { - rc = mdd_lov_destroy(env, mdd, mdd_obj, &ma->ma_attr); - } else { - if (handle == NULL) { - handle = mdd_trans_create(env, mdo2mdd(obj)); - if (IS_ERR(handle)) - GOTO(out, rc = PTR_ERR(handle)); - - rc = mdd_declare_object_kill(env, mdd_obj, ma, - handle); - if (rc) - GOTO(out, rc); - - rc = mdd_declare_changelog_store(env, mdd, - NULL, handle); - if (rc) - GOTO(stop, rc); - - rc = mdd_trans_start(env, mdo2mdd(obj), handle); - if (rc) - GOTO(out, rc); - } + if (handle == NULL) { + handle = mdd_trans_create(env, mdo2mdd(obj)); + if (IS_ERR(handle)) + GOTO(out, rc = PTR_ERR(handle)); + + rc = mdo_declare_destroy(env, mdd_obj, handle); + if (rc) + GOTO(out, rc); + + rc = mdd_declare_changelog_store(env, mdd, + NULL, handle); + if (rc) + GOTO(stop, rc); + + rc = mdd_trans_start(env, mdo2mdd(obj), handle); + if (rc) + GOTO(out, rc); + } - rc = mdd_object_kill(env, mdd_obj, ma, handle); - if (rc == 0) - reset = 0; - } + rc = mdo_destroy(env, mdd_obj, handle); if (rc != 0) CERROR("Error when prepare to delete Object "DFID" , " @@ -2052,8 +1604,6 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj, EXIT; out: - if (reset) - ma->ma_valid &= ~(MA_LOV | MA_COOKIE); mdd_write_unlock(env, mdd_obj); diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index 68f633b..5188691 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -159,9 +159,12 @@ int orph_declare_index_insert(const struct lu_env *env, struct thandle *th) { struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + struct dt_key *key; int rc; - rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, NULL, th); + key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK); + + rc = dt_declare_insert(env, mdd->mdd_orphans, NULL, key, th); if (rc) return rc; @@ -240,50 +243,18 @@ out: RETURN(rc); } -/** - * Destroy OSD object on MDD and associated OST objects. 
- * - * \param obj orphan object - * \param mdd used for sending llog msg to osts - * - * \retval 0 success - * \retval -ve error - */ -static int orphan_object_kill(const struct lu_env *env, - struct mdd_object *obj, - struct mdd_device *mdd, - struct thandle *th) -{ - struct lu_attr *la = &mdd_env_info(env)->mti_la; - int rc = 0; - ENTRY; - - /* No need to lock this object as its recovery phase, and - * no other thread can access it. But we need to lock it - * as its precondition for osd api we using. */ - - mdo_ref_del(env, obj, th); - if (S_ISDIR(mdd_object_type(obj))) { - mdo_ref_del(env, obj, th); - mdd_orphan_ref_del(env, mdd, th); - } else { - /* regular file , cleanup linked ost objects */ - rc = mdd_la_get(env, obj, la, BYPASS_CAPA); - if (rc == 0) - rc = mdd_lov_destroy(env, mdd, obj, la); - } - mdo_destroy(env, obj, th); - RETURN(rc); -} - int orph_declare_index_delete(const struct lu_env *env, struct mdd_object *obj, + struct thandle *th) { struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + struct dt_key *key; int rc; - rc = dt_declare_delete(env, mdd->mdd_orphans, NULL, th); + key = orph_key_fill(env, mdo2fid(obj), ORPH_OP_UNLINK); + + rc = dt_declare_delete(env, mdd->mdd_orphans, key, th); if (rc) return rc; @@ -353,18 +324,9 @@ static int orphan_object_destroy(const struct lu_env *env, { struct thandle *th = NULL; struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - struct md_attr *ma = &mdd_env_info(env)->mti_ma; int rc = 0; ENTRY; - /* init ma */ - ma->ma_lmm_size = mdd_lov_mdsize(env, mdd); - ma->ma_lmm = mdd_max_lmm_get(env, mdd); - ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd); - ma->ma_cookie = mdd_max_cookie_get(env, mdd); - ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE; - ma->ma_valid = 0; - th = mdd_trans_create(env, mdd); if (IS_ERR(th)) { CERROR("Cannot get thandle\n"); @@ -374,7 +336,7 @@ static int orphan_object_destroy(const struct lu_env *env, if (rc) GOTO(stop, rc); - rc = mdd_declare_object_kill(env, obj, ma, th); + rc = 
mdo_declare_destroy(env, obj, th); if (rc) GOTO(stop, rc); @@ -386,9 +348,14 @@ static int orphan_object_destroy(const struct lu_env *env, if (likely(obj->mod_count == 0)) { mdd_orphan_write_lock(env, mdd); rc = mdd_orphan_delete_obj(env, mdd, key, th); - if (rc == 0) - orphan_object_kill(env, obj, mdd, th); - else + if (rc == 0) { + mdo_ref_del(env, obj, th); + if (S_ISDIR(mdd_object_type(obj))) { + mdo_ref_del(env, obj, th); + mdd_orphan_ref_del(env, mdd, th); + } + rc = mdo_destroy(env, obj, th); + } else CERROR("could not delete object: rc = %d\n",rc); mdd_orphan_write_unlock(env, mdd); } @@ -430,7 +397,7 @@ static int orph_key_test_and_del(const struct lu_env *env, if (rc) /* so replay-single.sh test_37 works */ CERROR("%s: error unlinking orphan "DFID" from " "PENDING: rc = %d\n", - mdd->mdd_obd_dev->obd_name, PFID(lf), rc); + mdd2obd_dev(mdd)->obd_name, PFID(lf), rc); } else { mdd_write_lock(env, mdo, MOR_TGT_CHILD); if (likely(mdo->mod_count > 0)) { @@ -476,7 +443,7 @@ static int orph_index_iterate(const struct lu_env *env, if (IS_ERR(it)) { rc = PTR_ERR(it); CERROR("%s: cannot clean PENDING: rc = %d\n", - mdd->mdd_obd_dev->obd_name, rc); + mdd2obd_dev(mdd)->obd_name, rc); GOTO(out, rc); } @@ -485,7 +452,7 @@ static int orph_index_iterate(const struct lu_env *env, GOTO(out_put, rc); if (rc == 0) { CERROR("%s: error loading iterator to clean PENDING\n", - mdd->mdd_obd_dev->obd_name); + mdd2obd_dev(mdd)->obd_name); /* Index contains no zero key? 
*/ GOTO(out_put, rc = -EIO); } @@ -499,14 +466,14 @@ static int orph_index_iterate(const struct lu_env *env, rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH); if (rc != 0) { CERROR("%s: fail to get FID for orphan it: rc = %d\n", - mdd->mdd_obd_dev->obd_name, rc); + mdd2obd_dev(mdd)->obd_name, rc); goto next; } fid_le_to_cpu(&fid, &ent->lde_fid); if (!fid_is_sane(&fid)) { CERROR("%s: bad FID "DFID" cleaning PENDING\n", - mdd->mdd_obd_dev->obd_name, PFID(&fid)); + mdd2obd_dev(mdd)->obd_name, PFID(&fid)); goto next; } diff --git a/lustre/mdd/mdd_trans.c b/lustre/mdd/mdd_trans.c index 3e74980..8da825a 100644 --- a/lustre/mdd/mdd_trans.c +++ b/lustre/mdd/mdd_trans.c @@ -48,16 +48,6 @@ #include "mdd_internal.h" -int mdd_txn_stop_cb(const struct lu_env *env, struct thandle *txn, - void *cookie) -{ - struct mdd_device *mdd = cookie; - struct obd_device *obd = mdd2obd_dev(mdd); - - LASSERT(obd); - return mds_lov_write_objids(obd); -} - struct thandle *mdd_trans_create(const struct lu_env *env, struct mdd_device *mdd) { diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index c523462..0afd419 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -871,7 +871,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, } if (reqbody->valid & OBD_MD_FLMODEASIZE) { - repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize; + repbody->max_cookiesize = 0; repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize; repbody->valid |= OBD_MD_FLMODEASIZE; CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & " @@ -1833,9 +1833,9 @@ static int mdt_reint_internal(struct mdt_thread_info *info, req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, info->mti_rr.rr_eadatalen); + /* llog cookies are always 0, the field is kept for compatibility */ if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, - info->mti_mdt->mdt_max_cookiesize); + req_capsule_set_size(pill, 
&RMF_LOGCOOKIES, RCL_SERVER, 0); rc = req_capsule_server_pack(pill); if (rc != 0) { @@ -2286,8 +2286,12 @@ static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt, rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt); if (rc || ctxt == NULL) { - CERROR("Can't get mdd ctxt %d\n", rc); - return rc; + /* XXX: no support for changelogs yet - in another patch */ + /*CERROR("Can't get mdd ctxt %d\n", rc);*/ +#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0) +#error "do not forget about changelogs" +#endif + return 0; } rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx); @@ -2892,8 +2896,8 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags) req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, info->mti_body->eadatasize); if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, - info->mti_mdt->mdt_max_cookiesize); + req_capsule_set_size(pill, &RMF_LOGCOOKIES, + RCL_SERVER, 0); rc = req_capsule_server_pack(pill); } @@ -4637,8 +4641,11 @@ static void mdt_stack_fini(const struct lu_env *env, lu_dev_del_linkage(top->ld_site, top); + lu_site_purge(env, top->ld_site, -1); + bufs = &info->mti_u.bufs; /* process cleanup, pass mdt obd name to get obd umount flags */ + /* another purpose is to let all layers to release their objects */ lustre_cfg_bufs_reset(bufs, obd->obd_name); if (obd->obd_force) strcat(flags, "F"); @@ -4654,72 +4661,16 @@ static void mdt_stack_fini(const struct lu_env *env, top->ld_ops->ldo_process_config(env, top, lcfg); lustre_cfg_free(lcfg); - lu_stack_fini(env, top); + lu_site_purge(env, top->ld_site, -1); + m->mdt_child = NULL; m->mdt_bottom = NULL; - obd_disconnect(m->mdt_bottom_exp); -} - -static struct lu_device *mdt_layer_setup(struct lu_env *env, - const char *typename, - struct lu_device *child, - struct lustre_cfg *cfg) -{ - const char *dev = lustre_cfg_string(cfg, 0); - struct obd_type *type; - struct 
lu_device_type *ldt; - struct lu_device *d; - int rc; - ENTRY; - - /* find the type */ - type = class_get_type(typename); - if (!type) { - CERROR("Unknown type: '%s'\n", typename); - GOTO(out, rc = -ENODEV); - } - - rc = lu_env_refill((struct lu_env *)env); - if (rc != 0) { - CERROR("Failure to refill session: '%d'\n", rc); - GOTO(out_type, rc); - } - - ldt = type->typ_lu; - if (ldt == NULL) { - CERROR("type: '%s'\n", typename); - GOTO(out_type, rc = -EINVAL); - } - - ldt->ldt_obd_type = type; - d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg); - if (IS_ERR(d)) { - CERROR("Cannot allocate device: '%s'\n", typename); - GOTO(out_type, rc = -ENODEV); - } - - LASSERT(child->ld_site); - d->ld_site = child->ld_site; + obd_disconnect(m->mdt_child_exp); + m->mdt_child_exp = NULL; - type->typ_refcnt++; - rc = ldt->ldt_ops->ldto_device_init(env, d, dev, child); - if (rc) { - CERROR("can't init device '%s', rc %d\n", typename, rc); - GOTO(out_alloc, rc); - } - lu_device_get(d); - lu_ref_add(&d->ld_reference, "lu-stack", &lu_site_init); - - lu_dev_add_linkage(d->ld_site, d); - RETURN(d); -out_alloc: - ldt->ldt_ops->ldto_device_free(env, d); - type->typ_refcnt--; -out_type: - class_put_type(type); -out: - return ERR_PTR(rc); + obd_disconnect(m->mdt_bottom_exp); + m->mdt_child_exp = NULL; } static int mdt_connect_to_next(const struct lu_env *env, struct mdt_device *m, @@ -4757,87 +4708,147 @@ out: RETURN(rc); } -static int mdt_stack_init(struct lu_env *env, - struct mdt_device *m, - struct lustre_cfg *cfg, - struct lustre_mount_info *lmi) +static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt, + struct lustre_cfg *cfg) { - struct lu_device *d = &m->mdt_md_dev.md_lu_dev; - struct lu_device *tmp; - struct md_device *md; - struct lu_device *child_lu_dev; - char *osdname; - int rc; + char *dev = lustre_cfg_string(cfg, 0); + int rc, name_size, uuid_size; + char *name, *uuid, *p; + struct lustre_cfg_bufs *bufs; + struct lustre_cfg *lcfg; + struct obd_device 
*obd; + struct lustre_profile *lprof; + struct lu_site *site; ENTRY; - /* find bottom osd */ - OBD_ALLOC(osdname, MTI_NAME_MAXLEN); - if (osdname == NULL) - RETURN(-ENOMEM); + /* in 1.8 we had the only device in the stack - MDS. + * 2.0 introduces MDT, MDD, OSD; MDT starts others internally. + * in 2.3 OSD is instantiated by obd_mount.c, so we need + * to generate names and setup MDT, MDD. MDT will be using + * generated name to connect to MDD. for MDD the next device + * will be LOD with name taken from so called "profile" which + * is generated by mount_option line + * + * 1.8 MGS generates config. commands like this: + * #06 (104)mount_option 0: 1:lustre-MDT0000 2:lustre-mdtlov + * #08 (120)setup 0:lustre-MDT0000 1:dev 2:type 3:lustre-MDT0000 + * 2.0 MGS generates config. commands like this: + * #07 (112)mount_option 0: 1:lustre-MDT0000 2:lustre-MDT0000-mdtlov + * #08 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0 + * 3:lustre-MDT0000-mdtlov 4:f + * + * we generate MDD name from MDT one, just replacing T with D + * + * after all the preparations, the logical equivalent will be + * #01 (160)setup 0:lustre-MDD0000 1:lustre-MDD0000_UUID 2:0 + * 3:lustre-MDT0000-mdtlov 4:f + * #02 (160)setup 0:lustre-MDT0000 1:lustre-MDT0000_UUID 2:0 + * 3:lustre-MDD0000 4:f + * + * notice we build the stack from down to top: MDD first, then MDT */ + + name_size = MAX_OBD_NAME; + uuid_size = MAX_OBD_NAME; + + OBD_ALLOC(name, name_size); + OBD_ALLOC(uuid, uuid_size); + if (name == NULL || uuid == NULL) + GOTO(cleanup_mem, rc = -ENOMEM); + + OBD_ALLOC_PTR(bufs); + if (!bufs) + GOTO(cleanup_mem, rc = -ENOMEM); + + strcpy(name, dev); + p = strstr(name, "-MDT"); + if (p == NULL) + GOTO(cleanup_mem, rc = -ENOMEM); + p[3] = 'D'; + + snprintf(uuid, MAX_OBD_NAME, "%s_UUID", name); + + lprof = class_get_profile(lustre_cfg_string(cfg, 0)); + if (lprof == NULL || lprof->lp_dt == NULL) { + CERROR("can't find the profile: %s\n", + lustre_cfg_string(cfg, 0)); + GOTO(cleanup_mem, rc = 
-EINVAL); + } + + lustre_cfg_bufs_reset(bufs, name); + lustre_cfg_bufs_set_string(bufs, 1, LUSTRE_MDD_NAME); + lustre_cfg_bufs_set_string(bufs, 2, uuid); + lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt); + + lcfg = lustre_cfg_new(LCFG_ATTACH, bufs); + if (!lcfg) + GOTO(free_bufs, rc = -ENOMEM); - snprintf(osdname, MTI_NAME_MAXLEN, "%s-osd", lustre_cfg_string(cfg, 0)); - rc = mdt_connect_to_next(env, m, osdname, &m->mdt_bottom_exp); - OBD_FREE(osdname, MTI_NAME_MAXLEN); + rc = class_attach(lcfg); if (rc) - RETURN(rc); + GOTO(lcfg_cleanup, rc); - tmp = m->mdt_bottom_exp->exp_obd->obd_lu_dev; - LASSERT(tmp); - m->mdt_bottom = lu2dt_dev(tmp); + obd = class_name2obd(name); + if (!obd) { + CERROR("Can not find obd %s (%s in config)\n", + MDD_OBD_NAME, lustre_cfg_string(cfg, 0)); + GOTO(class_detach, rc = -EINVAL); + } - /* initialize site's pointers: md_site, top device */ - d->ld_site = tmp->ld_site; - d->ld_site->ls_top_dev = d; - m->mdt_mite.ms_lu = tmp->ld_site; - tmp->ld_site->ld_md_site = &m->mdt_mite; - LASSERT(d->ld_site); - d = tmp; + lustre_cfg_free(lcfg); - tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg); - if (IS_ERR(tmp)) { - GOTO(out, rc = PTR_ERR(tmp)); - } - d = tmp; - md = lu2md_dev(d); + lustre_cfg_bufs_reset(bufs, name); + lustre_cfg_bufs_set_string(bufs, 1, uuid); + lustre_cfg_bufs_set_string(bufs, 2, dev); + lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt); - tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg); - if (IS_ERR(tmp)) { - GOTO(out, rc = PTR_ERR(tmp)); - } - d = tmp; - /*set mdd upcall device*/ - md_upcall_dev_set(md, lu2md_dev(d)); + lcfg = lustre_cfg_new(LCFG_SETUP, bufs); - md = lu2md_dev(d); - /*set cmm upcall device*/ - md_upcall_dev_set(md, &m->mdt_md_dev); + rc = class_setup(obd, lcfg); + if (rc) + GOTO(class_detach, rc); - m->mdt_child = lu2md_dev(d); + /* connect to MDD we just setup */ + rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_child_exp); + if (rc) + RETURN(rc); - /* process setup config */ - tmp = 
&m->mdt_md_dev.md_lu_dev; - rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg); - if (rc) - GOTO(out, rc); + site = mdt->mdt_child_exp->exp_obd->obd_lu_dev->ld_site; + LASSERT(site); + LASSERT(mdt->mdt_md_dev.md_lu_dev.ld_site == NULL); + mdt->mdt_md_dev.md_lu_dev.ld_site = site; + site->ls_top_dev = &mdt->mdt_md_dev.md_lu_dev; + mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev); - /* initialize local objects */ - child_lu_dev = &m->mdt_child->md_lu_dev; - rc = child_lu_dev->ld_ops->ldo_prepare(env, - &m->mdt_md_dev.md_lu_dev, - child_lu_dev); + /* now connect to bottom OSD */ + snprintf(name, MAX_OBD_NAME, "%s-osd", dev); + rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp); if (rc) - GOTO(out, rc); + RETURN(rc); + mdt->mdt_bottom = + lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev); - rc = m->mdt_child->md_ops->mdo_root_get(env, m->mdt_child, - &m->mdt_md_root_fid); -out: - /* fini from last known good lu_device */ - if (rc) - mdt_stack_fini(env, m, d); + rc = lu_env_refill((struct lu_env *)env); + if (rc != 0) + CERROR("Failure to refill session: '%d'\n", rc); - return rc; + lu_dev_add_linkage(site, &mdt->mdt_md_dev.md_lu_dev); + + EXIT; +class_detach: + if (rc) + class_detach(obd, lcfg); +lcfg_cleanup: + lustre_cfg_free(lcfg); +free_bufs: + OBD_FREE_PTR(bufs); +cleanup_mem: + if (name) + OBD_FREE(name, name_size); + if (uuid) + OBD_FREE(uuid, uuid_size); + RETURN(rc); } /** @@ -5012,7 +5023,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, LASSERT(obd != NULL); m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */ - m->mdt_max_cookiesize = sizeof(struct llog_cookie); m->mdt_som_conf = 0; @@ -5059,7 +5069,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev; /* init the stack */ - rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi); + rc = mdt_stack_init((struct lu_env *)env, m, cfg); if (rc) { CERROR("Can't init device stack, rc %d\n", rc); 
RETURN(rc); @@ -5067,6 +5077,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, s = m->mdt_md_dev.md_lu_dev.ld_site; mite = &m->mdt_mite; + s->ld_md_site = mite; /* set server index */ mite->ms_node_id = node_id; @@ -5091,14 +5102,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, if (rc) GOTO(err_fini_stack, rc); - rc = mdt_fld_init(env, obd->obd_name, m); - if (rc) - GOTO(err_lut, rc); - - rc = mdt_seq_init(env, obd->obd_name, m); - if (rc) - GOTO(err_fini_fld, rc); - snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name, LUSTRE_MDT_NAME"-%p", m); m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name, @@ -5165,8 +5168,6 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_llog_cleanup, rc); } - target_recovery_init(&m->mdt_lut, mdt_recovery_handle); - rc = mdt_procfs_init(m, dev); if (rc) { CERROR("Can't init MDT lprocfs, rc %d\n", rc); @@ -5179,8 +5180,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, ping_evictor_start(); - if (obd->obd_recovering == 0) - mdt_postrecov(env, m); + /* recovery will be started upon mdt_prepare() + * when the whole stack is complete and ready + * to serve the requests */ mdt_init_capa_ctxt(env, m); @@ -5213,9 +5215,7 @@ err_free_ns: obd->obd_namespace = m->mdt_namespace = NULL; err_fini_seq: mdt_seq_fini(env, m); -err_fini_fld: mdt_fld_fini(env, m); -err_lut: lut_fini(env, &m->mdt_lut); err_fini_stack: mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); @@ -5386,9 +5386,54 @@ static int mdt_object_print(const struct lu_env *env, void *cookie, mdto->mot_ioepoch_count, mdto->mot_writecount); } +static int mdt_prepare(const struct lu_env *env, + struct lu_device *pdev, + struct lu_device *cdev) +{ + struct mdt_device *mdt = mdt_dev(cdev); + struct lu_device *next = &mdt->mdt_child->md_lu_dev; + struct obd_device *obd = cdev->ld_obd; + int rc; + + ENTRY; + + LASSERT(obd); + + rc = next->ld_ops->ldo_prepare(env, cdev, next); + if 
(rc) + RETURN(rc); + + rc = mdt_fld_init(env, obd->obd_name, mdt); + if (rc) + RETURN(rc); + + rc = mdt_seq_init(env, obd->obd_name, mdt); + if (rc) + RETURN(rc); + + rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child, + &mdt->mdt_md_root_fid); + if (rc) + RETURN(rc); + + LASSERT(!cfs_test_bit(MDT_FL_CFGLOG, &mdt->mdt_state)); + target_recovery_init(&mdt->mdt_lut, mdt_recovery_handle); + cfs_set_bit(MDT_FL_CFGLOG, &mdt->mdt_state); + LASSERT(obd->obd_no_conn); + cfs_spin_lock(&obd->obd_dev_lock); + obd->obd_no_conn = 0; + cfs_spin_unlock(&obd->obd_dev_lock); + + if (obd->obd_recovering == 0) + mdt_postrecov(env, mdt); + + RETURN(rc); +} + static const struct lu_device_operations mdt_lu_ops = { .ldo_object_alloc = mdt_object_alloc, .ldo_process_config = mdt_process_config, + .ldo_prepare = mdt_prepare, }; static const struct lu_object_operations mdt_obj_ops = { @@ -5559,6 +5604,18 @@ static int mdt_obd_connect(const struct lu_env *env, req = info->mti_pill->rc_req; mdt = mdt_dev(obd->obd_lu_dev); + /* + * first, check whether the stack is ready to handle requests + * XXX: probably not very appropriate method is used now + * at some point we should find a better one + */ + if (!cfs_test_bit(MDT_FL_SYNCED, &mdt->mdt_state)) { + rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd); + if (rc) + RETURN(-EAGAIN); + cfs_set_bit(MDT_FL_SYNCED, &mdt->mdt_state); + } + rc = class_connect(&conn, obd, cluuid); if (rc) RETURN(rc); @@ -5665,42 +5722,19 @@ static int mdt_export_cleanup(struct obd_export *exp) if (!cfs_list_empty(&closing_list)) { struct md_attr *ma = &info->mti_attr; - int lmm_size; - int cookie_size; - - lmm_size = mdt->mdt_max_mdsize; - OBD_ALLOC_LARGE(ma->ma_lmm, lmm_size); - if (ma->ma_lmm == NULL) - GOTO(out_lmm, rc = -ENOMEM); - - cookie_size = mdt->mdt_max_cookiesize; - OBD_ALLOC_LARGE(ma->ma_cookie, cookie_size); - if (ma->ma_cookie == NULL) - GOTO(out_cookie, rc = -ENOMEM); /* Close any open files (which may also cause orphan unlinking). 
*/ cfs_list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) { cfs_list_del_init(&mfd->mfd_list); - memset(&ma->ma_attr, 0, sizeof(ma->ma_attr)); - ma->ma_lmm_size = lmm_size; - ma->ma_cookie_size = cookie_size; - ma->ma_need = 0; - /* It is not for setattr, just tell MDD to send - * DESTROY RPC to OSS if needed */ - ma->ma_valid = MA_FLAGS; - ma->ma_attr_flags = MDS_CLOSE_CLEANUP; - /* Don't unlink orphan on failover umount, LU-184 */ - if (exp->exp_flags & OBD_OPT_FAILOVER) - ma->ma_attr_flags |= MDS_KEEP_ORPHAN; + ma->ma_need = ma->ma_valid = 0; + /* Don't unlink orphan on failover umount, LU-184 */ + if (exp->exp_flags & OBD_OPT_FAILOVER) { + ma->ma_valid = MA_FLAGS; + ma->ma_attr_flags |= MDS_KEEP_ORPHAN; + } mdt_mfd_close(info, mfd); } - OBD_FREE_LARGE(ma->ma_cookie, cookie_size); - ma->ma_cookie = NULL; -out_cookie: - OBD_FREE_LARGE(ma->ma_lmm, lmm_size); - ma->ma_lmm = NULL; } -out_lmm: info->mti_mdt = NULL; /* cleanup client slot early */ /* Do not erase record for recoverable client. 
*/ @@ -5789,83 +5823,6 @@ static int mdt_destroy_export(struct obd_export *exp) RETURN(0); } -static void mdt_allow_cli(struct mdt_device *m, unsigned int flag) -{ - if (flag & CONFIG_LOG) - cfs_set_bit(MDT_FL_CFGLOG, &m->mdt_state); - - /* also notify active event */ - if (flag & CONFIG_SYNC) - cfs_set_bit(MDT_FL_SYNCED, &m->mdt_state); - - if (cfs_test_bit(MDT_FL_CFGLOG, &m->mdt_state) && - cfs_test_bit(MDT_FL_SYNCED, &m->mdt_state)) { - struct obd_device *obd = m->mdt_md_dev.md_lu_dev.ld_obd; - - /* Open for clients */ - if (obd->obd_no_conn) { - cfs_spin_lock(&obd->obd_dev_lock); - obd->obd_no_conn = 0; - cfs_spin_unlock(&obd->obd_dev_lock); - } - } -} - -static int mdt_upcall(const struct lu_env *env, struct md_device *md, - enum md_upcall_event ev, void *data) -{ - struct mdt_device *m = mdt_dev(&md->md_lu_dev); - struct md_device *next = m->mdt_child; - struct mdt_thread_info *mti; - int rc = 0; - ENTRY; - - switch (ev) { - case MD_LOV_SYNC: - rc = next->md_ops->mdo_maxsize_get(env, next, - &m->mdt_max_mdsize, - &m->mdt_max_cookiesize); - CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n", - m->mdt_max_mdsize, m->mdt_max_cookiesize); - mdt_allow_cli(m, CONFIG_SYNC); - if (data) - (*(__u64 *)data) = - m->mdt_lut.lut_obd->u.obt.obt_mount_count; - break; - case MD_NO_TRANS: - mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); - mti->mti_no_need_trans = 1; - CDEBUG(D_INFO, "disable mdt trans for this thread\n"); - break; - case MD_LOV_CONFIG: - /* Check that MDT is not yet configured */ - LASSERT(!cfs_test_bit(MDT_FL_CFGLOG, &m->mdt_state)); - break; - default: - CERROR("invalid event\n"); - rc = -EINVAL; - break; - } - RETURN(rc); -} - -static int mdt_obd_notify(struct obd_device *obd, - struct obd_device *watched, - enum obd_notify_event ev, void *data) -{ - struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); - ENTRY; - - switch (ev) { - case OBD_NOTIFY_CONFIG: - mdt_allow_cli(mdt, (unsigned long)data); - break; - default: - CDEBUG(D_INFO, "Unhandled 
notification %#x\n", ev); - } - RETURN(0); -} - static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key, void *val, int vallen) { @@ -6193,7 +6150,6 @@ static struct obd_ops mdt_obd_device_ops = { .o_destroy_export = mdt_destroy_export, .o_iocontrol = mdt_iocontrol, .o_postrecov = mdt_obd_postrecov, - .o_notify = mdt_obd_notify }; static struct lu_device* mdt_device_fini(const struct lu_env *env, @@ -6235,7 +6191,6 @@ static struct lu_device *mdt_device_alloc(const struct lu_env *env, l = ERR_PTR(rc); return l; } - md_upcall_init(&m->mdt_md_dev, mdt_upcall); } else l = ERR_PTR(-ENOMEM); return l; diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index b734013..d5b125b 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -672,25 +672,10 @@ int mdt_handle_last_unlink(struct mdt_thread_info *info, struct mdt_object *mo, mdt_pack_attr2body(info, repbody, la, mdt_object_fid(mo)); if (ma->ma_valid & MA_LOV) { - __u32 mode; - - if (mdt_object_exists(mo) < 0) - /* If it is a remote object, and we do not retrieve - * EA back unlink reg file*/ - mode = S_IFREG; - else - mode = lu_object_attr(&mo->mot_obj.mo_lu); - - LASSERT(ma->ma_lmm_size); - mdt_dump_lmm(D_INFO, ma->ma_lmm); - repbody->eadatasize = ma->ma_lmm_size; - if (S_ISREG(mode)) - repbody->valid |= OBD_MD_FLEASIZE; - else if (S_ISDIR(mode)) - repbody->valid |= OBD_MD_FLDIREA; - else - LBUG(); + CERROR("No need in LOV EA upon unlink\n"); + dump_stack(); } + repbody->eadatasize = 0; if (ma->ma_cookie_size && (ma->ma_valid & MA_COOKIE)) { repbody->aclsize = ma->ma_cookie_size; @@ -903,13 +888,6 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info) } } - ma->ma_cookie_size = req_capsule_get_size(pill, &RMF_LOGCOOKIES, - RCL_CLIENT); - if (ma->ma_cookie_size) { - ma->ma_cookie = req_capsule_client_get(pill, &RMF_LOGCOOKIES); - ma->ma_valid |= MA_COOKIE; - } - rc = mdt_dlmreq_unpack(info); RETURN(rc); } @@ -1134,8 +1112,6 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) 
ma->ma_attr_flags &= ~MDS_VTX_BYPASS; info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info)); - /* last unlink need LOV EA sent back */ - rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize; rc = mdt_dlmreq_unpack(info); RETURN(rc); @@ -1196,13 +1172,35 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) ma->ma_attr_flags &= ~MDS_VTX_BYPASS; info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info)); - /* rename may contain unlink so we might need LOV EA sent back */ - rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize; rc = mdt_dlmreq_unpack(info); RETURN(rc); } +/* + * please see comment above LOV_MAGIC_V1_DEF + */ +static void mdt_fix_lov_magic(struct mdt_thread_info *info) +{ + struct mdt_reint_record *rr = &info->mti_rr; + struct lov_user_md_v1 *v1; + + v1 = (void *)rr->rr_eadata; + LASSERT(v1); + + if (unlikely(req_is_replay(mdt_info_req(info)))) { + if (v1->lmm_magic == LOV_USER_MAGIC_V1) { + v1->lmm_magic = LOV_MAGIC_V1_DEF; + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { + v1->lmm_magic = __swab32(LOV_MAGIC_V1_DEF); + } else if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + v1->lmm_magic = LOV_MAGIC_V3_DEF; + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { + v1->lmm_magic = __swab32(LOV_MAGIC_V3_DEF); + } + } +} + static int mdt_open_unpack(struct mdt_thread_info *info) { struct md_ucred *uc = mdt_ucred(info); @@ -1279,6 +1277,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info) sp->u.sp_ea.eadatalen = rr->rr_eadatalen; sp->u.sp_ea.eadata = rr->rr_eadata; sp->no_create = !!req_is_replay(req); + mdt_fix_lov_magic(info); } /* diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 816f570..8fb45fb 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -223,39 +223,48 @@ int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o, static int mdt_som_attr_set(struct mdt_thread_info *info, struct mdt_object *obj, __u64 ioepoch, int enable) { - struct md_attr *ma = &info->mti_attr; - int 
rc; + struct lustre_mdt_attrs *lma; + struct md_attr *ma = &info->mti_attr; + struct lu_buf *buf = &info->mti_buf; + struct md_object *next = mdt_object_child(obj); + struct mdt_device *mdt = info->mti_mdt; + struct lu_attr *la = &ma->ma_attr; + int rc; ENTRY; CDEBUG(D_INODE, "Size-on-MDS attribute %s for epoch "LPU64 " on "DFID".\n", enable ? "update" : "disabling", ioepoch, PFID(mdt_object_fid(obj))); - ma->ma_valid |= MA_SOM; - ma->ma_som = &info->mti_u.som.data; - if (enable) { - struct mdt_device *mdt = info->mti_mdt; - struct lu_attr *la = &ma->ma_attr; - - ma->ma_som->msd_ioepoch = ioepoch; - ma->ma_som->msd_size = la->la_valid & LA_SIZE ? la->la_size : 0; - ma->ma_som->msd_blocks = la->la_valid & LA_BLOCKS ? - la->la_blocks : 0; - ma->ma_som->msd_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count; - ma->ma_attr.la_valid &= LA_ATIME | LA_MTIME | LA_CTIME; - } else { - ma->ma_som->msd_ioepoch = IOEPOCH_INVAL; - ma->ma_attr.la_valid &= LA_ATIME; - } + lma = (struct lustre_mdt_attrs *) info->mti_xattr_buf; + CLASSERT(sizeof(info->mti_xattr_buf) >= sizeof(*lma)); + + buf->lb_buf = lma; + buf->lb_len = sizeof(info->mti_xattr_buf); + rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LMA); + if (rc > 0) { + lustre_lma_swab(lma); + } else if (rc == -ENODATA) { + memset(lma, 0, sizeof(*lma)); + } else { + RETURN(rc); + } + + /* Copy FID */ + memcpy(&lma->lma_self_fid, mdt_object_fid(obj), sizeof(lma->lma_self_fid)); + + /* Copy SOM data */ + lma->lma_ioepoch = ioepoch; + lma->lma_som_size = la->la_valid & LA_SIZE ? la->la_size : 0; + lma->lma_som_blocks = la->la_valid & LA_BLOCKS ? la->la_blocks : 0; + lma->lma_som_mountid = mdt->mdt_lut.lut_obd->u.obt.obt_mount_count; + if (enable) + lma->lma_compat |= LMAC_SOM; + else + lma->lma_compat &= ~LMAC_SOM; - /* Since we have opened the file, it is unnecessary - * to check permission when close it. 
Between the "open" - * and "close", maybe someone has changed the file mode - * or flags, or the file created mode do not permit wirte, - * and so on. Just set MDS_PERM_BYPASS for all the cases. */ - ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM; + rc = mo_xattr_set(info->mti_env, next, buf, XATTR_NAME_LMA, 0); - rc = mdt_attr_set(info, obj, ma, 0); RETURN(rc); } diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 956dcfb..93fd646 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -555,8 +555,6 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, struct obd_device *obd, struct lustre_sb_info *lsi) { - struct lu_fid fid; - struct dt_object *o; int rc = 0; ENTRY; @@ -574,28 +572,8 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb); rc = mdt_server_data_init(env, mdt, lsi); - if (rc) - RETURN(rc); - o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid); - if (!IS_ERR(o)) { - mdt->mdt_ck_obj = o; - rc = mdt_capa_keys_init(env, mdt); - if (rc) - GOTO(put_ck_object, rc); - } else { - rc = PTR_ERR(o); - CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc); - GOTO(disconnect_exports, rc); - } - RETURN(0); - -put_ck_object: - lu_object_put(env, &o->do_lu); - mdt->mdt_ck_obj = NULL; -disconnect_exports: - class_disconnect_exports(obd); - return rc; + RETURN(rc); } void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt) diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index c2227e7..d4f2c6f 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -52,17 +52,7 @@ static inline void mdt_reint_init_ma(struct mdt_thread_info *info, struct md_attr *ma) { - ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD); - ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, - &RMF_MDT_MD, RCL_SERVER); - - ma->ma_cookie = req_capsule_server_get(info->mti_pill, - &RMF_LOGCOOKIES); - ma->ma_cookie_size = 
req_capsule_get_size(info->mti_pill, - &RMF_LOGCOOKIES, - RCL_SERVER); - - ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE; + ma->ma_need = MA_INODE; ma->ma_valid = 0; } @@ -556,20 +546,23 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, cfs_list_del_init(&mfd->mfd_list); cfs_spin_unlock(&med->med_open_lock); - /* Close the found mfd, update attributes. */ - ma->ma_lmm_size = info->mti_mdt->mdt_max_mdsize; - OBD_ALLOC_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize); - if (ma->ma_lmm == NULL) - GOTO(out_put, rc = -ENOMEM); - mdt_mfd_close(info, mfd); - - OBD_FREE_LARGE(ma->ma_lmm, info->mti_mdt->mdt_max_mdsize); - } else { + } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) { + LASSERT((ma->ma_valid & MA_LOV) == 0); rc = mdt_attr_set(info, mo, ma, rr->rr_flags); if (rc) GOTO(out_put, rc); - } + } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) { + struct lu_buf *buf = &info->mti_buf; + LASSERT(ma->ma_attr.la_valid == 0); + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), + buf, XATTR_NAME_LOV, 0); + if (rc) + GOTO(out_put, rc); + } else + LBUG(); ma->ma_need = MA_INODE; ma->ma_valid = 0; @@ -713,8 +706,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, GOTO(out_unlock_parent, rc); mdt_reint_init_ma(info, ma); - if (!ma->ma_lmm || !ma->ma_cookie) - GOTO(out_unlock_parent, rc = -EINVAL); if (info->mti_cross_ref) { /* @@ -995,8 +986,6 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt)); mdt_reint_init_ma(info, ma); - if (!ma->ma_lmm || !ma->ma_cookie) - GOTO(out_unlock_tgt, rc = -EINVAL); rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir), mdt_object_child(mtgt), rr->rr_fid2, @@ -1011,7 +1000,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) mdt_handle_last_unlink(info, mtgt, ma); EXIT; -out_unlock_tgt: + if (mtgt) mdt_object_unlock_put(info, mtgt, lh_tgt, rc); 
out_unlock_tgtdir: @@ -1289,8 +1278,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info, /* step 5: rename it */ mdt_reint_init_ma(info, ma); - if (!ma->ma_lmm || !ma->ma_cookie) - GOTO(out_unlock_new, rc = -EINVAL); mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_RENAME_WRITE); diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index 399293c..3af0c8d 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -865,6 +865,7 @@ cat_cleanup: RETURN(rc); } +EXPORT_SYMBOL(cat_cancel_cb); /* helper to initialize catalog llog and process it to cancel */ int llog_cat_init_and_process(const struct lu_env *env, diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 3728a35..f2344e2 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -783,6 +783,7 @@ int class_add_conn(struct obd_device *obd, struct lustre_cfg *lcfg) } if (strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) && strcmp(obd->obd_type->typ_name, LUSTRE_OSC_NAME) && + strcmp(obd->obd_type->typ_name, LUSTRE_OSP_NAME) && strcmp(obd->obd_type->typ_name, LUSTRE_MGC_NAME)) { CERROR("can't add connection on non-client dev\n"); RETURN(-EINVAL); @@ -1431,6 +1432,32 @@ static int class_config_llog_handler(const struct lu_env *env, } } +#if defined(HAVE_SERVER_SUPPORT) && defined(__KERNEL__) + /* newer MDS replaces LOV/OSC with LOD/OSP */ + { + char *typename = lustre_cfg_string(lcfg, 1); + + if ((lcfg->lcfg_command == LCFG_ATTACH && typename && + strcmp(typename, LUSTRE_LOV_NAME) == 0) && + IS_MDT(s2lsi(clli->cfg_sb))) { + CDEBUG(D_CONFIG, + "For 2.x interoperability, rename obd " + "type from lov to lod (%s)\n", + s2lsi(clli->cfg_sb)->lsi_svname); + strcpy(typename, LUSTRE_LOD_NAME); + } + if ((lcfg->lcfg_command == LCFG_ATTACH && typename && + strcmp(typename, LUSTRE_OSC_NAME) == 0) && + IS_MDT(s2lsi(clli->cfg_sb))) { + CDEBUG(D_CONFIG, + "For 2.x interoperability, rename obd " + "type from osc to osp 
(%s)\n", + s2lsi(clli->cfg_sb)->lsi_svname); + strcpy(typename, LUSTRE_OSP_NAME); + } + } +#endif + if ((clli->cfg_flags & CFG_F_EXCLUDE) && (lcfg->lcfg_command == LCFG_LOV_ADD_OBD)) /* Add inactive instead */ diff --git a/lustre/obdclass/obd_mount.c b/lustre/obdclass/obd_mount.c index 8dd2830..3ccd934 100644 --- a/lustre/obdclass/obd_mount.c +++ b/lustre/obdclass/obd_mount.c @@ -1137,6 +1137,8 @@ static int server_start_targets(struct super_block *sb, struct vfsmount *mnt) struct obd_device *obd; struct lustre_sb_info *lsi = s2lsi(sb); struct config_llog_instance cfg; + struct lu_env env; + struct lu_device *dev; int rc; ENTRY; @@ -1227,12 +1229,6 @@ out_mgc: RETURN(-ENXIO); } - if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_ABORT_RECOV) && - (OBP(obd, iocontrol))) { - obd_iocontrol(OBD_IOC_ABORT_RECOVERY, - obd->obd_self_export, 0, NULL, NULL); - } - server_notify_target(sb, obd); /* calculate recovery timeout, do it after lustre_process_log */ @@ -1240,6 +1236,34 @@ out_mgc: /* log has been fully processed */ obd_notify(obd, NULL, OBD_NOTIFY_CONFIG, (void *)CONFIG_LOG); + + /* log has been fully processed, let clients connect */ + dev = obd->obd_lu_dev; + if (dev && dev->ld_ops->ldo_prepare) { + rc = lu_env_init(&env, dev->ld_type->ldt_ctx_tags); + if (rc == 0) { + struct lu_context session_ctx; + + lu_context_init(&session_ctx, LCT_SESSION); + session_ctx.lc_thread = NULL; + lu_context_enter(&session_ctx); + env.le_ses = &session_ctx; + + dev->ld_ops->ldo_prepare(&env, NULL, dev); + + lu_env_fini(&env); + lu_context_exit(&session_ctx); + lu_context_fini(&session_ctx); + } + } + + /* abort recovery only on the complete stack: + * many devices can be involved */ + if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_ABORT_RECOV) && + (OBP(obd, iocontrol))) { + obd_iocontrol(OBD_IOC_ABORT_RECOVERY, + obd->obd_self_export, 0, NULL, NULL); + } } RETURN(rc); diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index fc73c1e..5239db5 100644 --- 
a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -1549,6 +1549,7 @@ static int echo_create_md_object(const struct lu_env *env, lum->lmm_stripe_offset = stripe_offset; lum->lmm_pattern = 0; spec->u.sp_ea.eadata = lum; + spec->u.sp_ea.eadatalen = sizeof(*lum); spec->sp_cr_flags |= MDS_OPEN_HAS_EA; } } diff --git a/lustre/osd-ldiskfs/osd_compat.c b/lustre/osd-ldiskfs/osd_compat.c index 2c4098b..0c51b6f 100644 --- a/lustre/osd-ldiskfs/osd_compat.c +++ b/lustre/osd-ldiskfs/osd_compat.c @@ -488,14 +488,14 @@ static const struct named_oid oids[] = { { FID_SEQ_SRV_OID, "" /* "seq_srv" */ }, { MDD_ROOT_INDEX_OID, "" /* "ROOT" */ }, { MDD_ORPHAN_OID, "" /* "PENDING" */ }, - { MDD_LOV_OBJ_OID, "" /* LOV_OBJID */ }, + { MDD_LOV_OBJ_OID, LOV_OBJID }, { MDD_CAPA_KEYS_OID, "" /* CAPA_KEYS */ }, { MDT_LAST_RECV_OID, LAST_RCVD }, { LFSCK_BOOKMARK_OID, "" /* "lfsck_bookmark" */ }, { OTABLE_IT_OID, "" /* "otable iterator" */}, { OFD_LAST_RECV_OID, "" /* LAST_RCVD */ }, { OFD_LAST_GROUP_OID, "LAST_GROUP" }, - { LLOG_CATALOGS_OID, "" /* "CATALOGS" */ }, + { LLOG_CATALOGS_OID, "CATALOGS" }, { MGS_CONFIGS_OID, "" /* MOUNT_CONFIGS_DIR */ }, { OFD_HEALTH_CHECK_OID, HEALTH_CHECK }, { 0, NULL } diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 3264023..3c721d0 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -4677,25 +4677,9 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev, int result = 0; ENTRY; -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0) - /* Unfortunately, the current MDD implementation relies on some specific - * code to be executed in the OSD layer. Since OFD now also uses the OSD - * module, we need a way to skip the metadata-specific code when running - * with OFD. - * The hack here is to check the type of the parent device which is - * either MD (i.e. MDD device) with the current MDT stack or DT (i.e. - * OFD device) on an OST. 
As a reminder, obdfilter does not use the OSD - * layer and still relies on lvfs. This hack won't work any more when - * LOD is landed since LOD is of DT type. - * This code should be removed once the orion MDT changes (LOD/OSP, ...) - * have been landed */ - osd->od_is_md = lu_device_is_md(pdev); -#else -#warning "all is_md checks must be removed from osd-ldiskfs" -#endif - - if (osd->od_is_md) { - /* 1. setup local objects */ + if (dev->ld_site && lu_device_is_md(dev->ld_site->ls_top_dev)) { + /* MDT/MDD still use old infrastructure to create + * special files */ result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev)); if (result) diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 7866264..9c533a9 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -246,7 +246,7 @@ static int osp_process_config(const struct lu_env *env, lprocfs_osp_init_vars(&lvars); LASSERT(d->opd_obd); - rc = class_process_proc_param(PARAM_OSP, lvars.obd_vars, + rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, lcfg, d->opd_obd); if (rc > 0) rc = 0; diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index a0489f1..61a7588 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -93,7 +93,12 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, * * 2) send synchronous truncate RPC with just assigned id */ - LASSERT(attr); + + /* there are few places in MDD code still passing NULL + * XXX: to be fixed soon */ + if (attr == NULL) + RETURN(0); + if (attr->la_valid & LA_SIZE && attr->la_size > 0) { LASSERT(!dt_object_exists(dt)); osp_object_assign_id(env, d, o); @@ -171,8 +176,11 @@ static int osp_declare_object_create(const struct lu_env *env, /* * There can be gaps in precreated ids and record to unlink llog + * XXX: we do not handle gaps yet, implemented before solution + * was found to be racy, so we disabled that. there is no + * point in making useless but expensive llog declaration. 
*/ - rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); + /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */ if (unlikely(!fid_is_zero(fid))) { /* replay case: caller knows fid */ diff --git a/lustre/osp/osp_precreate.c b/lustre/osp/osp_precreate.c index dfcb2a2..53cfef9 100644 --- a/lustre/osp/osp_precreate.c +++ b/lustre/osp/osp_precreate.c @@ -312,6 +312,56 @@ out_req: RETURN(rc); } + +static int osp_get_lastid_from_ost(struct osp_device *d) +{ + struct ptlrpc_request *req; + struct obd_import *imp; + obd_id *reply; + char *tmp; + int rc; + + imp = d->opd_obd->u.cli.cl_import; + LASSERT(imp); + + req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_ID); + if (req == NULL) + RETURN(-ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, + RCL_CLIENT, sizeof(KEY_LAST_ID)); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); + memcpy(tmp, KEY_LAST_ID, sizeof(KEY_LAST_ID)); + + req->rq_no_delay = req->rq_no_resend = 1; + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc) { + /* bad-bad OST.. let sysadm sort this out */ + ptlrpc_set_import_active(imp, 0); + GOTO(out, rc); + } + + reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID); + if (reply == NULL) + GOTO(out, rc = -EPROTO); + + d->opd_last_used_id = *reply; + CDEBUG(D_HA, "%s: got last_id "LPU64" from OST\n", + d->opd_obd->obd_name, d->opd_last_used_id); + +out: + ptlrpc_req_finished(req); + RETURN(rc); + +} + /** * asks OST to clean precreate orphans * and gets next id for new objects @@ -328,22 +378,32 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d) LASSERT(d->opd_recovery_completed); LASSERT(d->opd_pre_reserved == 0); + CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n", + d->opd_obd->obd_name, d->opd_last_used_id); + + if (d->opd_last_used_id < 2) { + /* lastid looks strange... 
ask OST */ + rc = osp_get_lastid_from_ost(d); + if (rc) + GOTO(out, rc); + } + imp = d->opd_obd->u.cli.cl_import; LASSERT(imp); req = ptlrpc_request_alloc(imp, &RQF_OST_CREATE); if (req == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE); if (rc) { ptlrpc_request_free(req); - RETURN(rc); + GOTO(out, rc); } body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) - GOTO(out_req, rc = -EPROTO); + GOTO(out, rc = -EPROTO); body->oa.o_flags = OBD_FL_DELORPHAN; body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP; @@ -358,14 +418,12 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d) req->rq_no_resend = req->rq_no_delay = 1; rc = ptlrpc_queue_wait(req); - if (rc) { - ptlrpc_set_import_active(imp, 0); - GOTO(out_req, rc); - } + if (rc) + GOTO(out, rc); body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); if (body == NULL) - GOTO(out_req, rc = -EPROTO); + GOTO(out, rc = -EPROTO); /* * OST provides us with id new pool starts from in body->oa.o_id @@ -384,16 +442,14 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d) d->opd_pre_grow_slow = 0; cfs_spin_unlock(&d->opd_pre_lock); - /* now we can wakeup all users awaiting for objects */ - osp_pre_update_status(d, rc); - cfs_waitq_signal(&d->opd_pre_user_waitq); - CDEBUG(D_HA, "Got last_id "LPU64" from OST, last_used is "LPU64 ", next "LPU64"\n", body->oa.o_id, le64_to_cpu(d->opd_last_used_id), d->opd_pre_next); -out_req: - ptlrpc_req_finished(req); +out: + if (req) + ptlrpc_req_finished(req); + RETURN(rc); } @@ -506,6 +562,18 @@ static int osp_precreate_thread(void *_arg) if (rc) { CERROR("%s: cannot cleanup orphans: rc = %d\n", d->opd_obd->obd_name, rc); + /* we can't proceed from here, OST seem to + * be in a bad shape, better to wait for + * a new instance of the server and repeat + * from the beginning. 
notify possible waiters + * this OSP isn't quite functional yet */ + osp_pre_update_status(d, rc); + cfs_waitq_signal(&d->opd_pre_user_waitq); + l_wait_event(d->opd_pre_waitq, + !osp_precreate_running(d) || + d->opd_new_connection, &lwi); + continue; + } } @@ -591,6 +659,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d) struct l_wait_info lwi; cfs_time_t expire = cfs_time_shift(obd_timeout); int precreated, rc; + int count = 0; ENTRY; @@ -612,6 +681,15 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d) break; } +#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0) +#error "remove this before the release" +#endif + /* + * to address Andreas's concern on possible busy-loop + * between this thread and osp_precreate_send() + */ + LASSERT(count++ < 1000); + /* * increase number of precreations */ diff --git a/lustre/tests/lustre-rsync-test.sh b/lustre/tests/lustre-rsync-test.sh index de9f428..b07d1e7 100644 --- a/lustre/tests/lustre-rsync-test.sh +++ b/lustre/tests/lustre-rsync-test.sh @@ -20,6 +20,9 @@ ALWAYS_EXCEPT="$LRSYNC_EXCEPT" [ "$ALWAYS_EXCEPT$EXCEPT" ] && \ echo "Skipping tests: `echo $ALWAYS_EXCEPT $EXCEPT`" +# disable till changelogs from orion landed (LU-2034) +ALWAYS_EXCEPT="1 2 3 4 5 6 7 8 9" + KILL=/bin/kill TMP=${TMP:-/tmp} diff --git a/lustre/tests/recovery-small.sh b/lustre/tests/recovery-small.sh index 45cf76e..5a26d02 100755 --- a/lustre/tests/recovery-small.sh +++ b/lustre/tests/recovery-small.sh @@ -2,8 +2,8 @@ set -e -# bug 5494 5493 -ALWAYS_EXCEPT="24 52 $RECOVERY_SMALL_EXCEPT" +# bug 5494 5493 LU2034 +ALWAYS_EXCEPT="24 52 60 $RECOVERY_SMALL_EXCEPT" export MULTIOP=${MULTIOP:-multiop} PTLDEBUG=${PTLDEBUG:--1} diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index b2c6cf8..fdec8e0 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -11,6 +11,11 @@ ONLY=${ONLY:-"$*"} ALWAYS_EXCEPT=" 27u 42a 42b 42c 42d 45 51d 68b $SANITY_EXCEPT" # UPDATE THE COMMENT ABOVE WITH BUG 
NUMBERS WHEN CHANGING ALWAYS_EXCEPT! +# with LOD/OSP landing +# bug number for skipped tests: LU2036 LU2034 +ALWAYS_EXCEPT=" 76 160 $ALWAYS_EXCEPT" + + # Tests that fail on uml CPU=`awk '/model/ {print $4}' /proc/cpuinfo` # buffer i/o errs sock spc runas diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index ea998e0..b8e0101 100644 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -471,9 +471,9 @@ load_modules_local() { load_module osd-ldiskfs/osd_ldiskfs fi load_module mdt/mdt - load_module cmm/cmm load_module ost/ost load_module lod/lod + load_module osp/osp if [ "$USE_OFD" == yes ]; then load_module ofd/ofd else -- 1.8.3.1