OBD_FREE_PTR(dt_update);
}
+static void
+object_update_request_dump(const struct object_update_request *ourq,
+ unsigned int mask)
+{
+ unsigned int i;
+ size_t total_size = 0;
+
+ for (i = 0; i < ourq->ourq_count; i++) {
+ struct object_update *update;
+ size_t size = 0;
+
+ update = object_update_request_get(ourq, i, &size);
+ LASSERT(update != NULL);
+ CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
+ "params = %d batchid = "LPU64" size = %zu\n",
+ i, PFID(&update->ou_fid),
+ update_op_str(update->ou_type),
+ update->ou_master_index, update->ou_params_count,
+ update->ou_batchid, size);
+
+ total_size += size;
+ }
+
+ CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
+ ourq->ourq_magic, ourq->ourq_count, total_size);
+}
+
/**
* Allocate an osp request and initialize it with the given parameters.
*
osp_update_interpreter_t interpreter)
{
struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
- struct dt_update_request *update;
- int rc = 0;
+ struct dt_update_request *update;
+ struct object_update *object_update;
+ size_t max_update_size;
+ struct object_update_request *ureq;
+ int rc = 0;
ENTRY;
update = osp_find_or_create_async_update_request(osp);
RETURN(PTR_ERR(update));
again:
+ ureq = update->dur_buf.ub_req;
+ max_update_size = update->dur_buf.ub_req_size -
+ object_update_request_size(ureq);
+
+ object_update = update_buffer_get_update(ureq, ureq->ourq_count);
+ rc = out_update_pack(env, object_update, max_update_size, op,
+ lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
/* The queue is full. */
- rc = out_update_pack(env, &update->dur_buf, op,
- lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
- 0);
if (rc == -E2BIG) {
osp->opd_async_requests = NULL;
mutex_unlock(&osp->opd_async_requests_mutex);
RETURN(PTR_ERR(update));
goto again;
+ } else {
+ if (rc < 0)
+ RETURN(rc);
+
+ ureq->ourq_count++;
}
rc = osp_insert_update_callback(env, update, obj, data, interpreter);
return PTR_ERR(update);
}
+ if (dt2osp_dev(th->th_dev)->opd_connect_mdt)
+ update->dur_flags = UPDATE_FL_SYNC;
+
oth->ot_dur = update;
return 0;
}
+
+static void osp_thandle_get(struct osp_thandle *oth)
+{
+ atomic_inc(&oth->ot_refcount);
+}
+
+static void osp_thandle_put(struct osp_thandle *oth)
+{
+ if (atomic_dec_and_test(&oth->ot_refcount))
+ OBD_FREE_PTR(oth);
+}
+
/**
* The OSP layer dt_device_operations::dt_trans_create() interface
* to create a transaction.
th->th_dev = d;
th->th_tags = LCT_TX_HANDLE;
+ atomic_set(&oth->ot_refcount, 1);
+ INIT_LIST_HEAD(&oth->ot_dcb_list);
+
RETURN(th);
}
* Prepare OUT update ptlrpc request, and the request usually includes
* all of updates (stored in \param ureq) from one operation.
*
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] ureq hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] ureq hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
*
- * \retval 0 if preparation succeeds.
- * \retval negative errno if preparation fails.
+ * \retval 0 if preparation succeeds.
+ * \retval negative errno if preparation fails.
*/
int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
const struct object_update_request *ureq,
struct ptlrpc_request **reqp)
{
- struct ptlrpc_request *req;
- struct object_update_request *tmp;
- int ureq_len;
- int rc;
+ struct ptlrpc_request *req;
+ struct object_update_request *tmp;
+ int ureq_len;
+ int rc;
ENTRY;
+ object_update_request_dump(ureq, D_INFO);
req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
if (req == NULL)
RETURN(-ENOMEM);
}
req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
- RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+ RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
memcpy(tmp, ureq, ureq_len);
}
/**
- * Send update RPC.
- *
- * Send update request to the remote MDT synchronously.
- *
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] dt_update hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
- *
- * \retval 0 if RPC succeeds.
- * \retval negative errno if RPC fails.
- */
+* Send update RPC.
+*
+* Send update request to the remote MDT synchronously.
+*
+* \param[in] env execution environment
+* \param[in] imp import on which ptlrpc request will be sent
+* \param[in] dt_update hold all of updates which will be packed into the req
+* \param[in] reqp request to be created
+*
+* \retval 0 if RPC succeeds.
+* \retval negative errno if RPC fails.
+*/
int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
struct dt_update_request *dt_update,
struct ptlrpc_request **reqp)
if (rc != 0)
RETURN(rc);
+ /* This will only be called with read-only update, and these updates
+ * might be used to retrieve update log during recovery process, so
+ * it will be allowed to send during recovery process */
+ req->rq_allow_replay = 1;
+
/* Note: some dt index api might return non-zero result here, like
* osd_index_ea_lookup, so we should only check rc < 0 here */
rc = ptlrpc_queue_wait(req);
}
/**
+ * Add commit callback to transaction.
+ *
+ * Add commit callback to the osp thandle, which will be called
+ * when the thandle is committed remotely.
+ *
+ * \param[in] th the thandle
+ * \param[in] dcb commit callback structure
+ *
+ * \retval only return 0 for now.
+ */
+int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
+{
+ struct osp_thandle *oth = thandle_to_osp_thandle(th);
+
+ LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
+ LASSERT(&dcb->dcb_func != NULL);
+ list_add(&dcb->dcb_linkage, &oth->ot_dcb_list);
+
+ return 0;
+}
+
+static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
+{
+ struct dt_txn_commit_cb *dcb;
+ struct dt_txn_commit_cb *tmp;
+
+ LASSERT(atomic_read(&oth->ot_refcount) > 0);
+ /* call per-transaction callbacks if any */
+ list_for_each_entry_safe(dcb, tmp, &oth->ot_dcb_list,
+ dcb_linkage) {
+ LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
+ "commit callback entry: magic=%x name='%s'\n",
+ dcb->dcb_magic, dcb->dcb_name);
+ list_del_init(&dcb->dcb_linkage);
+ dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
+ }
+}
+
+static void osp_request_commit_cb(struct ptlrpc_request *req)
+{
+ struct thandle *th = req->rq_cb_data;
+ struct osp_thandle *oth = thandle_to_osp_thandle(th);
+ __u64 last_committed_transno = 0;
+ int result = req->rq_status;
+ ENTRY;
+
+ if (lustre_msg_get_last_committed(req->rq_repmsg))
+ last_committed_transno =
+ lustre_msg_get_last_committed(req->rq_repmsg);
+
+ if (last_committed_transno <
+ req->rq_import->imp_peer_committed_transno)
+ last_committed_transno =
+ req->rq_import->imp_peer_committed_transno;
+
+ CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n",
+ req->rq_transno, last_committed_transno);
+
+ /* If the transaction is not really committed, mark result = 1 */
+ if (req->rq_transno != 0 &&
+ (req->rq_transno > last_committed_transno) && result == 0)
+ result = 1;
+
+ osp_trans_commit_cb(oth, result);
+ req->rq_committed = 1;
+ osp_thandle_put(oth);
+ EXIT;
+}
+
+/**
* Trigger the request for remote updates.
*
* If th_sync is set, then the request will be sent synchronously,
req->rq_interpret_reply = osp_update_interpret;
args = ptlrpc_req_async_args(req);
args->oaua_update = dt_update;
- if (is_only_remote_trans(th) && !th->th_sync) {
+
+ if (is_only_remote_trans(th) && !th->th_sync &&
+ !th->th_wait_submit) {
args->oaua_flow_control = true;
if (!osp->opd_connect_mdt) {
ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
} else {
- osp_get_rpc_lock(osp);
+ struct osp_thandle *oth = thandle_to_osp_thandle(th);
+ struct lu_device *top_device;
+
+ /* If the transaction is created during MDT recoverying
+ * process, it means this is an recovery update, we need
+ * to let OSP send it anyway without checking recoverying
+ * status, in case the other target is being recoveried
+ * at the same time, and if we wait here for the import
+ * to be recoveryed, it might cause deadlock */
+ top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
+ if (top_device->ld_obd->obd_recovering)
+ req->rq_allow_replay = 1;
+
args->oaua_flow_control = false;
+ req->rq_commit_cb = osp_request_commit_cb;
+ req->rq_cb_data = th;
+ osp_thandle_get(oth); /* for commit callback */
+ osp_get_rpc_lock(osp);
rc = ptlrpc_queue_wait(req);
osp_put_rpc_lock(osp);
+ if (req->rq_transno == 0 && !req->rq_committed)
+ osp_thandle_put(oth);
+ else
+ oth->ot_dur = NULL;
ptlrpc_req_finished(req);
}
}
dt_update = oth->ot_dur;
- if (dt_update == NULL)
+ if (dt_update == NULL || th->th_result != 0) {
+ rc = th->th_result;
GOTO(out, rc);
+ }
LASSERT(dt_update != LP_POISON);
/* If there are no updates, destroy dt_update and thandle */
if (dt_update->dur_buf.ub_req == NULL ||
- dt_update->dur_buf.ub_req->ourq_count == 0) {
- dt_update_request_destroy(dt_update);
+ dt_update->dur_buf.ub_req->ourq_count == 0)
GOTO(out, rc);
- }
- if (is_only_remote_trans(th) && !th->th_sync) {
+ if (is_only_remote_trans(th) && !th->th_sync &&
+ !th->th_wait_submit) {
struct osp_device *osp = dt2osp_dev(th->th_dev);
struct client_obd *cli = &osp->opd_obd->u.cli;
out:
/* If RPC is triggered successfully, dt_update will be freed in
* osp_update_interpreter() */
- if (rc != 0 && dt_update != NULL && sent == 0) {
+ if (sent == 0) {
struct osp_update_callback *ouc;
struct osp_update_callback *next;
- list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
- ouc_list) {
- list_del_init(&ouc->ouc_list);
- if (ouc->ouc_interpreter != NULL)
- ouc->ouc_interpreter(env, NULL, NULL,
- ouc->ouc_obj,
- ouc->ouc_data, 0, rc);
- osp_update_callback_fini(env, ouc);
+ if (dt_update != NULL) {
+ list_for_each_entry_safe(ouc, next,
+ &dt_update->dur_cb_items,
+ ouc_list) {
+ list_del_init(&ouc->ouc_list);
+ if (ouc->ouc_interpreter != NULL)
+ ouc->ouc_interpreter(env, NULL, NULL,
+ ouc->ouc_obj,
+ ouc->ouc_data, 0,
+ rc);
+ osp_update_callback_fini(env, ouc);
+ }
}
-
+ osp_trans_commit_cb(oth, rc);
dt_update_request_destroy(dt_update);
+ oth->ot_dur = NULL;
}
- OBD_FREE_PTR(oth);
+ osp_thandle_put(oth);
RETURN(rc);
}