* GPL HEADER END
*/
/*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2014, 2015, Intel Corporation.
*/
/*
#define DEBUG_SUBSYSTEM S_CLASS
#include <lu_target.h>
-#include <md_object.h>
+#include <lustre_obdo.h>
#include <lustre_update.h>
+#include <lustre_swab.h>
+#include <md_object.h>
#include <obd.h>
#include <obd_class.h>
+
#include "tgt_internal.h"
/**
struct distribute_txn_replay_req *dtrq = NULL;
list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) {
- if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) {
+ if (tmp->dtrq_batchid == batchid) {
dtrq = tmp;
break;
}
{
struct distribute_txn_replay_req *iter;
+ /* Check if the dtrq has been added to the list */
+ iter = dtrq_lookup(tdtd, new->dtrq_batchid);
+ if (iter != NULL)
+ return -EEXIST;
+
list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) {
- if (iter->dtrq_lur->lur_update_rec.ur_master_transno >
- new->dtrq_lur->lur_update_rec.ur_master_transno)
+ if (iter->dtrq_master_transno > new->dtrq_master_transno)
continue;
/* If there are mulitple replay req with same transno, then
* sort them with batchid */
- if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
- new->dtrq_lur->lur_update_rec.ur_master_transno &&
- iter->dtrq_lur->lur_update_rec.ur_batchid ==
- new->dtrq_lur->lur_update_rec.ur_batchid)
- return -EEXIST;
-
- /* If there are mulitple replay req with same transno, then
- * sort them with batchid */
- if (iter->dtrq_lur->lur_update_rec.ur_master_transno ==
- new->dtrq_lur->lur_update_rec.ur_master_transno &&
- iter->dtrq_lur->lur_update_rec.ur_batchid >
- new->dtrq_lur->lur_update_rec.ur_batchid)
+ if (iter->dtrq_master_transno == new->dtrq_master_transno &&
+ iter->dtrq_batchid > new->dtrq_batchid)
continue;
list_add(&new->dtrq_list, &iter->dtrq_list);
* \retval NULL if the creation fails.
*/
static struct distribute_txn_replay_req *
-dtrq_create(struct llog_update_record *lur)
+dtrq_create(struct target_distribute_txn_data *tdtd,
+ struct llog_update_record *lur)
{
struct distribute_txn_replay_req *new;
memcpy(new->dtrq_lur, lur, new->dtrq_lur_size);
+ /* If the transno in the update record is 0, it means the
+ * update are from master MDT, and it will use the master
+ * last committed transno as its master transno. Later, if
+ * the update records are gotten from slave MDTs, then these
+ * transno will be replaced.
+ * See insert_update_records_to_replay_list(). */
+ if (lur->lur_update_rec.ur_master_transno == 0) {
+ new->dtrq_lur->lur_update_rec.ur_master_transno =
+ tdtd->tdtd_lut->lut_obd->obd_last_committed;
+ new->dtrq_master_transno =
+ tdtd->tdtd_lut->lut_obd->obd_last_committed;
+ } else {
+ new->dtrq_master_transno =
+ lur->lur_update_rec.ur_master_transno;
+ }
+
+ new->dtrq_batchid = lur->lur_update_rec.ur_batchid;
+
spin_lock_init(&new->dtrq_sub_list_lock);
INIT_LIST_HEAD(&new->dtrq_sub_list);
INIT_LIST_HEAD(&new->dtrq_list);
}
/**
+ * Try to add cookie to sub distribute txn request
+ *
+ * Check if the update log cookie has been added to the request, if not,
+ * add it to the dtrqs_cookie_list.
+ *
+ * \param[in] dtrqs sub replay req where cookies to be added.
+ * \param[in] cookie cookie to be added.
+ *
+ * \retval 0 if the cookie is adding succeeds.
+ * \retval negative errno if adding fails.
+ */
+static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
+ struct llog_cookie *cookie)
+{
+ struct sub_thandle_cookie *new;
+
+ OBD_ALLOC_PTR(new);
+ if (new == NULL)
+ return -ENOMEM;
+
+ INIT_LIST_HEAD(&new->stc_list);
+ new->stc_cookie = *cookie;
+ /* Note: only single thread will access one sub_request each time,
+ * so no need lock here */
+ list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
+
+ return 0;
+}
+
+/**
* Insert distribute txn sub req replay
*
* Allocate sub replay req and insert distribute txn replay list.
struct llog_cookie *cookie,
__u32 mdt_index)
{
- struct distribute_txn_replay_req_sub *dtrqs = NULL;
- struct distribute_txn_replay_req_sub *new;
+ struct distribute_txn_replay_req_sub *dtrqs = NULL;
+ struct distribute_txn_replay_req_sub *new;
+ int rc;
ENTRY;
spin_lock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
spin_unlock(&dtrq->dtrq_sub_list_lock);
- if (dtrqs != NULL)
+ if (dtrqs != NULL) {
+ rc = dtrq_sub_add_cookie(dtrqs, cookie);
RETURN(0);
+ }
OBD_ALLOC_PTR(new);
if (new == NULL)
RETURN(-ENOMEM);
INIT_LIST_HEAD(&new->dtrqs_list);
+ INIT_LIST_HEAD(&new->dtrqs_cookie_list);
new->dtrqs_mdt_index = mdt_index;
- new->dtrqs_llog_cookie = *cookie;
spin_lock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
- if (dtrqs == NULL)
+ if (dtrqs == NULL) {
list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
- else
+ dtrqs = new;
+ } else {
OBD_FREE_PTR(new);
+ }
spin_unlock(&dtrq->dtrq_sub_list_lock);
+ rc = dtrq_sub_add_cookie(dtrqs, cookie);
+
+ RETURN(rc);
+}
+
+/**
+ * append updates to the current replay updates
+ *
+ * Append more updates to the existent replay update. And this is only
+ * used when combining mulitple updates into one large updates during
+ * replay.
+ *
+ * \param[in] dtrq the update replay request where the new update
+ * records will be added.
+ * \param[in] lur the new update record.
+ *
+ * \retval 0 if appending succeeds.
+ * \retval negative errno if appending fails.
+ */
+static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
+ struct update_records *record)
+{
+ struct llog_update_record *new_lur;
+ size_t lur_size = dtrq->dtrq_lur_size;
+ void *ptr;
+ ENTRY;
+
+ /* Because several threads might retrieve the same records from
+ * different targets, and we only need one copy of records. So
+ * we will check if the records is in the next one, if not, just
+ * skip it */
+ spin_lock(&dtrq->dtrq_sub_list_lock);
+ if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+ RETURN(0);
+ }
+ dtrq->dtrq_lur->lur_update_rec.ur_index++;
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+
+ lur_size += update_records_size(record);
+ OBD_ALLOC_LARGE(new_lur, lur_size);
+ if (new_lur == NULL) {
+ spin_lock(&dtrq->dtrq_sub_list_lock);
+ dtrq->dtrq_lur->lur_update_rec.ur_index--;
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+ RETURN(-ENOMEM);
+ }
+
+ /* Copy the old and new records to the new allocated buffer */
+ memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ ptr = (char *)&new_lur->lur_update_rec +
+ update_records_size(&new_lur->lur_update_rec);
+ memcpy(ptr, &record->ur_ops,
+ update_records_size(record) -
+ offsetof(struct update_records, ur_ops));
+
+ new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
+ new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
+ new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
+
+ /* Replace the records */
+ OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ dtrq->dtrq_lur = new_lur;
+ dtrq->dtrq_lur_size = lur_size;
+ dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
+ update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
RETURN(0);
}
__u32 mdt_index)
{
struct distribute_txn_replay_req *dtrq;
- struct update_records *record = &lur->lur_update_rec;
+ struct update_records *record = &lur->lur_update_rec;
+ bool replace_record = false;
int rc = 0;
ENTRY;
- CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
+ CDEBUG(D_HA, "%s: insert record batchid = %llu transno = %llu"
" mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
record->ur_batchid, record->ur_master_transno, mdt_index);
+
+ /* Update batchid if necessary */
+ spin_lock(&tdtd->tdtd_batchid_lock);
+ if (record->ur_batchid >= tdtd->tdtd_batchid) {
+ CDEBUG(D_HA, "%s update batchid from %llu" " to %llu\n",
+ tdtd->tdtd_lut->lut_obd->obd_name,
+ tdtd->tdtd_batchid, record->ur_batchid);
+ tdtd->tdtd_batchid = record->ur_batchid + 1;
+ }
+ spin_unlock(&tdtd->tdtd_batchid_lock);
+
again:
spin_lock(&tdtd->tdtd_replay_list_lock);
+ /* First try to build the replay update request with the records */
dtrq = dtrq_lookup(tdtd, record->ur_batchid);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
if (dtrq == NULL) {
- /* If the transno in the update record is 0, it means the
- * update are from master MDT, and we will use the master
- * last committed transno as its batchid. Note: if it got
- * the records from the slave later, it needs to update
- * the batchid by the transno in slave update log (see below) */
- dtrq = dtrq_create(lur);
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ dtrq = dtrq_create(tdtd, lur);
if (IS_ERR(dtrq))
RETURN(PTR_ERR(dtrq));
- if (record->ur_master_transno == 0)
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
- tdtd->tdtd_lut->lut_last_transno;
spin_lock(&tdtd->tdtd_replay_list_lock);
rc = dtrq_insert(tdtd, dtrq);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
- } else if (record->ur_master_transno != 0 &&
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
- record->ur_master_transno) {
- /* If the master transno in update header is not matched with
- * the one in the record, then it means the dtrq is originally
- * created by master record, and we need update master transno
- * and reposition the dtrq(by master transno). */
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
+ if (rc < 0) {
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ dtrq_destroy(dtrq);
+ if (rc == -EEXIST)
+ goto again;
+ return rc;
+ }
+ } else {
+ /* If the master transno in update header is not
+ * matched with the one in the record, then it means
+ * the dtrq is originally created by master record,
+ * so we need update master transno and reposition
+ * the dtrq(by master transno) in the list and also
+ * replace update record */
+ if (record->ur_master_transno != 0 &&
+ dtrq->dtrq_master_transno != record->ur_master_transno &&
+ dtrq->dtrq_lur != NULL) {
+ list_del_init(&dtrq->dtrq_list);
+ dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
record->ur_master_transno;
- list_del_init(&dtrq->dtrq_list);
- spin_lock(&tdtd->tdtd_replay_list_lock);
- rc = dtrq_insert(tdtd, dtrq);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
+
+ dtrq->dtrq_master_transno = record->ur_master_transno;
+ replace_record = true;
+ /* try to insert again */
+ rc = dtrq_insert(tdtd, dtrq);
+ if (rc < 0) {
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ dtrq_destroy(dtrq);
+ return rc;
+ }
+ }
}
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
- if (rc == -EEXIST) {
- dtrq_destroy(dtrq);
- rc = 0;
- goto again;
+ /* Because there should be only thread access the update record, so
+ * we do not need lock here */
+ if (replace_record) {
+ /* Replace the update record and master transno */
+ OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ dtrq->dtrq_lur = NULL;
+ dtrq->dtrq_lur_size = llog_update_record_size(lur);
+ OBD_ALLOC_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ if (dtrq->dtrq_lur == NULL)
+ return -ENOMEM;
+
+ memcpy(dtrq->dtrq_lur, lur, dtrq->dtrq_lur_size);
}
+ /* This is a partial update records, let's try to append
+ * the record to the current replay request */
+ if (record->ur_flags & UPDATE_RECORD_CONTINUE)
+ rc = dtrq_append_updates(dtrq, record);
+
+ /* Then create and add sub update request */
rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
RETURN(rc);
LASSERT(list_empty(&dtrq->dtrq_list));
spin_lock(&dtrq->dtrq_sub_list_lock);
list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
+ struct sub_thandle_cookie *stc;
+ struct sub_thandle_cookie *tmp;
+
list_del(&dtrqs->dtrqs_list);
+ list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
+ stc_list) {
+ list_del(&stc->stc_list);
+ OBD_FREE_PTR(stc);
+ }
OBD_FREE_PTR(dtrqs);
}
spin_unlock(&dtrq->dtrq_sub_list_lock);
list_del_init(&dtrq->dtrq_list);
dtrq_destroy(dtrq);
}
+ list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_finish_list,
+ dtrq_list) {
+ list_del_init(&dtrq->dtrq_list);
+ dtrq_destroy(dtrq);
+ }
spin_unlock(&tdtd->tdtd_replay_list_lock);
}
EXPORT_SYMBOL(dtrq_list_destroy);
if (!list_empty(&tdtd->tdtd_replay_list)) {
dtrq = list_entry(tdtd->tdtd_replay_list.next,
struct distribute_txn_replay_req, dtrq_list);
- transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno;
+ transno = dtrq->dtrq_master_transno;
}
spin_unlock(&tdtd->tdtd_replay_list_lock);
- CDEBUG(D_HA, "%s: Next update transno "LPU64"\n",
+ CDEBUG(D_HA, "%s: Next update transno %llu\n",
tdtd->tdtd_lut->lut_obd->obd_name, transno);
return transno;
}
EXPORT_SYMBOL(distribute_txn_get_next_transno);
+struct distribute_txn_replay_req *
+distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
+ __u64 xid)
+{
+ struct distribute_txn_replay_req *dtrq = NULL;
+ struct distribute_txn_replay_req *iter;
+
+ spin_lock(&tdtd->tdtd_replay_list_lock);
+ list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
+ if (iter->dtrq_xid == xid) {
+ dtrq = iter;
+ break;
+ }
+ }
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ return dtrq;
+}
+
+bool is_req_replayed_by_update(struct ptlrpc_request *req)
+{
+ struct lu_target *tgt = class_exp2tgt(req->rq_export);
+ struct distribute_txn_replay_req *dtrq;
+
+ if (tgt->lut_tdtd == NULL)
+ return false;
+
+ dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd, req->rq_xid);
+ if (dtrq == NULL)
+ return false;
+
+ return true;
+}
+EXPORT_SYMBOL(is_req_replayed_by_update);
+
/**
* Check if the update of one object is committed
*
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
st->st_committed = 1;
- if (dtrqs != NULL)
- st->st_cookie = dtrqs->dtrqs_llog_cookie;
+ if (dtrqs != NULL) {
+ struct sub_thandle_cookie *stc;
+ struct sub_thandle_cookie *tmp;
+
+ list_for_each_entry_safe(stc, tmp,
+ &dtrqs->dtrqs_cookie_list,
+ stc_list)
+ list_move(&stc->stc_list, &st->st_cookie_list);
+ }
RETURN(0);
}
}
/**
+ * Update session information
+ *
+ * Update session information so tgt_txn_stop_cb()->tgt_last_rcvd_update()
+ * can be called correctly during update replay.
+ *
+ * \param[in] env execution environment.
+ * \param[in] tdtd distribute data structure of the recovering tgt.
+ * \param[in] th thandle of this update replay.
+ * \param[in] master_th master sub thandle.
+ * \param[in] ta_arg the tx arg structure to hold the update for updating
+ * reply data.
+ */
+static void update_recovery_update_ses(struct lu_env *env,
+ struct target_distribute_txn_data *tdtd,
+ struct thandle *th,
+ struct thandle *master_th,
+ struct distribute_txn_replay_req *dtrq,
+ struct tx_arg *ta_arg)
+{
+ struct tgt_session_info *tsi;
+ struct lu_target *lut = tdtd->tdtd_lut;
+ struct obd_export *export;
+ struct cfs_hash *hash;
+ struct top_thandle *top_th;
+ struct lsd_reply_data *lrd;
+ size_t size;
+
+ tsi = tgt_ses_info(env);
+ if (tsi->tsi_exp != NULL)
+ return;
+
+ size = ta_arg->u.write.buf.lb_len;
+ lrd = ta_arg->u.write.buf.lb_buf;
+ if (size != sizeof(*lrd) || lrd == NULL)
+ return;
+
+ lrd->lrd_transno = le64_to_cpu(lrd->lrd_transno);
+ lrd->lrd_xid = le64_to_cpu(lrd->lrd_xid);
+ lrd->lrd_data = le64_to_cpu(lrd->lrd_data);
+ lrd->lrd_result = le32_to_cpu(lrd->lrd_result);
+ lrd->lrd_client_gen = le32_to_cpu(lrd->lrd_client_gen);
+
+ if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
+ return;
+
+ hash = cfs_hash_getref(lut->lut_obd->obd_gen_hash);
+ if (hash == NULL)
+ return;
+
+ export = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
+ if (export == NULL) {
+ cfs_hash_putref(hash);
+ return;
+ }
+
+ tsi->tsi_exp = export;
+ tsi->tsi_xid = lrd->lrd_xid;
+ tsi->tsi_opdata = lrd->lrd_data;
+ tsi->tsi_result = lrd->lrd_result;
+ tsi->tsi_client_gen = lrd->lrd_client_gen;
+ dtrq->dtrq_xid = lrd->lrd_xid;
+ top_th = container_of(th, struct top_thandle, tt_super);
+ top_th->tt_master_sub_thandle = master_th;
+ cfs_hash_putref(hash);
+}
+
+/**
* Execute updates in the update replay records
*
* Declare distribute txn replay by update records and add the updates
struct dt_device *sub_dt;
struct sub_thandle *st;
+ if (op->uop_type == OUT_NOOP)
+ continue;
+
dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
if (IS_ERR(dt_obj)) {
rc = PTR_ERR(dt_obj);
+ if (rc == -EREMCHG)
+ LCONSOLE_WARN("%.16s: hit invalid OI mapping "
+ "for "DFID" during recovering, "
+ "that may because auto scrub is "
+ "disabled on related MDT, and "
+ "will cause recovery failure. "
+ "Please enable auto scrub and "
+ "retry the recovery.\n",
+ tdtd->tdtd_lut->lut_obd->obd_name,
+ PFID(fid));
+
break;
}
sub_dt_obj = dt_object_child(dt_obj);
if (rc < 0)
GOTO(stop_trans, rc);
+ th->th_dev = tdtd->tdtd_dt;
ta->ta_handle = th;
/* check if the distribute transaction has been committed */
tmt = top_th->tt_multiple_thandle;
tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom;
- tmt->tmt_batchid = records->ur_batchid;
- tgt_th_info(env)->tti_transno = records->ur_master_transno;
+ tmt->tmt_batchid = dtrq->dtrq_batchid;
+ tgt_th_info(env)->tti_transno = dtrq->dtrq_master_transno;
if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid)
tmt->tmt_committed = 1;
if (rc < 0)
GOTO(stop_trans, rc);
- /* If no updates are needed to be replayed, then
- * mark this records as committed, so commit thread
- * distribute_txn_commit_thread() will delete the
- * record */
+ /* If no updates are needed to be replayed, then mark this records as
+ * committed, so commit thread distribute_txn_commit_thread() will
+ * delete the record */
if (ta->ta_argno == 0)
tmt->tmt_committed = 1;
LASSERT(tmt->tmt_committed == 0);
sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev);
st = lookup_sub_thandle(tmt, sub_dt);
+
LASSERT(st != NULL);
LASSERT(st->st_sub_th != NULL);
rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th,
ta->ta_args[i]);
+
+ /* If the update is to update the reply data, then
+ * we need set the session information, so
+ * tgt_last_rcvd_update() can be called correctly */
+ if (rc == 0 && dt_obj == tdtd->tdtd_lut->lut_reply_data)
+ update_recovery_update_ses(env, tdtd, th,
+ st->st_sub_th, dtrq, ta_arg);
+
if (unlikely(rc < 0)) {
CDEBUG(D_HA, "error during execution of #%u from"
" %s:%d: rc = %d\n", i, ta->ta_args[i]->file,
if (tur != NULL)
tur->tur_update_records = NULL;
+
+ if (tgt_ses_info(env)->tsi_exp != NULL) {
+ class_export_put(tgt_ses_info(env)->tsi_exp);
+ tgt_ses_info(env)->tsi_exp = NULL;
+ }
exit_session:
lu_context_exit(&session_env);
lu_context_fini(&session_env);