+/**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request, and the request usually includes
+ * all of updates (stored in \param ureq) from one operation.
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] ureq hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
+ *
+ * \retval 0 if preparation succeeds.
+ * \retval negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+ struct osp_update_request *our,
+ struct ptlrpc_request **reqp)
+{
+ struct ptlrpc_request *req;
+ struct ptlrpc_bulk_desc *desc;
+ struct osp_update_request_sub *ours;
+ struct out_update_header *ouh;
+ struct out_update_buffer *oub;
+ __u32 buf_count = 0;
+ int rc;
+ ENTRY;
+
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ object_update_request_dump(ours->ours_req, D_INFO);
+ buf_count++;
+ }
+
+ req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
+ buf_count * sizeof(*oub));
+
+ rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+ if (rc != 0)
+ GOTO(out_req, rc);
+
+ ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
+ ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
+ ouh->ouh_count = buf_count;
+
+ oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
+ list_for_each_entry(ours, &our->our_req_list, ours_list) {
+ oub->oub_size = ours->ours_req_size;
+ oub++;
+ }
+
+ req->rq_bulk_write = 1;
+ desc = ptlrpc_prep_bulk_imp(req, buf_count,
+ MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
+ PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
+ MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+ if (desc == NULL)
+ GOTO(out_req, rc = -ENOMEM);
+
+ /* NB req now owns desc and will free it when it gets freed */
+ list_for_each_entry(ours, &our->our_req_list, ours_list)
+ desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
+ ours->ours_req_size);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+ RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_request_portal = OUT_PORTAL;
+ req->rq_reply_portal = OSC_REPLY_PORTAL;
+ *reqp = req;
+
+out_req:
+ if (rc < 0)
+ ptlrpc_req_finished(req);
+
+ RETURN(rc);
+}
+
+/**
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env execution environment
+ * \param[in] imp import on which ptlrpc request will be sent
+ * \param[in] our hold all of updates which will be packed into the req
+ * \param[in] reqp request to be created
+ *
+ * \retval 0 if RPC succeeds.
+ * \retval negative errno if RPC fails.
+ */
+
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+ struct osp_update_request *our,
+ struct ptlrpc_request **reqp)
+{
+ struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
+ struct ptlrpc_request *req = NULL;
+ int rc;
+ ENTRY;
+
+ rc = osp_prep_update_req(env, imp, our, &req);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* This will only be called with read-only update, and these updates
+ * might be used to retrieve update log during recovery process, so
+ * it will be allowed to send during recovery process */
+ req->rq_allow_replay = 1;
+
+ /* Note: some dt index api might return non-zero result here, like
+ * osd_index_ea_lookup, so we should only check rc < 0 here */
+ rc = ptlrpc_queue_wait(req);
+ our->our_rc = rc;
+ if (rc < 0 || reqp == NULL)
+ ptlrpc_req_finished(req);
+ else
+ *reqp = req;
+
+ RETURN(rc);
+}
+