* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
LASSERTF(rc == 0, "rc = %d\n", rc);
/* VBR: set export last committed version */
exp->exp_last_committed = last_transno;
- cfs_spin_lock(&exp->exp_lock);
- exp->exp_connecting = 0;
- exp->exp_in_recovery = 0;
- cfs_spin_unlock(&exp->exp_lock);
- obd->obd_max_recoverable_clients++;
- class_export_put(exp);
-
- CDEBUG(D_OTHER, "client at idx %d has last_transno="LPU64"\n",
- cl_idx, last_transno);
- /* protect __u64 value update */
- cfs_spin_lock(&mdt->mdt_lut.lut_translock);
- mdt->mdt_lut.lut_last_transno = max(last_transno,
- mdt->mdt_lut.lut_last_transno);
- cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+ spin_lock(&exp->exp_lock);
+ exp->exp_connecting = 0;
+ exp->exp_in_recovery = 0;
+ spin_unlock(&exp->exp_lock);
+ obd->obd_max_recoverable_clients++;
+ class_export_put(exp);
+
+ CDEBUG(D_OTHER, "client at idx %d has last_transno ="LPU64"\n",
+ cl_idx, last_transno);
+ /* protect __u64 value update */
+ spin_lock(&mdt->mdt_lut.lut_translock);
+ mdt->mdt_lut.lut_last_transno = max(last_transno,
+ mdt->mdt_lut.lut_last_transno);
+ spin_unlock(&mdt->mdt_lut.lut_translock);
}
err_client:
struct dt_object *obj;
struct lu_attr *la;
unsigned long last_rcvd_size;
+ __u32 index;
__u64 mount_count;
int rc;
ENTRY;
CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
+ rc = server_name2index(obd->obd_name, &index, NULL);
+ if (rc < 0) {
+ CERROR("%s: Can not get index from obd_name: rc = %d\n",
+ obd->obd_name, rc);
+ RETURN(rc);
+ }
+
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
LASSERT(mti != NULL);
la = &mti->mti_attr.ma_attr;
lsd->lsd_client_size = LR_CLIENT_SIZE;
lsd->lsd_feature_compat = OBD_COMPAT_MDT;
lsd->lsd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
- lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
- OBD_INCOMPAT_COMMON_LR |
- OBD_INCOMPAT_MULTI_OI;
- } else {
- LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
+ lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
+ OBD_INCOMPAT_COMMON_LR |
+ OBD_INCOMPAT_MULTI_OI;
+ lsd->lsd_osd_index = index;
+ } else {
+ LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
rc = tgt_server_data_read(env, &mdt->mdt_lut);
if (rc) {
CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
lsd->lsd_feature_compat |= OBD_COMPAT_MDT;
lsd->lsd_feature_incompat |= OBD_INCOMPAT_MDT |
OBD_INCOMPAT_COMMON_LR;
- }
+ if (lsd->lsd_osd_index != index) {
+ LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd is"
+ "different with the index %d in"
+ "config log, It might be disk"
+ "corruption!\n", obd->obd_name,
+ lsd->lsd_osd_index, index);
+ GOTO(out, rc = -EINVAL);
+ }
+ }
mount_count = lsd->lsd_mount_count;
if (lsd->lsd_feature_incompat & ~MDT_INCOMPAT_SUPP) {
lsd->lsd_feature_compat |= OBD_COMPAT_20;
}
- if (lsi->lsi_flags & LDD_F_IAM_DIR)
- lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR;
-
lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
- cfs_spin_lock(&mdt->mdt_lut.lut_translock);
- mdt->mdt_lut.lut_last_transno = lsd->lsd_last_transno;
- cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+ spin_lock(&mdt->mdt_lut.lut_translock);
+ mdt->mdt_lut.lut_last_transno = lsd->lsd_last_transno;
+ spin_unlock(&mdt->mdt_lut.lut_translock);
CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
if (rc)
GOTO(err_client, rc);
- cfs_spin_lock(&mdt->mdt_lut.lut_translock);
- /* obd_last_committed is used for compatibility
- * with other lustre recovery code */
- obd->obd_last_committed = mdt->mdt_lut.lut_last_transno;
- cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+ spin_lock(&mdt->mdt_lut.lut_translock);
+ /* obd_last_committed is used for compatibility
+ * with other lustre recovery code */
+ obd->obd_last_committed = mdt->mdt_lut.lut_last_transno;
+ spin_unlock(&mdt->mdt_lut.lut_translock);
obd->u.obt.obt_mount_count = mount_count + 1;
obd->u.obt.obt_instance = (__u32)obd->u.obt.obt_mount_count;
ted = &req->rq_export->exp_target_data;
LASSERT(ted);
- cfs_mutex_lock(&ted->ted_lcd_lock);
- lcd = ted->ted_lcd;
- /* if the export has already been disconnected, we have no last_rcvd slot,
- * update server data with latest transno then */
- if (lcd == NULL) {
- cfs_mutex_unlock(&ted->ted_lcd_lock);
+ mutex_lock(&ted->ted_lcd_lock);
+ lcd = ted->ted_lcd;
+ /* if the export has already been disconnected, we have no last_rcvd
+ * slot, update server data with latest transno then */
+ if (lcd == NULL) {
+ mutex_unlock(&ted->ted_lcd_lock);
CWARN("commit transaction for disconnected client %s: rc %d\n",
req->rq_export->exp_client_uuid.uuid, rc);
err = tgt_server_data_write(mti->mti_env, &mdt->mdt_lut, th);
lcd->lcd_last_close_transno,
mti->mti_transno, req_is_replay(req));
if (req_is_replay(req)) {
- cfs_spin_lock(&req->rq_export->exp_lock);
- req->rq_export->exp_vbr_failed = 1;
- cfs_spin_unlock(&req->rq_export->exp_lock);
- }
- cfs_mutex_unlock(&ted->ted_lcd_lock);
+ spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_vbr_failed = 1;
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+ mutex_unlock(&ted->ted_lcd_lock);
RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
}
lcd->lcd_last_close_transno = mti->mti_transno;
lcd->lcd_last_transno,
mti->mti_transno, req_is_replay(req));
if (req_is_replay(req)) {
- cfs_spin_lock(&req->rq_export->exp_lock);
- req->rq_export->exp_vbr_failed = 1;
- cfs_spin_unlock(&req->rq_export->exp_lock);
- }
- cfs_mutex_unlock(&ted->ted_lcd_lock);
+ spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_vbr_failed = 1;
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+ mutex_unlock(&ted->ted_lcd_lock);
RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
}
lcd->lcd_last_transno = mti->mti_transno;
lcd->lcd_last_data = mti->mti_opdata;
}
- if ((mti->mti_exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0) {
+ if (exp_connect_flags(mti->mti_exp) & OBD_CONNECT_LIGHTWEIGHT) {
/* Although lightweight (LW) connections have no slot in
* last_rcvd, we still want to maintain the in-memory
* lsd_client_data structure in order to properly handle reply
struct lu_target *tg = &mdt->mdt_lut;
bool update = false;
- cfs_mutex_unlock(&ted->ted_lcd_lock);
+ mutex_unlock(&ted->ted_lcd_lock);
err = 0;
/* All operations performed by LW clients are synchronous and
* we store the committed transno in the last_rcvd header */
- cfs_spin_lock(&tg->lut_translock);
+ spin_lock(&tg->lut_translock);
if (mti->mti_transno > tg->lut_lsd.lsd_last_transno) {
tg->lut_lsd.lsd_last_transno = mti->mti_transno;
update = true;
}
- cfs_spin_unlock(&tg->lut_translock);
+ spin_unlock(&tg->lut_translock);
if (update)
err = tgt_server_data_write(mti->mti_env, tg, th);
} else if (off <= 0) {
CERROR("%s: client idx %d has offset %lld\n",
mdt2obd_dev(mdt)->obd_name, ted->ted_lr_idx, off);
- cfs_mutex_unlock(&ted->ted_lcd_lock);
+ mutex_unlock(&ted->ted_lcd_lock);
err = -EINVAL;
} else {
err = tgt_client_data_write(mti->mti_env, &mdt->mdt_lut, lcd,
&off, th);
- cfs_mutex_unlock(&ted->ted_lcd_lock);
- }
- RETURN(err);
+ mutex_unlock(&ted->ted_lcd_lock);
+ }
+ RETURN(err);
}
extern struct lu_context_key mdt_thread_key;
if (rc)
return rc;
- if (mti->mti_mos != NULL)
+ /* we probably should not set local transno to the remote object
+ * on another storage, What about VBR on remote object? XXX */
+ if (mti->mti_mos != NULL && !mdt_object_remote(mti->mti_mos))
rc = dt_declare_version_set(env, mdt_obj2dt(mti->mti_mos), th);
return rc;
}
mti->mti_has_trans = 1;
- cfs_spin_lock(&mdt->mdt_lut.lut_translock);
+ spin_lock(&mdt->mdt_lut.lut_translock);
if (txn->th_result != 0) {
if (mti->mti_transno != 0) {
- CERROR("Replay transno "LPU64" failed: rc %d\n",
- mti->mti_transno, txn->th_result);
+ CERROR("Replay transno "LPU64" failed: rc %d\n",
+ mti->mti_transno, txn->th_result);
+ return 0;
}
} else if (mti->mti_transno == 0) {
mti->mti_transno = ++ mdt->mdt_lut.lut_last_transno;
if (mti->mti_transno > mdt->mdt_lut.lut_last_transno)
mdt->mdt_lut.lut_last_transno = mti->mti_transno;
}
- cfs_spin_unlock(&mdt->mdt_lut.lut_translock);
+ spin_unlock(&mdt->mdt_lut.lut_translock);
/* sometimes the reply message has not been successfully packed */
LASSERT(req != NULL && req->rq_repmsg != NULL);
/** VBR: set new versions */
- if (txn->th_result == 0 && mti->mti_mos != NULL) {
+ /* we probably should not set local transno to the remote object
+ * on another storage, What about VBR on remote object? XXX */
+ if (txn->th_result == 0 && mti->mti_mos != NULL &&
+ !mdt_object_remote(mti->mti_mos)) {
+
dt_version_set(env, mdt_obj2dt(mti->mti_mos),
mti->mti_transno, txn);
mti->mti_mos = NULL;
int i;
/* CAVEAT EMPTOR: spinlock order */
- cfs_spin_lock(&exp->exp_lock);
+ spin_lock(&exp->exp_lock);
cfs_list_for_each (tmp, &exp->exp_outstanding_replies) {
oldrep = cfs_list_entry(tmp, struct ptlrpc_reply_state,
rs_exp_list);
oldrep->rs_opc);
svcpt = oldrep->rs_svcpt;
- cfs_spin_lock(&svcpt->scp_rep_lock);
+ spin_lock(&svcpt->scp_rep_lock);
cfs_list_del_init (&oldrep->rs_exp_list);
oldrep->rs_nlocks = 0;
DEBUG_REQ(D_HA, req, "stole locks for");
- cfs_spin_lock(&oldrep->rs_lock);
- ptlrpc_schedule_difficult_reply (oldrep);
- cfs_spin_unlock(&oldrep->rs_lock);
-
- cfs_spin_unlock(&svcpt->scp_rep_lock);
- break;
- }
- cfs_spin_unlock(&exp->exp_lock);
+ spin_lock(&oldrep->rs_lock);
+ ptlrpc_schedule_difficult_reply(oldrep);
+ spin_unlock(&oldrep->rs_lock);
+
+ spin_unlock(&svcpt->scp_rep_lock);
+ break;
+ }
+ spin_unlock(&exp->exp_lock);
}
/**
mti->mti_attr.ma_need = MA_INODE;
mti->mti_attr.ma_valid = 0;
rc = mdt_attr_get_complex(mti, child, &mti->mti_attr);
- if (rc == -EREMOTE) {
- /* object was created on remote server */
- req->rq_status = rc;
- body->valid |= OBD_MD_MDS;
- }
- mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
- mdt_object_fid(child));
- mdt_object_put(mti->mti_env, child);
+ if (rc == -EREMOTE) {
+ /* object was created on remote server */
+ if (!mdt_is_dne_client(exp))
+ /* Return -EIO for old client */
+ rc = -EIO;
+
+ req->rq_status = rc;
+ body->valid |= OBD_MD_MDS;
+ }
+ mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
+ mdt_object_fid(child));
+ mdt_object_put(mti->mti_env, child);
}
static void mdt_reconstruct_setattr(struct mdt_thread_info *mti,
repbody = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
repbody->ioepoch = obj->mot_ioepoch;
- cfs_spin_lock(&med->med_open_lock);
- cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
- if (mfd->mfd_xid == req->rq_xid)
- break;
- }
- LASSERT(&mfd->mfd_list != &med->med_open_head);
- cfs_spin_unlock(&med->med_open_lock);
- repbody->handle.cookie = mfd->mfd_handle.h_cookie;
- }
+ spin_lock(&med->med_open_lock);
+ cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
+ if (mfd->mfd_xid == req->rq_xid)
+ break;
+ }
+ LASSERT(&mfd->mfd_list != &med->med_open_head);
+ spin_unlock(&med->med_open_lock);
+ repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+ }
- mdt_object_put(mti->mti_env, obj);
+ mdt_object_put(mti->mti_env, obj);
}
typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti,