Whamcloud - gitweb
LU-6376 osp: add RPC lock during RPC send
[fs/lustre-release.git] / lustre / osp / osp_trans.c
index 449a474..d00dbe7 100644 (file)
@@ -224,7 +224,7 @@ int osp_unplug_async_request(const struct lu_env *env,
        struct ptlrpc_request           *req = NULL;
        int                              rc;
 
-       rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+       rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                 update->dur_buf.ub_req, &req);
        if (rc != 0) {
                struct osp_async_request *oar;
@@ -419,6 +419,109 @@ out:
 }
 
 /**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request, and the request usually includes
+ * all of updates (stored in \param ureq) from one operation.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] ureq     hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       const struct object_update_request *ureq,
+                       struct ptlrpc_request **reqp)
+{
+       struct ptlrpc_request           *req;
+       struct object_update_request    *tmp;
+       int                             ureq_len;
+       int                             rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       ureq_len = object_update_request_size(ureq);
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
+                            ureq_len);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+       if (rc != 0) {
+               ptlrpc_req_finished(req);
+               RETURN(rc);
+       }
+
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
+       memcpy(tmp, ureq, ureq_len);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OUT_PORTAL;
+       req->rq_reply_portal = OSC_REPLY_PORTAL;
+       *reqp = req;
+
+       RETURN(rc);
+}
+
+/**
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] dt_update        hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if RPC succeeds.
+ * \retval             negative errno if RPC fails.
+ */
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+                   struct dt_update_request *dt_update,
+                   struct ptlrpc_request **reqp, bool rpc_lock)
+{
+       struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
+       struct ptlrpc_request   *req = NULL;
+       int                     rc;
+       ENTRY;
+
+       rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* Note: some dt index api might return non-zero result here, like
+        * osd_index_ea_lookup, so we should only check rc < 0 here */
+       if (rpc_lock)
+               osp_get_rpc_lock(osp);
+       rc = ptlrpc_queue_wait(req);
+       if (rpc_lock)
+               osp_put_rpc_lock(osp);
+       if (rc < 0) {
+               ptlrpc_req_finished(req);
+               dt_update->dur_rc = rc;
+               RETURN(rc);
+       }
+
+       if (reqp != NULL) {
+               *reqp = req;
+               RETURN(rc);
+       }
+
+       dt_update->dur_rc = rc;
+
+       ptlrpc_req_finished(req);
+
+       RETURN(rc);
+}
+
+/**
  * Trigger the request for remote updates.
  *
  * If the transaction is not a remote one or it is required to be sync mode
@@ -451,14 +554,13 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 
                list_del_init(&dt_update->dur_list);
                if (th->th_sync) {
-                       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
-                                            dt_update, NULL);
+                       rc = osp_remote_sync(env, osp, dt_update, NULL, true);
                        dt_update_request_destroy(dt_update);
 
                        return rc;
                }
 
-               rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+               rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                         dt_update->dur_buf.ub_req, &req);
                if (rc == 0) {
                        down_read(&osp->opd_async_updates_rwsem);
@@ -480,8 +582,7 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                }
        } else {
                th->th_sync = 1;
-               rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
-                                    dt_update, NULL);
+               rc = osp_remote_sync(env, osp, dt_update, NULL, true);
        }
 
        return rc;