/* only one record write */
enum {
- MDT_TXN_WRITE_RECORD_CREDITS = 3
+ MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3
};
static struct thandle* mdt_trans_start(const struct lu_context *ctx,
- struct mdt_device *mdt)
+ struct mdt_device *mdt, int credits)
{
struct mdt_thread_info *mti;
struct txn_param *p;
mti = lu_context_key_get(ctx, &mdt_thread_key);
p = &mti->mti_txn_param;
- p->tp_credits = MDT_TXN_WRITE_RECORD_CREDITS;
+ p->tp_credits = credits;
return mdt->mdt_bottom->dd_ops->dt_trans_start(ctx, mdt->mdt_bottom, p);
}
mti = lu_context_key_get(ctx, &mdt_thread_key);
- th = mdt_trans_start(ctx, mdt);
+ th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
cl_idx, last_transno);
/* protect __u64 value update */
spin_lock(&mdt->mdt_transno_lock);
- if (last_transno > mdt->mdt_last_transno)
- mdt->mdt_last_transno = last_transno;
+ mdt->mdt_last_transno = max(last_transno,
+ mdt->mdt_last_transno);
spin_unlock(&mdt->mdt_transno_lock);
}
if (rc)
RETURN(rc);
- last_rcvd_size = la->la_size;
+ last_rcvd_size = (unsigned long)la->la_size;
if (last_rcvd_size == 0) {
LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
rc = mdt_last_rcvd_header_write(ctx, mdt, msd);
RETURN(rc);
}
-#if 0
+
int mdt_client_new(const struct lu_context *ctx,
struct mdt_device *mdt,
struct mdt_export_data *med)
unsigned long *bitmap = mdt->mdt_client_bitmap;
struct mdt_client_data *mcd = med->med_mcd;
struct mdt_server_data *msd = &mdt->mdt_msd;
+ struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+ struct mdt_thread_info *mti;
+ struct thandle *th;
+ loff_t off;
int rc = 0;
+ int cl_idx;
ENTRY;
LASSERT(bitmap != NULL);
-
+ if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
+ RETURN(0);
+ mti = lu_context_key_get(ctx, &mdt_thread_key);
/* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
* there's no need for extra complication here
*/
+ spin_lock(&mdt->mdt_client_bitmap_lock);
cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
-repeat:
if (cl_idx >= LR_MAX_CLIENTS ||
MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
- return -EOVERFLOW;
- }
- if (test_and_set_bit(cl_idx, bitmap)) {
- cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
- cl_idx);
- goto repeat;
+ spin_unlock(&mdt->mdt_client_bitmap_lock);
+ RETURN(-EOVERFLOW);
}
+ set_bit(cl_idx, bitmap);
+ spin_unlock(&mdt->mdt_client_bitmap_lock);
CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
cl_idx, med->med_mcd->mcd_uuid);
init_mutex(&med->med_mcd_lock);
LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
+ /* write new client data */
+ off = med->med_lr_off;
+ th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
+ CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
+ cl_idx, med->med_lr_off, sizeof(*mcd));
+ mdt_trans_stop(ctx, mdt, th);
- if (new_client) {
- loff_t off = med->med_lr_off;
- struct thandle *th;
- th = mdt_trans_start(ctx, mdt);
- if (IS_ERR(th))
- RETURN(PTR_ERR(th));
-
- rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
- CDEBUG(D_INFO,
- "wrote client mcd at idx %u off %llu (len %u)\n",
- cl_idx, med->med_lr_off, sizeof(*mcd));
- mdt_trans_stop(ctx, mdt, th);
- }
RETURN(rc);
}
-#endif
+
/* Add client data to the MDS. We use a bitmap to locate a free space
* in the last_rcvd file if cl_off is -1 (i.e. a new client).
* Otherwise, we just have to read the data from the last_rcvd file and
struct mdt_export_data *med, int cl_idx)
{
unsigned long *bitmap = mdt->mdt_client_bitmap;
- struct mdt_client_data *mcd = med->med_mcd;
+ struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
struct mdt_server_data *msd = &mdt->mdt_msd;
- int new_client = (cl_idx == -1);
int rc = 0;
ENTRY;
LASSERT(bitmap != NULL);
- LASSERTF(cl_idx > -2, "%d\n", cl_idx);
+ LASSERTF(cl_idx >= 0, "%d\n", cl_idx);
- /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
- * there's no need for extra complication here
- */
- if (new_client) {
- cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
- repeat:
- if (cl_idx >= LR_MAX_CLIENTS ||
- MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
- CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
- return -EOVERFLOW;
- }
- if (test_and_set_bit(cl_idx, bitmap)) {
- cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
- cl_idx);
- goto repeat;
- }
- } else {
- if (test_and_set_bit(cl_idx, bitmap)) {
- CERROR("MDS client %d: bit already set in bitmap!!\n",
- cl_idx);
- LBUG();
- }
+ if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
+ RETURN(0);
+
+ spin_lock(&mdt->mdt_client_bitmap_lock);
+ if (test_and_set_bit(cl_idx, bitmap)) {
+ CERROR("MDS client %d: bit already set in bitmap!!\n",
+ cl_idx);
+ LBUG();
}
+ spin_unlock(&mdt->mdt_client_bitmap_lock);
CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
cl_idx, med->med_mcd->mcd_uuid);
LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
- if (new_client) {
- loff_t off = med->med_lr_off;
- struct thandle *th;
- th = mdt_trans_start(ctx, mdt);
- if (IS_ERR(th))
- RETURN(PTR_ERR(th));
-
- rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
- CDEBUG(D_INFO,
- "wrote client mcd at idx %u off %llu (len %u)\n",
- cl_idx, med->med_lr_off, sizeof(*mcd));
- mdt_trans_stop(ctx, mdt, th);
- }
RETURN(rc);
}
-int mdt_client_free(const struct lu_context *ctx,
+int mdt_client_del(const struct lu_context *ctx,
struct mdt_device *mdt,
struct mdt_export_data *med)
{
- struct mdt_client_data *mcd = med->med_mcd;
+ struct mdt_client_data *mcd = med->med_mcd;
struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
struct thandle *th;
loff_t off;
LBUG();
}
- th = mdt_trans_start(ctx, mdt);
+ th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
if (IS_ERR(th))
GOTO(free, rc = PTR_ERR(th));
extern struct lu_context_key mdt_txn_key;
extern struct lu_context_key mdt_thread_key;
-enum {
- MDT_TXN_LAST_RCVD_CREDITS = 3
-};
-
/* add credits for last_rcvd update */
static int mdt_txn_start_cb(const struct lu_context *ctx,
struct txn_param *param, void *cookie)
{
- param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
+ param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS;
return 0;
}
txi->txi_transno = 0;
return 0;
}
+ req = mdt_info_req(mti);
+ LASSERT(req != NULL);
/*TODO: checks for recovery cases, see mds_finish_transno */
spin_lock(&mdt->mdt_transno_lock);
if (txn->th_result != 0) {
} else if (mti->mti_transno == 0) {
mti->mti_transno = ++ mdt->mdt_last_transno;
} else {
- /* replay */
+ /* should be replay */
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
+ CERROR("Double transaction ("LPU64") per thread!\n",
+ mti->mti_transno);
if (mti->mti_transno > mdt->mdt_last_transno)
mdt->mdt_last_transno = mti->mti_transno;
}
spin_unlock(&mdt->mdt_transno_lock);
- /* filling reply data */
- req = mdt_info_req(mti);
-
/* sometimes the reply message has not been successfully packed */
LASSERT(req != NULL && req->rq_repmsg != NULL);
+ /* filling reply data */
CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);