Whamcloud - gitweb
LU-6376 osp: add RPC lock during RPC send 98/14098/2
authorwang di <di.wang@intel.com>
Wed, 11 Mar 2015 04:33:51 +0000 (21:33 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 28 Mar 2015 02:44:57 +0000 (02:44 +0000)
Add RPC lock in osp_remote_sync(), so only one modified
UPDATE RPC is allowed each time as other normal MDC client.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I6c766625adc355d5c8827bffb78a1820efc46fd6
Reviewed-on: http://review.whamcloud.com/14098
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_update.h
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_sync.c
lustre/osp/osp_trans.c
lustre/target/out_lib.c

index b5502cf..c75e4fb 100644 (file)
@@ -174,12 +174,6 @@ void dt_update_request_destroy(struct dt_update_request *update);
 struct dt_update_request *dt_update_request_create(struct dt_device *dt);
 struct dt_update_request *dt_update_request_find_or_create(struct thandle *th,
                                                          struct dt_object *dt);
 struct dt_update_request *dt_update_request_create(struct dt_device *dt);
 struct dt_update_request *dt_update_request_find_or_create(struct thandle *th,
                                                          struct dt_object *dt);
-int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct object_update_request *ureq,
-                       struct ptlrpc_request **reqp);
-int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
-                   struct dt_update_request *update,
-                   struct ptlrpc_request **reqp);
 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
                    enum update_type op, const struct lu_fid *fid,
                    int params_count, __u16 *param_sizes, const void **bufs,
 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
                    enum update_type op, const struct lu_fid *fid,
                    int params_count, __u16 *param_sizes, const void **bufs,
index 249afbb..86a9fac 100644 (file)
@@ -78,7 +78,6 @@
 #include <lustre_ioctl.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
 #include <lustre_ioctl.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
-#include <lustre_mdc.h>
 
 #include "osp_internal.h"
 
 
 #include "osp_internal.h"
 
index b8beb98..7c450de 100644 (file)
@@ -48,6 +48,7 @@
 #include <lustre_fid.h>
 #include <lustre_update.h>
 #include <lu_target.h>
 #include <lustre_fid.h>
 #include <lustre_update.h>
 #include <lu_target.h>
