int mds_lov_write_objids(struct obd_device *obd);
void mds_lov_update_objids(struct obd_device *obd, obd_id *ids);
+void mds_objids_from_lmm(obd_id *, struct lov_mds_md *, struct lov_desc *);
+int mds_postrecov(struct obd_device *);
+
/* ioctls for trying requests */
#define IOC_REQUEST_TYPE 'f'
#define IOC_REQUEST_MIN_NR 30
const struct lu_fid *sp_pfid;
/* eadata for regular files */
struct md_spec_reg {
+ /* lov objs exist already */
+ int no_lov_create;
const void *eadata;
int eadatalen;
} sp_ea;
obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
OBD_NOTIFY_SYNC, NULL);
*/
+ LASSERT(mdd);
+ LASSERT(mdd->mdd_obd_dev);
+
+ mdd->mdd_obd_dev->obd_recovering = 0;
+ //mdd->mdd_obd_dev->obd_type->typ_dt_ops->
+ mds_postrecov(mdd->mdd_obd_dev);
/* TODO: orphans handling */
rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
+
RETURN(rc);
}
GOTO(cleanup, rc);
inserted = 1;
- rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm, lmm_size, handle, 0);
+ /* replay creates has objects already */
+ if (spec->u.sp_ea.no_lov_create)
+ rc = mdd_lov_set_md(ctxt, mdd_pobj, son,
+ (struct lov_mds_md *)spec->u.sp_ea.eadata,
+ spec->u.sp_ea.eadatalen, handle, 0);
+ else
+ rc = mdd_lov_set_md(ctxt, mdd_pobj, son, lmm,
+ lmm_size, handle, 0);
if (rc) {
CERROR("error on stripe info copy %d \n", rc);
GOTO(cleanup, rc);
LBUG();
}
obd->u.mds.mds_id = index;
+ obd->obd_recovering = 1;
rc = class_setup(obd, lcfg);
if (rc)
GOTO(class_detach, rc);
*/
obd->obd_upcall.onu_owner = mdd;
obd->obd_upcall.onu_upcall = mdd_lov_update;
-
mdd->mdd_obd_dev = obd;
class_detach:
if (rc)
mds_lov_update_objids(mdd->mdd_obd_dev, info->mti_oti.oti_objid);
}
+static void mdd_lov_objid_from_lmm(const struct lu_context *ctx,
+ struct mdd_device *mdd,
+ struct lov_mds_md *lmm)
+{
+ struct mds_obd *mds = &mdd->mdd_obd_dev->u.mds;
+ struct mdd_thread_info *info = mdd_ctx_info(ctx);
+ mds_objids_from_lmm(info->mti_oti.oti_objid, lmm, &mds->mds_lov_desc);
+}
+
static void mdd_lov_objid_free(const struct lu_context *ctxt,
struct mdd_device *mdd)
{
!(create_flags & FMODE_WRITE))
RETURN(0);
- OBD_FAIL_RETURN((OBD_FAIL_MDS_ALLOC_OBDO), -ENOMEM);
+ oti_init(oti, NULL);
+ rc = mdd_lov_objid_alloc(ctxt, mdd);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* replay case, should get lov from eadata */
+ if (spec->u.sp_ea.no_lov_create != 0) {
+ mdd_lov_objid_from_lmm(ctxt, mdd, (struct lov_mds_md *)eadata);
+ RETURN(0);
+ }
+
+ if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_ALLOC_OBDO))
+ GOTO(out_ids, rc = -ENOMEM);
LASSERT(lov_exp != NULL);
oa = obdo_alloc();
if (oa == NULL)
- RETURN(-ENOMEM);
+ GOTO(out_ids, rc = -ENOMEM);
oa->o_uid = 0; /* must have 0 uid / gid on OST */
oa->o_gid = 0;
OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
oa->o_size = 0;
- oti_init(oti, NULL);
- rc = mdd_lov_objid_alloc(ctxt, mdd);
- if (rc != 0)
- GOTO(out_oa, rc);
-
if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
if (create_flags & MDS_OPEN_HAS_EA) {
LASSERT(eadata != NULL);
out_oa:
oti_free_cookies(oti);
obdo_free(oa);
+out_ids:
if (lsm)
obd_free_memmd(lov_exp, &lsm);
if (rc != 0)
out:
RETURN(rc);
}
+EXPORT_SYMBOL(mds_postrecov);
/* We need to be able to stop an mds_lov_synchronize */
static int mds_lov_early_clean(struct obd_device *obd)
.o_llog_init = mds_llog_init,
.o_llog_finish = mds_llog_finish,
.o_notify = mds_notify,
- // .o_health_check = mds_cmd_health_check,
+ // .o_health_check = mds_cmd_health_check,
};
static int __init mds_cmd_init(void)
* set_nextid(). The class driver can help us here, because
* it can use the obd_recovering flag to determine when the
* the OBD is full available. */
+ /* MDD device will care about that
if (!obd->obd_recovering)
rc = mds_postrecov(obd);
+ */
RETURN(rc);
err_reg:
le64_to_cpu(lmm->lmm_objects[i].l_object_id);
}
}
-
+EXPORT_SYMBOL(mds_objids_from_lmm);
EXIT;
}
+int mdt_postrecov(const struct lu_context *, struct mdt_device *);
+
static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
struct lu_device_type *ldt, struct lustre_cfg *cfg)
{
rc = mdt_fs_setup(ctx, m);
if (rc)
GOTO(err_stop_service, rc);
+ if(obd->obd_recovering == 0)
+ mdt_postrecov(ctx, m);
RETURN(0);
err_stop_service:
RETURN(rc);
}
-int mdt_postrecov(struct obd_device *obd)
+int mdt_postrecov(const struct lu_context *ctx, struct mdt_device *mdt)
{
- struct lu_context ctxt;
- struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
struct lu_device *ld = md2lu_dev(mdt->mdt_child);
int rc;
ENTRY;
+ rc = ld->ld_ops->ldo_recovery_complete(ctx, ld);
+ RETURN(rc);
+}
+
+int mdt_obd_postrecov(struct obd_device *obd)
+{
+ struct lu_context ctxt;
+ int rc;
rc = lu_context_init(&ctxt, LCT_MD_THREAD);
if (rc)
RETURN(rc);
lu_context_enter(&ctxt);
- rc = ld->ld_ops->ldo_recovery_complete(&ctxt, ld);
+ rc = mdt_postrecov(&ctxt, mdt_dev(obd->obd_lu_dev));
lu_context_exit(&ctxt);
lu_context_fini(&ctxt);
- RETURN(rc);
+ return rc;
}
static struct obd_ops mdt_obd_device_ops = {
.o_init_export = mdt_init_export,
.o_destroy_export = mdt_destroy_export,
.o_iocontrol = mdt_iocontrol,
- .o_postrecov = mdt_postrecov
+ .o_postrecov = mdt_obd_postrecov
};
if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
struct md_create_spec *sp = &info->mti_spec;
+ struct ptlrpc_request *req = mdt_info_req(info);
sp->u.sp_ea.eadata = req_capsule_client_get(pill,
&RMF_EADATA);
sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
&RMF_EADATA,
RCL_CLIENT);
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ sp->u.sp_ea.no_lov_create = 1;
}
RETURN(result);
CDEBUG(rc == 0 ? D_INFO : D_ERROR,
"zeroing out client idx %u in %s rc %d\n",
med->med_lr_idx, LAST_RCVD, rc);
-
- if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
- CERROR("MDS client %u: bit already clear in bitmap!!\n",
- med->med_lr_idx);
- LBUG();
- }
-
+
+ spin_lock(&mdt->mdt_client_bitmap_lock);
+ clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
+ spin_unlock(&mdt->mdt_client_bitmap_lock);
/* Make sure the server's last_transno is up to date. Do this
* after the client is freed so we know all the client's
* transactions have been committed. */
if (mti->mti_transno > mdt->mdt_last_transno)
mdt->mdt_last_transno = mti->mti_transno;
}
- spin_unlock(&mdt->mdt_transno_lock);
/* sometimes the reply message has not been successfully packed */
LASSERT(req != NULL && req->rq_repmsg != NULL);
CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
- spin_lock(&mdt->mdt_transno_lock);
req->rq_transno = mti->mti_transno;
lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
target_committed_to_req(req);