#define DEBUG_SUBSYSTEM S_MDS
+#include <lustre_net.h>
#include "osp_internal.h"
/**
return ourq;
}
-static void object_update_request_free(struct object_update_request *ourq,
- size_t ourq_size)
+/**
+ * Allocate new update request
+ *
+ * Allocate new update request and insert it to the req_update_list.
+ *
+ * \param [in] our osp_udate_request where to create a new
+ * update request
+ *
+ * \retval 0 if creation succeeds.
+ * \retval negative errno if creation fails.
+ */
+int osp_object_update_request_create(struct osp_update_request *our,
+ size_t size)
+{
+ struct osp_update_request_sub *ours;
+
+ OBD_ALLOC_PTR(ours);
+ if (ours == NULL)
+ return -ENOMEM;
+
+ if (size < OUT_UPDATE_INIT_BUFFER_SIZE)
+ size = OUT_UPDATE_INIT_BUFFER_SIZE;
+
+ ours->ours_req = object_update_request_alloc(size);
+
+ if (IS_ERR(ours->ours_req)) {
+ OBD_FREE_PTR(ours);
+ return -ENOMEM;
+ }
+
+ ours->ours_req_size = size;
+ INIT_LIST_HEAD(&ours->ours_list);
+ list_add_tail(&ours->ours_list, &our->our_req_list);
+
+ return 0;
+}
+
+/**
+ * Get current update request
+ *
+ * Get current object update request from our_req_list in
+ * osp_update_request, because we always insert the new update
+ * request in the last position, so the last update request
+ * in the list will be the current update req.
+ *
+ * \param[in] our osp update request where to get the
+ * current object update.
+ *
+ * \retval the current object update.
+ **/
+struct osp_update_request_sub *
+osp_current_object_update_request(struct osp_update_request *our)
{
- if (ourq != NULL)
- OBD_FREE_LARGE(ourq, ourq_size);
+ if (list_empty(&our->our_req_list))
+ return NULL;
+
+ return list_entry(our->our_req_list.prev, struct osp_update_request_sub,
+ ours_list);
}
/**
*/
struct osp_update_request *osp_update_request_create(struct dt_device *dt)
{
- struct osp_update_request *osp_update_req;
- struct object_update_request *ourq;
+ struct osp_update_request *our;
- OBD_ALLOC_PTR(osp_update_req);
- if (osp_update_req == NULL)
+ OBD_ALLOC_PTR(our);
+ if (our == NULL)
return ERR_PTR(-ENOMEM);
- ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
- if (IS_ERR(ourq)) {
- OBD_FREE_PTR(osp_update_req);
- return ERR_CAST(ourq);
- }
-
- osp_update_req->our_req = ourq;
- osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
+ INIT_LIST_HEAD(&our->our_req_list);
+ INIT_LIST_HEAD(&our->our_cb_items);
+ INIT_LIST_HEAD(&our->our_list);
- INIT_LIST_HEAD(&osp_update_req->our_cb_items);
- INIT_LIST_HEAD(&osp_update_req->our_list);
-
- return osp_update_req;
+ osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
+ return our;
}
void osp_update_request_destroy(struct osp_update_request *our)
{
+ struct osp_update_request_sub *ours;
+ struct osp_update_request_sub *tmp;
+
if (our == NULL)
return;
- object_update_request_free(our->our_req,
- our->our_req_size);
+ list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
+ list_del(&ours->ours_list);
+ if (ours->ours_req != NULL)
+ OBD_FREE(ours->ours_req, ours->ours_req_size);
+ OBD_FREE_PTR(ours);
+ }
OBD_FREE_PTR(our);
}
ourq->ourq_magic, ourq->ourq_count, total_size);
}
+/**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request, and the request usually includes
+ * all of updates (stored in \param ureq) from one operation.
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] ureq hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
+ *
+ * \retval 0 if preparation succeeds.
+ * \retval negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+ struct osp_update_request *our,
+ struct ptlrpc_request **reqp)
+{
+ struct ptlrpc_request *req;
+ struct ptlrpc_bulk_desc *desc;
+ struct osp_update_request_sub *ours;
+ struct out_update_header *ouh;
+ struct out_update_buffer *oub;
+ __u32 buf_count = 0;
+ int rc;
+ ENTRY;
+
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ object_update_request_dump(ours->ours_req, D_INFO);
+ buf_count++;
+ }
+
+ req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
+ buf_count * sizeof(*oub));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+ if (rc != 0)
+ GOTO(out_req, rc);
+
+ ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
+ ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
+ ouh->ouh_count = buf_count;
+
+ oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ oub->oub_size = ours->ours_req_size;
+ oub++;
+ }
+
+ req->rq_bulk_write = 1;
+ desc = ptlrpc_prep_bulk_imp(req, buf_count,
+ MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
+ PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
+ MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+ if (desc == NULL)
+ GOTO(out_req, rc = -ENOMEM);
+
+ /* NB req now owns desc and will free it when it gets freed */
+ list_for_each_entry(ours, &our->our_req_list, ours_list)
+ desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
+ ours->ours_req_size);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+ RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+ *reqp = req;
+
+out_req:
+ if (rc < 0)
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
+}
+
+/**
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] our hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
+ *
+ * \retval 0 if RPC succeeds.
+ * \retval negative errno if RPC fails.
+ */
+
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+ struct osp_update_request *our,
+ struct ptlrpc_request **reqp)
+{
+ struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
+ struct ptlrpc_request *req = NULL;
+ int rc;
+ ENTRY;
+
+ rc = osp_prep_update_req(env, imp, our, &req);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* This will only be called with read-only update, and these updates
+ * might be used to retrieve update log during recovery process, so
+ * it will be allowed to send during recovery process */
+ req->rq_allow_replay = 1;
+
+ /* Note: some dt index api might return non-zero result here, like
+ * osd_index_ea_lookup, so we should only check rc < 0 here */
+ rc = ptlrpc_queue_wait(req);
+ our->our_rc = rc;
+ if (rc < 0 || reqp == NULL)
+ ptlrpc_req_finished(req);
+ else
+ *reqp = req;
+
+ RETURN(rc);
+}
+
static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
{
struct dt_txn_commit_cb *dcb;
int rc;
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- our->our_req, &req);
+ our, &req);
if (rc != 0) {
struct osp_update_callback *ouc;
struct osp_update_callback *next;
args->oaua_waitq = NULL;
args->oaua_flow_control = false;
req->rq_interpret_reply = osp_update_interpret;
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ ptlrpcd_add_req(req);
}
return rc;
struct object_update *object_update;
size_t max_update_size;
struct object_update_request *ureq;
+ struct osp_update_request_sub *ours;
int rc = 0;
ENTRY;
RETURN(PTR_ERR(our));
again:
- ureq = our->our_req;
- max_update_size = our->our_req_size - object_update_request_size(ureq);
+ ours = osp_current_object_update_request(our);
+
+ ureq = ours->ours_req;
+ max_update_size = ours->ours_req_size -
+ object_update_request_size(ureq);
object_update = update_buffer_get_update(ureq, ureq->ourq_count);
- rc = out_update_pack(env, object_update, max_update_size, op,
+ rc = out_update_pack(env, object_update, &max_update_size, op,
lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
/* The queue is full. */
if (rc == -E2BIG) {
return PTR_ERR(our);
}
- if (dt2osp_dev(th->th_dev)->opd_connect_mdt)
- our->our_flags = UPDATE_FL_SYNC;
-
oth->ot_our = our;
our->our_th = oth;
+
return 0;
}
}
/**
- * Prepare update request.
- *
- * Prepare OUT update ptlrpc request, and the request usually includes
- * all of updates (stored in \param ureq) from one operation.
- *
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] ureq hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
- *
- * \retval 0 if preparation succeeds.
- * \retval negative errno if preparation fails.
- */
-int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
- const struct object_update_request *ureq,
- struct ptlrpc_request **reqp)
-{
- struct ptlrpc_request *req;
- struct object_update_request *tmp;
- size_t ureq_len;
- int rc;
- ENTRY;
-
- object_update_request_dump(ureq, D_INFO);
- req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- ureq_len = object_update_request_size(ureq);
- req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
- ureq_len);
-
- rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
- if (rc != 0) {
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
- RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
-
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
- memcpy(tmp, ureq, ureq_len);
-
- ptlrpc_request_set_replen(req);
- req->rq_request_portal = OUT_PORTAL;
- req->rq_reply_portal = OSC_REPLY_PORTAL;
- *reqp = req;
-
- RETURN(rc);
-}
-
-/**
- * Send update RPC.
- *
- * Send update request to the remote MDT synchronously.
- *
- * \param[in] env execution environment
- * \param[in] imp import on which ptlrpc request will be sent
- * \param[in] our hold all of updates which will be packed into the req
- * \param[in] reqp request to be created
- *
- * \retval 0 if RPC succeeds.
- * \retval negative errno if RPC fails.
- */
-
-int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
- struct osp_update_request *our,
- struct ptlrpc_request **reqp)
-{
- struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
- struct ptlrpc_request *req = NULL;
- int rc;
- ENTRY;
-
- rc = osp_prep_update_req(env, imp, our->our_req, &req);
- if (rc != 0)
- RETURN(rc);
-
- /* This will only be called with read-only update, and these updates
- * might be used to retrieve update log during recovery process, so
- * it will be allowed to send during recovery process */
- req->rq_allow_replay = 1;
-
- /* Note: some dt index api might return non-zero result here, like
- * osd_index_ea_lookup, so we should only check rc < 0 here */
- rc = ptlrpc_queue_wait(req);
- if (rc < 0) {
- ptlrpc_req_finished(req);
- our->our_rc = rc;
- RETURN(rc);
- }
-
- if (reqp != NULL) {
- *reqp = req;
- RETURN(rc);
- }
-
- our->our_rc = rc;
-
- ptlrpc_req_finished(req);
-
- RETURN(rc);
-}
-
-/**
* Add commit callback to transaction.
*
* Add commit callback to the osp thandle, which will be called
LASSERT(oth != NULL);
LASSERT(our->our_req_sent == 0);
rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- our->our_req, &req);
+ our, &req);
if (rc != 0) {
osp_trans_callback(env, oth, rc);
RETURN(rc);
atomic_inc(args->oaua_count);
}
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ ptlrpcd_add_req(req);
req = NULL;
} else {
osp_thandle_get(oth); /* hold for commit callback */
if (top_device->ld_obd->obd_recovering)
req->rq_allow_replay = 1;
- osp_get_rpc_lock(osp);
+ if (osp->opd_connect_mdt)
+ osp_get_rpc_lock(osp);
rc = ptlrpc_queue_wait(req);
- osp_put_rpc_lock(osp);
+ if (osp->opd_connect_mdt)
+ osp_put_rpc_lock(osp);
if ((rc == -ENOMEM && req->rq_set == NULL) ||
(req->rq_transno == 0 && !req->rq_committed)) {
if (args->oaua_update != NULL) {
{
struct osp_thandle *oth = thandle_to_osp_thandle(th);
+ if (oth->ot_super.th_sync)
+ oth->ot_our->our_flags |= UPDATE_FL_SYNC;
/* For remote thandle, if there are local thandle, start it here*/
if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
return dt_trans_start(env, oth->ot_storage_th->th_dev,
oth->ot_storage_th = NULL;
}
- if (our == NULL || our->our_req == NULL ||
- our->our_req->ourq_count == 0) {
+ if (our == NULL || list_empty(&our->our_req_list)) {
osp_trans_callback(env, oth, th->th_result);
GOTO(out, rc = th->th_result);
}
if (!osp->opd_connect_mdt) {
+ osp_trans_callback(env, oth, th->th_result);
rc = osp_send_update_req(env, osp, oth->ot_our);
GOTO(out, rc);
}