+#include <lustre_mdc.h>
 
 /*
  * Infrastructure to support tracking of last committed llog record
 
 /*
  * Infrastructure to support tracking of last committed llog record
@@ -411,8 +412,20 @@ static inline struct seq_server_site *osp_seq_site(struct osp_device *osp)
 }
 
 #define osp_init_rpc_lock(lck) mdc_init_rpc_lock(lck)
 }
 
 #define osp_init_rpc_lock(lck) mdc_init_rpc_lock(lck)
-#define osp_get_rpc_lock(lck, it)  mdc_get_rpc_lock(lck, it)
-#define osp_put_rpc_lock(lck, it) mdc_put_rpc_lock(lck, it)
+
+static inline void osp_get_rpc_lock(struct osp_device *osp)
+{
+       struct mdc_rpc_lock *rpc_lock = osp->opd_obd->u.cli.cl_rpc_lock;
+
+       mdc_get_rpc_lock(rpc_lock, NULL);
+}
+
+static inline void osp_put_rpc_lock(struct osp_device *osp)
+{
+       struct mdc_rpc_lock *rpc_lock = osp->opd_obd->u.cli.cl_rpc_lock;
+
+       mdc_put_rpc_lock(rpc_lock, NULL);
+}
 
 static inline int osp_fid_diff(const struct lu_fid *fid1,
                               const struct lu_fid *fid2)
 
 static inline int osp_fid_diff(const struct lu_fid *fid1,
                               const struct lu_fid *fid2)
@@ -519,7 +532,12 @@ struct thandle *osp_trans_create(const struct lu_env *env,
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
-
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       const struct object_update_request *ureq,
+                       struct ptlrpc_request **reqp);
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+                   struct dt_update_request *update,
+                   struct ptlrpc_request **reqp, bool rpc_lock);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr, struct lustre_capa *capa);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr, struct lustre_capa *capa);
index 2f50b6a..a60417c 100644 (file)
@@ -659,7 +659,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+       rc = osp_remote_sync(env, osp, update, &req, false);
        if (rc < 0)
                GOTO(out, rc);
 
        if (rc < 0)
                GOTO(out, rc);
 
index be29d0c..185e36d 100644 (file)
@@ -578,7 +578,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+       rc = osp_remote_sync(env, osp, update, &req, false);
        if (rc != 0) {
                if (rc == -ENOENT) {
                        osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
        if (rc != 0) {
                if (rc == -ENOENT) {
                        osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
@@ -1005,7 +1005,7 @@ unlock:
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+       rc = osp_remote_sync(env, osp, update, &req, false);
        if (rc != 0) {
                if (rc == -ENOENT) {
                        dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
        if (rc != 0) {
                if (rc == -ENOENT) {
                        dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
index d0b6d7c..a480daa 100644 (file)
@@ -768,7 +768,7 @@ static int osp_prep_unlink_update_req(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
        if (rc != 0)
                GOTO(out, rc);
 
-       rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+       rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                 update->dur_buf.ub_req, &req);
        if (rc != 0)
                GOTO(out, rc);
                                 update->dur_buf.ub_req, &req);
        if (rc != 0)
                GOTO(out, rc);
index 449a474..d00dbe7 100644 (file)
@@ -224,7 +224,7 @@ int osp_unplug_async_request(const struct lu_env *env,
        struct ptlrpc_request           *req = NULL;
        int                              rc;
 
        struct ptlrpc_request           *req = NULL;
        int                              rc;
 
-       rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+       rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                 update->dur_buf.ub_req, &req);
        if (rc != 0) {
                struct osp_async_request *oar;
                                 update->dur_buf.ub_req, &req);
        if (rc != 0) {
                struct osp_async_request *oar;
@@ -419,6 +419,109 @@ out:
 }
 
 /**
 }
 
 /**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request, and the request usually includes
+ * all of updates (stored in \param ureq) from one operation.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] ureq     hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       const struct object_update_request *ureq,
+                       struct ptlrpc_request **reqp)
+{
+       struct ptlrpc_request           *req;
+       struct object_update_request    *tmp;
+       int                             ureq_len;
+       int                             rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       ureq_len = object_update_request_size(ureq);
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
+                            ureq_len);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+       if (rc != 0) {
+               ptlrpc_req_finished(req);
+               RETURN(rc);
+       }
+
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
+       memcpy(tmp, ureq, ureq_len);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OUT_PORTAL;
+       req->rq_reply_portal = OSC_REPLY_PORTAL;
+       *reqp = req;
+
+       RETURN(rc);
+}
+
+/**
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] dt_update        hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if RPC succeeds.
+ * \retval             negative errno if RPC fails.
+ */
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+                   struct dt_update_request *dt_update,
+                   struct ptlrpc_request **reqp, bool rpc_lock)
+{
+       struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
+       struct ptlrpc_request   *req = NULL;
+       int                     rc;
+       ENTRY;
+
+       rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* Note: some dt index api might return non-zero result here, like
+        * osd_index_ea_lookup, so we should only check rc < 0 here */
+       if (rpc_lock)
+               osp_get_rpc_lock(osp);
+       rc = ptlrpc_queue_wait(req);
+       if (rpc_lock)
+               osp_put_rpc_lock(osp);
+       if (rc < 0) {
+               ptlrpc_req_finished(req);
+               dt_update->dur_rc = rc;
+               RETURN(rc);
+       }
+
+       if (reqp != NULL) {
+               *reqp = req;
+               RETURN(rc);
+       }
+
+       dt_update->dur_rc = rc;
+
+       ptlrpc_req_finished(req);
+
+       RETURN(rc);
+}
+
+/**
  * Trigger the request for remote updates.
  *
  * If the transaction is not a remote one or it is required to be sync mode
  * Trigger the request for remote updates.
  *
  * If the transaction is not a remote one or it is required to be sync mode
@@ -451,14 +554,13 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 
                list_del_init(&dt_update->dur_list);
                if (th->th_sync) {
 
                list_del_init(&dt_update->dur_list);
                if (th->th_sync) {
-                       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
-                                            dt_update, NULL);
+                       rc = osp_remote_sync(env, osp, dt_update, NULL, true);
                        dt_update_request_destroy(dt_update);
 
                        return rc;
                }
 
                        dt_update_request_destroy(dt_update);
 
                        return rc;
                }
 
-               rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+               rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                         dt_update->dur_buf.ub_req, &req);
                if (rc == 0) {
                        down_read(&osp->opd_async_updates_rwsem);
                                         dt_update->dur_buf.ub_req, &req);
                if (rc == 0) {
                        down_read(&osp->opd_async_updates_rwsem);
@@ -480,8 +582,7 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                }
        } else {
                th->th_sync = 1;
                }
        } else {
                th->th_sync = 1;
-               rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
-                                    dt_update, NULL);
+               rc = osp_remote_sync(env, osp, dt_update, NULL, true);
        }
 
        return rc;
        }
 
        return rc;
index a351aee..dce9da9 100644 (file)
@@ -176,106 +176,6 @@ dt_update_request_find_or_create(struct thandle *th, struct dt_object *dt)
 EXPORT_SYMBOL(dt_update_request_find_or_create);
 
 /**
 EXPORT_SYMBOL(dt_update_request_find_or_create);
 
 /**
- * Prepare update request.
- *
- * Prepare OUT update ptlrpc request, and the request usually includes
- * all of updates (stored in \param ureq) from one operation.
- *
- * \param[in] env      execution environment
- * \param[in] imp      import on which ptlrpc request will be sent
- * \param[in] ureq     hold all of updates which will be packed into the req
- * \param[in] reqp     request to be created
- *
- * \retval             0 if preparation succeeds.
- * \retval             negative errno if preparation fails.
- */
-int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct object_update_request *ureq,
-                       struct ptlrpc_request **reqp)
-{
-       struct ptlrpc_request           *req;
-       struct object_update_request    *tmp;
-       int                             ureq_len;
-       int                             rc;
-       ENTRY;
-
-       req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
-       if (req == NULL)
-               RETURN(-ENOMEM);
-
-       ureq_len = object_update_request_size(ureq);
-       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
-                            ureq_len);
-
-       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
-       if (rc != 0) {
-               ptlrpc_req_finished(req);
-               RETURN(rc);
-       }
-
-       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
-                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
-
-       tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
-       memcpy(tmp, ureq, ureq_len);
-
-       ptlrpc_request_set_replen(req);
-       req->rq_request_portal = OUT_PORTAL;
-       req->rq_reply_portal = OSC_REPLY_PORTAL;
-       *reqp = req;
-
-       RETURN(rc);
-}
-EXPORT_SYMBOL(out_prep_update_req);
-
-/**
- * Send update RPC.
- *
- * Send update request to the remote MDT synchronously.
- *
- * \param[in] env      execution environment
- * \param[in] imp      import on which ptlrpc request will be sent
- * \param[in] dt_update        hold all of updates which will be packed into the req
- * \param[in] reqp     request to be created
- *
- * \retval             0 if RPC succeeds.
- * \retval             negative errno if RPC fails.
- */
-int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
-                   struct dt_update_request *dt_update,
-                   struct ptlrpc_request **reqp)
-{
-       struct ptlrpc_request   *req = NULL;
-       int                     rc;
-       ENTRY;
-
-       rc = out_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
-       if (rc != 0)
-               RETURN(rc);
-
-       /* Note: some dt index api might return non-zero result here, like
-        * osd_index_ea_lookup, so we should only check rc < 0 here */
-       rc = ptlrpc_queue_wait(req);
-       if (rc < 0) {
-               ptlrpc_req_finished(req);
-               dt_update->dur_rc = rc;
-               RETURN(rc);
-       }
-
-       if (reqp != NULL) {
-               *reqp = req;
-               RETURN(rc);
-       }
-
-       dt_update->dur_rc = rc;
-
-       ptlrpc_req_finished(req);
-
-       RETURN(rc);
-}
-EXPORT_SYMBOL(out_remote_sync);
-
-/**
  * resize update buffer
  *
  * Extend the update buffer by new_size.
  * resize update buffer
  *
  * Extend the update buffer by new_size.