#include "mdt_internal.h"
static int mdt_server_data_update(const struct lu_env *env,
- struct mdt_device *mdt,
- int need_sync);
+ struct mdt_device *mdt);
struct lu_buf *mdt_buf(const struct lu_env *env, void *area, ssize_t len)
{
if (rc == 0)
lsd_le_to_cpu(&mti->mti_lsd, &mdt->mdt_lsd);
- CDEBUG(D_INFO, "read last_rcvd header rc = %d:\n"
- "uuid = %s\n"
- "last_transno = "LPU64"\n",
- rc, mdt->mdt_lsd.lsd_uuid,
- mdt->mdt_lsd.lsd_last_transno);
+ CDEBUG(D_INFO, "read last_rcvd header rc = %d, uuid = %s, "
+ "last_transno = "LPU64"\n", rc, mdt->mdt_lsd.lsd_uuid,
+ mdt->mdt_lsd.lsd_last_transno);
return rc;
}
-static inline int mdt_last_rcvd_header_write(const struct lu_env *env,
- struct mdt_device *mdt,
- int need_sync)
+static int mdt_last_rcvd_header_write(const struct lu_env *env,
+ struct mdt_device *mdt,
+ struct thandle *th)
{
struct mdt_thread_info *mti;
- struct thandle *th;
int rc;
ENTRY;
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- if (mti->mti_exp) {
- spin_lock(&mti->mti_exp->exp_lock);
- mti->mti_exp->exp_need_sync = need_sync;
- spin_unlock(&mti->mti_exp->exp_lock);
- }
- mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
- th = mdt_trans_start(env, mdt);
- if (IS_ERR(th))
- RETURN(PTR_ERR(th));
-
mti->mti_off = 0;
lsd_cpu_to_le(&mdt->mdt_lsd, &mti->mti_lsd);
- if (need_sync && mti->mti_exp)
- mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp);
-
rc = dt_record_write(env, mdt->mdt_last_rcvd,
mdt_buf_const(env, &mti->mti_lsd,
sizeof(mti->mti_lsd)),
&mti->mti_off, th);
- mdt_trans_stop(env, mdt, th);
-
- CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
- "uuid = %s\nlast_transno = "LPU64"\n",
- rc, mdt->mdt_lsd.lsd_uuid, mdt->mdt_lsd.lsd_last_transno);
+ CDEBUG(D_INFO, "write last_rcvd header rc = %d, uuid = %s, "
+ "last_transno = "LPU64"\n", rc, mdt->mdt_lsd.lsd_uuid,
+ mdt->mdt_lsd.lsd_last_transno);
RETURN(rc);
}
-static int mdt_last_rcvd_read(const struct lu_env *env,
- struct mdt_device *mdt,
- struct lsd_client_data *lcd, loff_t *off)
+static int mdt_last_rcvd_read(const struct lu_env *env, struct mdt_device *mdt,
+ struct lsd_client_data *lcd, loff_t *off,
+ int index)
{
struct mdt_thread_info *mti;
struct lsd_client_data *tmp;
tmp = &mti->mti_lcd;
rc = dt_record_read(env, mdt->mdt_last_rcvd,
mdt_buf(env, tmp, sizeof(*tmp)), off);
- if (rc == 0)
+ if (rc == 0) {
+ check_lcd(mdt2obd_dev(mdt)->obd_name, index, tmp);
lcd_le_to_cpu(tmp, lcd);
+ }
- CDEBUG(D_INFO, "read lcd @%d rc = %d:\n"
- "uuid = %s\n"
- "last_transno = "LPU64"\n"
- "last_xid = "LPU64"\n"
- "last_result = %u\n"
- "last_data = %u\n"
- "last_close_transno = "LPU64"\n"
- "last_close_xid = "LPU64"\n"
- "last_close_result = %u\n",
- (int)(*off - sizeof(*tmp)),
- rc,
- lcd->lcd_uuid,
- lcd->lcd_last_transno,
- lcd->lcd_last_xid,
- lcd->lcd_last_result,
- lcd->lcd_last_data,
- lcd->lcd_last_close_transno,
- lcd->lcd_last_close_xid,
- lcd->lcd_last_close_result);
+ CDEBUG(D_INFO, "read lcd @%d rc = %d, uuid = %s, last_transno = "LPU64
+ ", last_xid = "LPU64", last_result = %u, last_data = %u, "
+ "last_close_transno = "LPU64", last_close_xid = "LPU64", "
+ "last_close_result = %u\n", (int)(*off - sizeof(*tmp)),
+ rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
+ lcd->lcd_last_result, lcd->lcd_last_data,
+ lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
+ lcd->lcd_last_close_result);
return rc;
}
rc = dt_record_write(env, mdt->mdt_last_rcvd,
mdt_buf_const(env, tmp, sizeof(*tmp)), off, th);
- CDEBUG(D_INFO, "write lcd @%d rc = %d:\n"
- "uuid = %s\n"
- "last_transno = "LPU64"\n"
- "last_xid = "LPU64"\n"
- "last_result = %u\n"
- "last_data = %u\n"
- "last_close_transno = "LPU64"\n"
- "last_close_xid = "LPU64"\n"
- "last_close_result = %u\n",
- (int)(*off - sizeof(*tmp)),
- rc,
- lcd->lcd_uuid,
- lcd->lcd_last_transno,
- lcd->lcd_last_xid,
- lcd->lcd_last_result,
- lcd->lcd_last_data,
- lcd->lcd_last_close_transno,
- lcd->lcd_last_close_xid,
- lcd->lcd_last_close_result);
+ CDEBUG(D_INFO, "write lcd @%d rc = %d, uuid = %s, last_transno = "LPU64
+ ", last_xid = "LPU64", last_result = %u, last_data = %u, "
+ "last_close_transno = "LPU64", last_close_xid = "LPU64" ,"
+ "last_close_result = %u\n", (int)(*off - sizeof(*tmp)),
+ rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
+ lcd->lcd_last_result, lcd->lcd_last_data,
+ lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
+ lcd->lcd_last_close_result);
return rc;
}
off = lsd->lsd_client_start +
cl_idx * lsd->lsd_client_size;
- rc = mdt_last_rcvd_read(env, mdt, lcd, &off);
+ rc = mdt_last_rcvd_read(env, mdt, lcd, &off, cl_idx);
if (rc) {
CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
LAST_RCVD, cl_idx, off, rc);
if (PTR_ERR(exp) == -EALREADY) {
/* export already exists, zero out this one */
lcd->lcd_uuid[0] = '\0';
- } else
+ } else {
GOTO(err_client, rc = PTR_ERR(exp));
+ }
} else {
struct mdt_thread_info *mti;
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
lsd->lsd_server_size = LR_SERVER_SIZE;
lsd->lsd_client_start = LR_CLIENT_START;
lsd->lsd_client_size = LR_CLIENT_SIZE;
+ lsd->lsd_feature_compat = OBD_COMPAT_MDT;
lsd->lsd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
- OBD_INCOMPAT_COMMON_LR;
+ OBD_INCOMPAT_COMMON_LR;
} else {
LCONSOLE_WARN("%s: used disk, loading\n", obd->obd_name);
rc = mdt_last_rcvd_header_read(env, mdt);
obd->obd_uuid.uuid, lsd->lsd_uuid);
GOTO(out, rc = -EINVAL);
}
-
- /** evict all clients as it is first boot with old last_rcvd */
- if (!(lsd->lsd_feature_incompat & OBD_INCOMPAT_20)) {
- LCONSOLE_WARN("Mounting %s at first time on old FS, "
- "remove all clients for interop needs\n",
- obd->obd_name);
- simple_truncate(lsi->lsi_srv_mnt->mnt_sb->s_root,
- lsi->lsi_srv_mnt, LAST_RCVD,
- lsd->lsd_client_start);
- last_rcvd_size = lsd->lsd_client_start;
- }
+ lsd->lsd_feature_compat |= OBD_COMPAT_MDT;
+ lsd->lsd_feature_incompat |= OBD_INCOMPAT_MDT |
+ OBD_INCOMPAT_COMMON_LR;
}
mount_count = lsd->lsd_mount_count;
ldd = lsi->lsi_ldd;
+ if (lsd->lsd_feature_incompat & ~MDT_INCOMPAT_SUPP) {
+ CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+ obd->obd_name,
+ lsd->lsd_feature_incompat & ~MDT_INCOMPAT_SUPP);
+ GOTO(out, rc = -EINVAL);
+ }
+ if (lsd->lsd_feature_rocompat & ~MDT_ROCOMPAT_SUPP) {
+ CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+ obd->obd_name,
+ lsd->lsd_feature_rocompat & ~MDT_ROCOMPAT_SUPP);
+ /* XXX: Do something like remount filesystem read-only */
+ GOTO(out, rc = -EINVAL);
+ }
+ /** Interop: evict all clients at first boot with 1.8 last_rcvd */
+ if (!(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
+ LCONSOLE_WARN("Mounting %s at first time on 1.8 FS, remove all"
+ " clients for interop needs\n", obd->obd_name);
+ simple_truncate(lsi->lsi_srv_mnt->mnt_sb->s_root,
+ lsi->lsi_srv_mnt, LAST_RCVD,
+ lsd->lsd_client_start);
+ last_rcvd_size = lsd->lsd_client_start;
+ /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
+ lsd->lsd_feature_compat |= OBD_COMPAT_20;
+ }
+
if (ldd->ldd_flags & LDD_F_IAM_DIR)
lsd->lsd_feature_incompat |= OBD_INCOMPAT_IAM_DIR;
- lsd->lsd_feature_compat = 0;
- lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID | OBD_INCOMPAT_20;
+ lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
spin_lock(&mdt->mdt_transno_lock);
mdt->mdt_last_transno = lsd->lsd_last_transno;
lsd->lsd_mount_count = mdt->mdt_mount_count;
/* save it, so mount count and last_transno is current */
- rc = mdt_server_data_update(env, mdt, (mti->mti_exp &&
- mti->mti_exp->exp_need_sync));
+ rc = mdt_server_data_update(env, mdt);
if (rc)
GOTO(err_client, rc);
}
static int mdt_server_data_update(const struct lu_env *env,
- struct mdt_device *mdt,
- int need_sync)
+ struct mdt_device *mdt)
{
- int rc = 0;
- ENTRY;
+ struct mdt_thread_info *mti;
+ struct thandle *th;
+ int rc;
+
+ mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+
+ mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+ th = mdt_trans_start(env, mdt);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
mdt->mdt_mount_count, mdt->mdt_last_transno);
mdt->mdt_lsd.lsd_last_transno = mdt->mdt_last_transno;
spin_unlock(&mdt->mdt_transno_lock);
- /*
- * This may be called from difficult reply handler and
- * mdt->mdt_last_rcvd may be NULL that time.
- */
- if (mdt->mdt_last_rcvd != NULL)
- rc = mdt_last_rcvd_header_write(env, mdt, need_sync);
- RETURN(rc);
+ rc = mdt_last_rcvd_header_write(env, mdt, th);
+ mdt_trans_stop(env, mdt, th);
+ return rc;
}
+
int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
{
unsigned long *bitmap = mdt->mdt_client_bitmap;
* transaction so that many connecting clients will not bring
* server down with lots of sync writes.
*/
- mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp);
+ mdt_trans_add_cb(th, lut_cb_client, class_export_cb_get(mti->mti_exp));
spin_lock(&mti->mti_exp->exp_lock);
mti->mti_exp->exp_need_sync = 1;
spin_unlock(&mti->mti_exp->exp_lock);
rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
- CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len "LPSZ")\n",
- cl_idx, med->med_lr_off, sizeof(*lcd));
+ CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
+ cl_idx, med->med_lr_off, (int)sizeof(*lcd));
mdt_trans_stop(env, mdt, th);
RETURN(rc);
struct obd_device *obd = mdt2obd_dev(mdt);
struct obd_export *exp;
struct thandle *th;
- int need_sync;
loff_t off;
int rc = 0;
ENTRY;
LBUG();
}
- /* Don't force sync on disconnect if aborting recovery,
- * or it does num_clients * num_osts. b=17194 */
- need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
- !(exp->exp_flags & OBD_OPT_ABORT_RECOV);
+ /* Make sure the server's last_transno is up to date.
+ * This should be done before zeroing client slot so last_transno will
+ * be in server data or in client data in case of failure */
+ mdt_server_data_update(env, mdt);
- /*
- * This may be called from difficult reply handler path and
- * mdt->mdt_last_rcvd may be NULL that time.
- */
- if (mdt->mdt_last_rcvd != NULL) {
- mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
-
- spin_lock(&exp->exp_lock);
- exp->exp_need_sync = need_sync;
- spin_unlock(&exp->exp_lock);
-
- th = mdt_trans_start(env, mdt);
- if (IS_ERR(th))
- GOTO(free, rc = PTR_ERR(th));
-
- if (need_sync) {
- /*
- * Until this operations will be committed the sync
- * is needed for this export.
- */
- mdt_trans_add_cb(th, lut_cb_client, exp);
- }
-
- mutex_down(&med->med_lcd_lock);
- memset(lcd, 0, sizeof *lcd);
-
- rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
- mutex_up(&med->med_lcd_lock);
- mdt_trans_stop(env, mdt, th);
- }
+ mdt_trans_credit_init(env, mdt, MDT_TXN_LAST_RCVD_WRITE_OP);
+ th = mdt_trans_start(env, mdt);
+ if (IS_ERR(th))
+ GOTO(free, rc = PTR_ERR(th));
- CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
- "%s %ssync rc %d\n", med->med_lr_idx, LAST_RCVD,
- need_sync ? "" : "a", rc);
+ mutex_down(&med->med_lcd_lock);
+ memset(lcd, 0, sizeof *lcd);
+ rc = mdt_last_rcvd_write(env, mdt, lcd, &off, th);
+ med->med_lcd = NULL;
+ mutex_up(&med->med_lcd_lock);
+ mdt_trans_stop(env, mdt, th);
spin_lock(&mdt->mdt_client_bitmap_lock);
clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
spin_unlock(&mdt->mdt_client_bitmap_lock);
- /*
- * Make sure the server's last_transno is up to date. Do this
- * after the client is freed so we know all the client's
- * transactions have been committed.
- */
- mdt_server_data_update(env, mdt, need_sync);
-
- EXIT;
-free:
+ CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
+ "%s, rc %d\n", med->med_lr_idx, LAST_RCVD, rc);
OBD_FREE_PTR(lcd);
+ RETURN(0);
+free:
+ mutex_down(&med->med_lcd_lock);
med->med_lcd = NULL;
+ mutex_up(&med->med_lcd_lock);
+ OBD_FREE_PTR(lcd);
return 0;
}
LASSERT(mdt);
med = &req->rq_export->exp_mdt_data;
LASSERT(med);
+
+ mutex_down(&med->med_lcd_lock);
lcd = med->med_lcd;
- /* if the export has already been failed, we have no last_rcvd slot */
- if (req->rq_export->exp_failed) {
+ /* if the export has already been disconnected, we have no last_rcvd slot,
+ * update server data with latest transno then */
+ if (lcd == NULL) {
+ mutex_up(&med->med_lcd_lock);
CWARN("commit transaction for disconnected client %s: rc %d\n",
req->rq_export->exp_client_uuid.uuid, rc);
- if (rc == 0)
- rc = -ENOTCONN;
- RETURN(rc);
+ err = mdt_last_rcvd_header_write(mti->mti_env, mdt, th);
+ RETURN(err);
}
off = med->med_lr_off;
LASSERT(ergo(mti->mti_transno == 0, rc != 0));
- mutex_down(&med->med_lcd_lock);
if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
if (mti->mti_transno != 0)
req->rq_transno = mti->mti_transno;
lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
- lustre_msg_set_last_xid(req->rq_repmsg,
- lcd_last_xid(req->rq_export->exp_mdt_data.med_lcd));
/* save transno for the commit callback */
txi->txi_transno = mti->mti_transno;
/* add separate commit callback for transaction handling because we need
* export as parameter */
mdt_trans_add_cb(txn, lut_cb_last_committed,
- class_export_get(mti->mti_exp));
+ class_export_cb_get(mti->mti_exp));
return mdt_last_rcvd_update(mti, txn);
}