X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_recovery.c;h=10794c3b3c123258ebd39cc34bbdab73c6f5a409;hb=033480704626652b36490a50a6359e74c1490690;hp=c32ae5a5ae23ce148f6049989ab2102f4802acb4;hpb=78bf713b641cad535b1089fc162fc599c244d283;p=fs%2Flustre-release.git diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index c32ae5a..10794c3 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -78,38 +78,6 @@ const struct lu_buf *mdt_buf_const(const struct lu_env *env, return buf; } -int mdt_record_read(const struct lu_env *env, - struct dt_object *dt, struct lu_buf *buf, loff_t *pos) -{ - int rc; - - LASSERTF(dt != NULL, "dt is NULL when we want to read record\n"); - - rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA); - - if (rc == buf->lb_len) - rc = 0; - else if (rc >= 0) - rc = -EFAULT; - return rc; -} - -int mdt_record_write(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *buf, - loff_t *pos, struct thandle *th) -{ - int rc; - - LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); - LASSERT(th != NULL); - rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1); - if (rc == buf->lb_len) - rc = 0; - else if (rc >= 0) - rc = -EFAULT; - return rc; -} - static inline int mdt_trans_credit_get(const struct lu_env *env, struct mdt_device *mdt, enum mdt_txn_op op) @@ -166,61 +134,6 @@ void mdt_trans_stop(const struct lu_env *env, mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th); } -/* last_rcvd handling */ -static inline void lsd_le_to_cpu(struct lr_server_data *buf, - struct lr_server_data *lsd) -{ - memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof (lsd->lsd_uuid)); - lsd->lsd_last_transno = le64_to_cpu(buf->lsd_last_transno); - lsd->lsd_mount_count = le64_to_cpu(buf->lsd_mount_count); - lsd->lsd_feature_compat = le32_to_cpu(buf->lsd_feature_compat); - lsd->lsd_feature_rocompat = le32_to_cpu(buf->lsd_feature_rocompat); - lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat); - lsd->lsd_server_size = le32_to_cpu(buf->lsd_server_size); - lsd->lsd_client_start = le32_to_cpu(buf->lsd_client_start); - lsd->lsd_client_size = le16_to_cpu(buf->lsd_client_size); -} - -static inline void lsd_cpu_to_le(struct lr_server_data *lsd, - struct lr_server_data *buf) -{ - memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof (lsd->lsd_uuid)); - buf->lsd_last_transno = cpu_to_le64(lsd->lsd_last_transno); - buf->lsd_mount_count = cpu_to_le64(lsd->lsd_mount_count); - buf->lsd_feature_compat = cpu_to_le32(lsd->lsd_feature_compat); - buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat); - buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat); - buf->lsd_server_size = cpu_to_le32(lsd->lsd_server_size); - buf->lsd_client_start = cpu_to_le32(lsd->lsd_client_start); - buf->lsd_client_size = cpu_to_le16(lsd->lsd_client_size); -} - -static inline void lcd_le_to_cpu(struct lsd_client_data *buf, - struct lsd_client_data *lcd) -{ - memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof (lcd->lcd_uuid)); - lcd->lcd_last_transno = le64_to_cpu(buf->lcd_last_transno); - lcd->lcd_last_xid = le64_to_cpu(buf->lcd_last_xid); - lcd->lcd_last_result = le32_to_cpu(buf->lcd_last_result); - lcd->lcd_last_data = le32_to_cpu(buf->lcd_last_data); - lcd->lcd_last_close_transno = le64_to_cpu(buf->lcd_last_close_transno); - lcd->lcd_last_close_xid = le64_to_cpu(buf->lcd_last_close_xid); - lcd->lcd_last_close_result = le32_to_cpu(buf->lcd_last_close_result); -} - -static inline void lcd_cpu_to_le(struct lsd_client_data *lcd, - struct lsd_client_data *buf) -{ - memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof (lcd->lcd_uuid)); - buf->lcd_last_transno = cpu_to_le64(lcd->lcd_last_transno); - buf->lcd_last_xid = cpu_to_le64(lcd->lcd_last_xid); - buf->lcd_last_result = cpu_to_le32(lcd->lcd_last_result); - buf->lcd_last_data = cpu_to_le32(lcd->lcd_last_data); - buf->lcd_last_close_transno = cpu_to_le64(lcd->lcd_last_close_transno); - buf->lcd_last_close_xid = cpu_to_le64(lcd->lcd_last_close_xid); - buf->lcd_last_close_result = cpu_to_le32(lcd->lcd_last_close_result); -} - static inline int mdt_last_rcvd_header_read(const struct lu_env *env, struct mdt_device *mdt) { @@ -230,9 +143,9 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env, mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); mti->mti_off = 0; - rc = mdt_record_read(env, mdt->mdt_last_rcvd, - mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)), - &mti->mti_off); + rc = dt_record_read(env, mdt->mdt_last_rcvd, + mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)), + &mti->mti_off); if (rc == 0) lsd_le_to_cpu(&mti->mti_lsd, &mdt->mdt_lsd); @@ -244,13 +157,6 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env, return rc; } -static void mdt_client_cb(const struct mdt_device *mdt, __u64 transno, - void *data, int err) -{ - struct obd_device *obd = mdt2obd_dev(mdt); - target_client_add_cb(obd, transno, data, err); -} - static inline int mdt_last_rcvd_header_write(const struct lu_env *env, struct mdt_device *mdt, int need_sync) @@ -276,12 +182,12 @@ static inline int mdt_last_rcvd_header_write(const struct lu_env *env, lsd_cpu_to_le(&mdt->mdt_lsd, &mti->mti_lsd); if (need_sync && mti->mti_exp) - mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp); + mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp); - rc = mdt_record_write(env, mdt->mdt_last_rcvd, - mdt_buf_const(env, &mti->mti_lsd, - sizeof(mti->mti_lsd)), - &mti->mti_off, th); + rc = dt_record_write(env, mdt->mdt_last_rcvd, + mdt_buf_const(env, &mti->mti_lsd, + sizeof(mti->mti_lsd)), + &mti->mti_off, th); mdt_trans_stop(env, mdt, th); @@ -302,8 +208,8 @@ static int mdt_last_rcvd_read(const struct lu_env *env, mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); tmp = &mti->mti_lcd; - rc = mdt_record_read(env, mdt->mdt_last_rcvd, - mdt_buf(env, tmp, sizeof(*tmp)), off); + rc = dt_record_read(env, mdt->mdt_last_rcvd, + mdt_buf(env, tmp, sizeof(*tmp)), off); if (rc == 0) lcd_le_to_cpu(tmp, lcd); @@ -344,8 +250,8 @@ static int mdt_last_rcvd_write(const struct lu_env *env, lcd_cpu_to_le(lcd, tmp); - rc = mdt_record_write(env, mdt->mdt_last_rcvd, - mdt_buf_const(env, tmp, sizeof(*tmp)), off, th); + rc = dt_record_write(env, mdt->mdt_last_rcvd, + mdt_buf_const(env, tmp, sizeof(*tmp)), off, th); CDEBUG(D_INFO, "write lcd @%d rc = %d:\n" "uuid = %s\n" @@ -440,6 +346,8 @@ static int mdt_clients_data_init(const struct lu_env *env, rc = mdt_client_add(env, mdt, cl_idx); /* can't fail existing */ LASSERTF(rc == 0, "rc = %d\n", rc); + /* VBR: set export last committed version */ + exp->exp_last_committed = last_transno; lcd = NULL; spin_lock(&exp->exp_lock); exp->exp_connecting = 0; @@ -578,7 +486,7 @@ static int mdt_server_data_init(const struct lu_env *env, lsd->lsd_mount_count = mdt->mdt_mount_count; /* save it, so mount count and last_transno is current */ - rc = mdt_server_data_update(env, mdt, (mti->mti_exp && + rc = mdt_server_data_update(env, mdt, (mti->mti_exp && mti->mti_exp->exp_need_sync)); if (rc) GOTO(err_client, rc); @@ -586,7 +494,7 @@ static int mdt_server_data_init(const struct lu_env *env, RETURN(0); err_client: - target_recovery_fini(obd); + class_disconnect_exports(obd); out: return rc; } @@ -671,13 +579,13 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt) if (IS_ERR(th)) RETURN(PTR_ERR(th)); - /* + /* * Until this operations will be committed the sync is needed * for this export. This should be done _after_ starting the * transaction so that many connecting clients will not bring - * server down with lots of sync writes. + * server down with lots of sync writes. */ - mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp); + mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp); spin_lock(&mti->mti_exp->exp_lock); mti->mti_exp->exp_need_sync = 1; spin_unlock(&mti->mti_exp->exp_lock); @@ -813,11 +721,11 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) GOTO(free, rc = PTR_ERR(th)); if (need_sync) { - /* + /* * Until this operations will be committed the sync - * is needed for this export. + * is needed for this export. */ - mdt_trans_add_cb(th, mdt_client_cb, exp); + mdt_trans_add_cb(th, lut_cb_client, exp); } mutex_down(&med->med_lcd_lock); @@ -836,10 +744,10 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap); spin_unlock(&mdt->mdt_client_bitmap_lock); - /* + /* * Make sure the server's last_transno is up to date. Do this * after the client is freed so we know all the client's - * transactions have been committed. + * transactions have been committed. */ mdt_server_data_update(env, mdt, need_sync); @@ -863,7 +771,6 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti, loff_t off; int err; __s32 rc = th->th_result; - __u64 *transno_p; ENTRY; LASSERT(req); @@ -882,14 +789,25 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti, } off = med->med_lr_off; + LASSERT(ergo(mti->mti_transno == 0, rc != 0)); mutex_down(&med->med_lcd_lock); if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE || lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) { - transno_p = &lcd->lcd_last_close_transno; + if (mti->mti_transno != 0) + lcd->lcd_last_close_transno = mti->mti_transno; lcd->lcd_last_close_xid = req->rq_xid; lcd->lcd_last_close_result = rc; } else { - transno_p = &lcd->lcd_last_transno; + /* VBR: save versions in last_rcvd for reconstruct. */ + __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg); + if (pre_versions) { + lcd->lcd_pre_versions[0] = pre_versions[0]; + lcd->lcd_pre_versions[1] = pre_versions[1]; + lcd->lcd_pre_versions[2] = pre_versions[2]; + lcd->lcd_pre_versions[3] = pre_versions[3]; + } + if (mti->mti_transno != 0) + lcd->lcd_last_transno = mti->mti_transno; lcd->lcd_last_xid = req->rq_xid; lcd->lcd_last_result = rc; /*XXX: save intent_disposition in mdt_thread_info? @@ -898,20 +816,6 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti, lcd->lcd_last_data = mti->mti_opdata; } - /* - * When we store zero transno in lcd we can lost last transno value - * because lcd contains 0, but lsd is not yet written - * The server data should be updated also if the latest - * transno is rewritten by zero. See the bug 11125 for details. - */ - if (mti->mti_transno == 0 && - *transno_p == mdt->mdt_last_transno) - mdt_server_data_update(mti->mti_env, mdt, - (mti->mti_exp && - mti->mti_exp->exp_need_sync)); - - *transno_p = mti->mti_transno; - if (off <= 0) { CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off); err = -EINVAL; @@ -935,6 +839,17 @@ static int mdt_txn_start_cb(const struct lu_env *env, return 0; } +/* Set new object versions */ +static void mdt_versions_set(struct mdt_thread_info *info) +{ + int i; + for (i = 0; i < PTLRPC_NUM_VERSIONS; i++) + if (info->mti_mos[i] != NULL) + mo_version_set(info->mti_env, + mdt_object_child(info->mti_mos[i]), + info->mti_transno); +} + /* Update last_rcvd records with latests transaction data */ static int mdt_txn_stop_cb(const struct lu_env *env, struct thandle *txn, void *cookie) @@ -969,7 +884,6 @@ static int mdt_txn_stop_cb(const struct lu_env *env, if (mti->mti_transno != 0) { CERROR("Replay transno "LPU64" failed: rc %i\n", mti->mti_transno, txn->th_result); - mti->mti_transno = 0; } } else if (mti->mti_transno == 0) { mti->mti_transno = ++ mdt->mdt_last_transno; @@ -978,10 +892,14 @@ static int mdt_txn_stop_cb(const struct lu_env *env, if (mti->mti_transno > mdt->mdt_last_transno) mdt->mdt_last_transno = mti->mti_transno; } - + spin_unlock(&mdt->mdt_transno_lock); /* sometimes the reply message has not been successfully packed */ LASSERT(req != NULL && req->rq_repmsg != NULL); + /** VBR: set new versions */ + if (txn->th_result == 0) + mdt_versions_set(mti); + /* filling reply data */ CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n", mti->mti_transno, req->rq_export->exp_obd->obd_last_committed); @@ -992,7 +910,10 @@ static int mdt_txn_stop_cb(const struct lu_env *env, lcd_last_xid(req->rq_export->exp_mdt_data.med_lcd)); /* save transno for the commit callback */ txi->txi_transno = mti->mti_transno; - spin_unlock(&mdt->mdt_transno_lock); + + /* add separate commit callback for transaction handling because we need + * export as parameter */ + mdt_trans_add_cb(txn, lut_cb_last_committed, mti->mti_exp); return mdt_last_rcvd_update(mti, txn); } @@ -1002,29 +923,15 @@ static int mdt_txn_commit_cb(const struct lu_env *env, struct thandle *txn, void *cookie) { struct mdt_device *mdt = cookie; - struct obd_device *obd = mdt2obd_dev(mdt); struct mdt_txn_info *txi; int i; txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key); - /* copy of obd_transno_commit_cb() but with locking */ - spin_lock(&mdt->mdt_transno_lock); - if (txi->txi_transno > obd->obd_last_committed) { - obd->obd_last_committed = txi->txi_transno; - spin_unlock(&mdt->mdt_transno_lock); - ptlrpc_commit_replies(obd); - } else - spin_unlock(&mdt->mdt_transno_lock); - - if (txi->txi_transno) - CDEBUG(D_HA, "%s: transno "LPD64" is committed\n", - obd->obd_name, txi->txi_transno); - /* iterate through all additional callbacks */ for (i = 0; i < txi->txi_cb_count; i++) { - txi->txi_cb[i].mdt_cb_func(mdt, txi->txi_transno, - txi->txi_cb[i].mdt_cb_data, 0); + txi->txi_cb[i].lut_cb_func(&mdt->mdt_lut, txi->txi_transno, + txi->txi_cb[i].lut_cb_data, 0); } return 0; } @@ -1046,21 +953,14 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb; mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb; mdt->mdt_txn_cb.dtc_cookie = mdt; + mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD; CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage); dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb); - o = dt_store_open(env, mdt->mdt_bottom, "", LAST_RCVD, &fid); - if (!IS_ERR(o)) { - mdt->mdt_last_rcvd = o; - rc = mdt_server_data_init(env, mdt, lsi); - if (rc) - GOTO(put_last_rcvd, rc); - } else { - rc = PTR_ERR(o); - CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc); + rc = mdt_server_data_init(env, mdt, lsi); + if (rc) RETURN(rc); - } o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid); if (!IS_ERR(o)) { @@ -1071,16 +971,15 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, } else { rc = PTR_ERR(o); CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc); - GOTO(put_last_rcvd, rc); + GOTO(disconnect_exports, rc); } RETURN(0); put_ck_object: lu_object_put(env, &o->do_lu); mdt->mdt_ck_obj = NULL; -put_last_rcvd: - lu_object_put(env, &mdt->mdt_last_rcvd->do_lu); - mdt->mdt_last_rcvd = NULL; +disconnect_exports: + class_disconnect_exports(obd); return rc; } @@ -1090,9 +989,6 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt) /* Remove transaction callback */ dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb); - if (mdt->mdt_last_rcvd) - lu_object_put(env, &mdt->mdt_last_rcvd->do_lu); - mdt->mdt_last_rcvd = NULL; if (mdt->mdt_ck_obj) lu_object_put(env, &mdt->mdt_ck_obj->do_lu); mdt->mdt_ck_obj = NULL; @@ -1151,6 +1047,20 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req) spin_unlock(&exp->exp_lock); } +/** + * VBR: restore versions + */ +void mdt_vbr_reconstruct(struct ptlrpc_request *req, + struct lsd_client_data *lcd) +{ + __u64 pre_versions[4] = {0}; + pre_versions[0] = lcd->lcd_pre_versions[0]; + pre_versions[1] = lcd->lcd_pre_versions[1]; + pre_versions[2] = lcd->lcd_pre_versions[2]; + pre_versions[3] = lcd->lcd_pre_versions[3]; + lustre_msg_set_versions(req->rq_repmsg, pre_versions); +} + void mdt_req_from_lcd(struct ptlrpc_request *req, struct lsd_client_data *lcd) { @@ -1161,14 +1071,18 @@ void mdt_req_from_lcd(struct ptlrpc_request *req, lustre_msg_get_opc(req->rq_repmsg) == MDS_DONE_WRITING) { req->rq_transno = lcd->lcd_last_close_transno; req->rq_status = lcd->lcd_last_close_result; - lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); - lustre_msg_set_status(req->rq_repmsg, req->rq_status); } else { req->rq_transno = lcd->lcd_last_transno; req->rq_status = lcd->lcd_last_result; - lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); - lustre_msg_set_status(req->rq_repmsg, req->rq_status); + mdt_vbr_reconstruct(req, lcd); } + if (req->rq_status != 0) + req->rq_transno = 0; + lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); + lustre_msg_set_status(req->rq_repmsg, req->rq_status); + DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d", + req->rq_transno, req->rq_status); + mdt_steal_ack_locks(req); }