RETURN(rc);
}
-static int mdt_last_rcvd_read(const struct lu_env *env,
- struct mdt_device *mdt,
- struct lsd_client_data *lcd, loff_t *off)
+static int mdt_last_rcvd_read(const struct lu_env *env, struct mdt_device *mdt,
+ struct lsd_client_data *lcd, loff_t *off,
+ int index)
{
struct mdt_thread_info *mti;
struct lsd_client_data *tmp;
tmp = &mti->mti_lcd;
rc = dt_record_read(env, mdt->mdt_last_rcvd,
mdt_buf(env, tmp, sizeof(*tmp)), off);
- if (rc == 0)
+ if (rc == 0) {
+ check_lcd(mdt2obd_dev(mdt)->obd_name, index, tmp);
lcd_le_to_cpu(tmp, lcd);
+ }
CDEBUG(D_INFO, "read lcd @%d rc = %d, uuid = %s, last_transno = "LPU64
", last_xid = "LPU64", last_result = %u, last_data = %u, "
/* When we do a clean MDS shutdown, we save the last_transno into
* the header. If we find clients with higher last_transno values
* then those clients may need recovery done. */
- LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0);
+ LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients) == 0);
for (cl_idx = 0, off = lsd->lsd_client_start;
off < last_size; cl_idx++) {
__u64 last_transno;
off = lsd->lsd_client_start +
cl_idx * lsd->lsd_client_size;
- rc = mdt_last_rcvd_read(env, mdt, lcd, &off);
+ rc = mdt_last_rcvd_read(env, mdt, lcd, &off, cl_idx);
if (rc) {
CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
LAST_RCVD, cl_idx, off, rc);
/* VBR: set export last committed version */
exp->exp_last_committed = last_transno;
lcd = NULL;
- spin_lock(&exp->exp_lock);
+ cfs_spin_lock(&exp->exp_lock);
exp->exp_connecting = 0;
exp->exp_in_recovery = 0;
- spin_unlock(&exp->exp_lock);
+ cfs_spin_unlock(&exp->exp_lock);
obd->obd_max_recoverable_clients++;
class_export_put(exp);
}
CDEBUG(D_OTHER, "client at idx %d has last_transno="LPU64"\n",
cl_idx, last_transno);
/* protect __u64 value update */
- spin_lock(&mdt->mdt_transno_lock);
+ cfs_spin_lock(&mdt->mdt_transno_lock);
mdt->mdt_last_transno = max(last_transno,
mdt->mdt_last_transno);
- spin_unlock(&mdt->mdt_transno_lock);
+ cfs_spin_unlock(&mdt->mdt_transno_lock);
}
err_client:
lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
- spin_lock(&mdt->mdt_transno_lock);
+ cfs_spin_lock(&mdt->mdt_transno_lock);
mdt->mdt_last_transno = lsd->lsd_last_transno;
- spin_unlock(&mdt->mdt_transno_lock);
+ cfs_spin_unlock(&mdt->mdt_transno_lock);
CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
if (rc)
GOTO(err_client, rc);
- spin_lock(&mdt->mdt_transno_lock);
+ cfs_spin_lock(&mdt->mdt_transno_lock);
/* obd_last_committed is used for compatibility
* with other lustre recovery code */
obd->obd_last_committed = mdt->mdt_last_transno;
- spin_unlock(&mdt->mdt_transno_lock);
+ cfs_spin_unlock(&mdt->mdt_transno_lock);
mdt->mdt_mount_count = mount_count + 1;
lsd->lsd_mount_count = mdt->mdt_mount_count;
CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
mdt->mdt_mount_count, mdt->mdt_last_transno);
- spin_lock(&mdt->mdt_transno_lock);
+ cfs_spin_lock(&mdt->mdt_transno_lock);
mdt->mdt_lsd.lsd_last_transno = mdt->mdt_last_transno;
- spin_unlock(&mdt->mdt_transno_lock);
+ cfs_spin_unlock(&mdt->mdt_transno_lock);
rc = mdt_last_rcvd_header_write(env, mdt, th);
mdt_trans_stop(env, mdt, th);
/* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
* there's no need for extra complication here
*/
- spin_lock(&mdt->mdt_client_bitmap_lock);
- cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
+ cfs_spin_lock(&mdt->mdt_client_bitmap_lock);
+ cl_idx = cfs_find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
if (cl_idx >= LR_MAX_CLIENTS ||
OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
cl_idx);
- spin_unlock(&mdt->mdt_client_bitmap_lock);
+ cfs_spin_unlock(&mdt->mdt_client_bitmap_lock);
RETURN(-EOVERFLOW);
}
- set_bit(cl_idx, bitmap);
- spin_unlock(&mdt->mdt_client_bitmap_lock);
+ cfs_set_bit(cl_idx, bitmap);
+ cfs_spin_unlock(&mdt->mdt_client_bitmap_lock);
CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
cl_idx, med->med_lcd->lcd_uuid);
med->med_lr_idx = cl_idx;
med->med_lr_off = lsd->lsd_client_start +
(cl_idx * lsd->lsd_client_size);
- init_mutex(&med->med_lcd_lock);
+ cfs_init_mutex(&med->med_lcd_lock);
LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
/* Write new client data. */
off = med->med_lr_off;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
+ RETURN(-ENOSPC);
+
mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
th = mdt_trans_start(env, mdt);
* server down with lots of sync writes.
*/
mdt_trans_add_cb(th, lut_cb_client, class_export_cb_get(mti->mti_exp));
- spin_lock(&mti->mti_exp->exp_lock);
+ cfs_spin_lock(&mti->mti_exp->exp_lock);
mti->mti_exp->exp_need_sync = 1;
- spin_unlock(&mti->mti_exp->exp_lock);
+ cfs_spin_unlock(&mti->mti_exp->exp_lock);
rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
RETURN(0);
- spin_lock(&mdt->mdt_client_bitmap_lock);
- if (test_and_set_bit(cl_idx, bitmap)) {
+ cfs_spin_lock(&mdt->mdt_client_bitmap_lock);
+ if (cfs_test_and_set_bit(cl_idx, bitmap)) {
CERROR("MDS client %d: bit already set in bitmap!!\n",
cl_idx);
LBUG();
}
- spin_unlock(&mdt->mdt_client_bitmap_lock);
+ cfs_spin_unlock(&mdt->mdt_client_bitmap_lock);
CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
cl_idx, med->med_lcd->lcd_uuid);
med->med_lr_idx = cl_idx;
med->med_lr_off = lsd->lsd_client_start +
(cl_idx * lsd->lsd_client_size);
- init_mutex(&med->med_lcd_lock);
+ cfs_init_mutex(&med->med_lcd_lock);
LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
* Clear the bit _after_ zeroing out the client so we don't race with
* mdt_client_add and zero out new clients.
*/
- if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
+ if (!cfs_test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
CERROR("MDT client %u: bit already clear in bitmap!!\n",
med->med_lr_idx);
LBUG();
if (IS_ERR(th))
GOTO(free, rc = PTR_ERR(th));
- mutex_down(&med->med_lcd_lock);
- memset(lcd, 0, sizeof *lcd);
+ cfs_mutex_down(&med->med_lcd_lock);
+ memset(lcd->lcd_uuid, 0, sizeof lcd->lcd_uuid);
rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
- mutex_up(&med->med_lcd_lock);
+ cfs_mutex_up(&med->med_lcd_lock);
mdt_trans_stop(env, mdt, th);
- spin_lock(&mdt->mdt_client_bitmap_lock);
- clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
- spin_unlock(&mdt->mdt_client_bitmap_lock);
-
CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
"%s, rc %d\n", med->med_lr_idx, LAST_RCVD, rc);
- EXIT;
+ RETURN(0);
free:
- OBD_FREE_PTR(lcd);
- med->med_lcd = NULL;
return 0;
}
LASSERT(mdt);
med = &req->rq_export->exp_mdt_data;
LASSERT(med);
+
+ cfs_mutex_down(&med->med_lcd_lock);
lcd = med->med_lcd;
- /* if the export has already been failed, we have no last_rcvd slot */
- if (req->rq_export->exp_failed) {
+ /* if the export has already been disconnected, we have no last_rcvd slot;
+ * update the server data with the latest transno instead */
+ if (lcd == NULL) {
+ cfs_mutex_up(&med->med_lcd_lock);
CWARN("commit transaction for disconnected client %s: rc %d\n",
req->rq_export->exp_client_uuid.uuid, rc);
- if (rc == 0)
- rc = -ENOTCONN;
- RETURN(rc);
+ err = mdt_last_rcvd_header_write(mti->mti_env, mdt, th);
+ RETURN(err);
}
off = med->med_lr_off;
LASSERT(ergo(mti->mti_transno == 0, rc != 0));
- mutex_down(&med->med_lcd_lock);
if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
if (mti->mti_transno != 0)
} else {
err = mdt_last_rcvd_write(mti->mti_env, mdt, lcd, &off, th);
}
- mutex_up(&med->med_lcd_lock);
+ cfs_mutex_up(&med->med_lcd_lock);
RETURN(err);
}
}
mti->mti_has_trans = 1;
- spin_lock(&mdt->mdt_transno_lock);
+ cfs_spin_lock(&mdt->mdt_transno_lock);
if (txn->th_result != 0) {
if (mti->mti_transno != 0) {
CERROR("Replay transno "LPU64" failed: rc %i\n",
if (mti->mti_transno > mdt->mdt_last_transno)
mdt->mdt_last_transno = mti->mti_transno;
}
- spin_unlock(&mdt->mdt_transno_lock);
+ cfs_spin_unlock(&mdt->mdt_transno_lock);
/* sometimes the reply message has not been successfully packed */
LASSERT(req != NULL && req->rq_repmsg != NULL);
req->rq_transno = mti->mti_transno;
lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
- lustre_msg_set_last_xid(req->rq_repmsg,
- lcd_last_xid(req->rq_export->exp_mdt_data.med_lcd));
/* save transno for the commit callback */
txi->txi_transno = mti->mti_transno;
static void mdt_steal_ack_locks(struct ptlrpc_request *req)
{
struct obd_export *exp = req->rq_export;
- struct list_head *tmp;
+ cfs_list_t *tmp;
struct ptlrpc_reply_state *oldrep;
struct ptlrpc_service *svc;
int i;
/* CAVEAT EMPTOR: spinlock order */
- spin_lock(&exp->exp_lock);
- list_for_each (tmp, &exp->exp_outstanding_replies) {
- oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list);
+ cfs_spin_lock(&exp->exp_lock);
+ cfs_list_for_each (tmp, &exp->exp_outstanding_replies) {
+ oldrep = cfs_list_entry(tmp, struct ptlrpc_reply_state,
+ rs_exp_list);
if (oldrep->rs_xid != req->rq_xid)
continue;
oldrep->rs_opc);
svc = oldrep->rs_service;
- spin_lock (&svc->srv_lock);
+ cfs_spin_lock (&svc->srv_lock);
- list_del_init (&oldrep->rs_exp_list);
+ cfs_list_del_init (&oldrep->rs_exp_list);
CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64
" o%d NID %s\n",
oldrep->rs_nlocks = 0;
DEBUG_REQ(D_HA, req, "stole locks for");
- spin_lock(&oldrep->rs_lock);
+ cfs_spin_lock(&oldrep->rs_lock);
ptlrpc_schedule_difficult_reply (oldrep);
- spin_unlock(&oldrep->rs_lock);
+ cfs_spin_unlock(&oldrep->rs_lock);
- spin_unlock (&svc->srv_lock);
+ cfs_spin_unlock (&svc->srv_lock);
break;
}
- spin_unlock(&exp->exp_lock);
+ cfs_spin_unlock(&exp->exp_lock);
}
/**
mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr);
mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr,
mdt_object_fid(obj));
- if (mti->mti_epoch && (mti->mti_epoch->flags & MF_EPOCH_OPEN)) {
+ if (mti->mti_ioepoch && (mti->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
struct mdt_file_data *mfd;
struct mdt_body *repbody;
repbody = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY);
repbody->ioepoch = obj->mot_ioepoch;
- spin_lock(&med->med_open_lock);
- list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
+ cfs_spin_lock(&med->med_open_lock);
+ cfs_list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
if (mfd->mfd_xid == req->rq_xid)
break;
}
LASSERT(&mfd->mfd_list != &med->med_open_head);
- spin_unlock(&med->med_open_lock);
+ cfs_spin_unlock(&med->med_open_lock);
repbody->handle.cookie = mfd->mfd_handle.h_cookie;
}