Whamcloud - gitweb
LU-3536 lod: Separate thandle to different layers.
[fs/lustre-release.git] / lustre / osp / osp_trans.c
index 39a42bc..3daed73 100644 (file)
@@ -63,6 +63,8 @@
 
 struct osp_async_update_args {
        struct dt_update_request *oaua_update;
+       atomic_t                 *oaua_count;
+       wait_queue_head_t        *oaua_waitq;
        bool                      oaua_flow_control;
 };
 
@@ -195,7 +197,10 @@ static int osp_async_update_interpret(const struct lu_env *env,
                index++;
        }
 
-       out_destroy_update_req(dt_update);
+       if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
+               wake_up_all(oaua->oaua_waitq);
+
+       dt_update_request_destroy(dt_update);
 
        return 0;
 }
@@ -219,8 +224,8 @@ int osp_unplug_async_request(const struct lu_env *env,
        struct ptlrpc_request           *req = NULL;
        int                              rc;
 
-       rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                update->dur_req, &req);
+       rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+                                update->dur_buf.ub_req, &req);
        if (rc != 0) {
                struct osp_async_request *oar;
                struct osp_async_request *next;
@@ -232,12 +237,13 @@ int osp_unplug_async_request(const struct lu_env *env,
                                               oar->oar_data, 0, rc);
                        osp_async_request_fini(env, oar);
                }
-               out_destroy_update_req(update);
+               dt_update_request_destroy(update);
        } else {
-               LASSERT(list_empty(&update->dur_list));
-
                args = ptlrpc_req_async_args(req);
                args->oaua_update = update;
+               args->oaua_count = NULL;
+               args->oaua_waitq = NULL;
+               args->oaua_flow_control = false;
                req->rq_interpret_reply = osp_async_update_interpret;
                ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
        }
@@ -265,7 +271,7 @@ osp_find_or_create_async_update_request(struct osp_device *osp)
        if (update != NULL)
                return update;
 
-       update = out_create_update_req(&osp->opd_dt_dev);
+       update = dt_update_request_create(&osp->opd_dt_dev);
        if (!IS_ERR(update))
                osp->opd_async_requests = update;
 
@@ -287,8 +293,8 @@ osp_find_or_create_async_update_request(struct osp_device *osp)
  * \param[in] env              pointer to the thread context
  * \param[in] op               operation type, see 'enum update_type'
  * \param[in] obj              pointer to the operation target
- * \param[in] count            array size of the subsequent @lens and @bufs
- * \param[in] lens             buffer length array for the subsequent @bufs
+ * \param[in] count            array size of the subsequent \a lens and \a bufs
+ * \param[in] lens             buffer length array for the subsequent \a bufs
  * \param[in] bufs             the buffers to compose the request
  * \param[in] data             pointer to the data used by the interpreter
  * \param[in] interpreter      pointer to the interpreter function
@@ -296,9 +302,9 @@ osp_find_or_create_async_update_request(struct osp_device *osp)
  * \retval                     0 for success
  * \retval                     negative error number on failure
  */
-int osp_insert_async_request(const struct lu_env *env,
-                            int op, struct osp_object *obj, int count,
-                            int *lens, const char **bufs, void *data,
+int osp_insert_async_request(const struct lu_env *env, enum update_type op,
+                            struct osp_object *obj, int count,
+                            __u16 *lens, const void **bufs, void *data,
                             osp_async_request_interpreter_t interpreter)
 {
        struct osp_async_request     *oar;
@@ -316,9 +322,10 @@ int osp_insert_async_request(const struct lu_env *env,
                GOTO(out, rc = PTR_ERR(update));
 
 again:
-       rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
-                              count, lens, bufs);
        /* The queue is full. */
+       rc = out_update_pack(env, &update->dur_buf, op,
+                            lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
+                            0);
        if (rc == -E2BIG) {
                osp->opd_async_requests = NULL;
                mutex_unlock(&osp->opd_async_requests_mutex);
@@ -376,45 +383,140 @@ out:
  */
 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 {
-       struct thandle          *th = NULL;
-       struct thandle_update   *tu = NULL;
-       int                      rc = 0;
+       struct osp_thandle              *oth;
+       struct thandle                  *th = NULL;
+       struct dt_update_request        *update;
+       ENTRY;
 
-       OBD_ALLOC_PTR(th);
-       if (unlikely(th == NULL))
-               GOTO(out, rc = -ENOMEM);
+       OBD_ALLOC_PTR(oth);
+       if (unlikely(oth == NULL))
+               RETURN(ERR_PTR(-ENOMEM));
 
+       th = &oth->ot_super;
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
-       atomic_set(&th->th_refc, 1);
-       th->th_alloc_size = sizeof(*th);
 
-       OBD_ALLOC_PTR(tu);
-       if (tu == NULL)
-               GOTO(out, rc = -ENOMEM);
+       update = dt_update_request_create(d);
+       if (IS_ERR(update)) {
+               OBD_FREE_PTR(oth);
+               RETURN(ERR_CAST(update));
+       }
 
-       INIT_LIST_HEAD(&tu->tu_remote_update_list);
-       tu->tu_only_remote_trans = 1;
-       th->th_update = tu;
+       oth->ot_dur = update;
+       oth->ot_send_updates_after_local_trans = false;
 
-out:
+       RETURN(th);
+}
+
+/**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request, and the request usually includes
+ * all of updates (stored in \param ureq) from one operation.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] ureq     hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       const struct object_update_request *ureq,
+                       struct ptlrpc_request **reqp)
+{
+       struct ptlrpc_request           *req;
+       struct object_update_request    *tmp;
+       int                             ureq_len;
+       int                             rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       ureq_len = object_update_request_size(ureq);
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
+                            ureq_len);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
        if (rc != 0) {
-               if (tu != NULL)
-                       OBD_FREE_PTR(tu);
-               if (th != NULL)
-                       OBD_FREE_PTR(th);
-               th = ERR_PTR(rc);
+               ptlrpc_req_finished(req);
+               RETURN(rc);
        }
 
-       return th;
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
+       memcpy(tmp, ureq, ureq_len);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OUT_PORTAL;
+       req->rq_reply_portal = OSC_REPLY_PORTAL;
+       *reqp = req;
+
+       RETURN(rc);
+}
+
+/**
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] dt_update        hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if RPC succeeds.
+ * \retval             negative errno if RPC fails.
+ */
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+                   struct dt_update_request *dt_update,
+                   struct ptlrpc_request **reqp, bool rpc_lock)
+{
+       struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
+       struct ptlrpc_request   *req = NULL;
+       int                     rc;
+       ENTRY;
+
+       rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* Note: some dt index api might return non-zero result here, like
+        * osd_index_ea_lookup, so we should only check rc < 0 here */
+       if (rpc_lock)
+               osp_get_rpc_lock(osp);
+       rc = ptlrpc_queue_wait(req);
+       if (rpc_lock)
+               osp_put_rpc_lock(osp);
+       if (rc < 0) {
+               ptlrpc_req_finished(req);
+               dt_update->dur_rc = rc;
+               RETURN(rc);
+       }
+
+       if (reqp != NULL) {
+               *reqp = req;
+               RETURN(rc);
+       }
+
+       dt_update->dur_rc = rc;
+
+       ptlrpc_req_finished(req);
+
+       RETURN(rc);
 }
 
 /**
  * Trigger the request for remote updates.
  *
- * If the transaction is a remote transaction, then related remote updates
- * will be sent asynchronously; otherwise, the cross MDTs transaction will
- * be synchronized.
+ * If the transaction is not a remote one or it is required to be sync mode
+ * (th->th_sync is set), then it will be sent synchronously; otherwise, the
+ * RPC will be sent asynchronously.
  *
  * Please refer to osp_trans_create() for transaction type.
  *
@@ -431,38 +533,87 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                             struct dt_update_request *dt_update,
                             struct thandle *th, bool flow_control)
 {
-       struct thandle_update   *tu = th->th_update;
-       int                      rc = 0;
+       int     rc = 0;
 
-       LASSERT(tu != NULL);
-
-       if (is_only_remote_trans(th)) {
+       if (is_only_remote_trans(th) && !th->th_sync) {
                struct osp_async_update_args    *args;
                struct ptlrpc_request           *req;
 
-               list_del_init(&dt_update->dur_list);
-               rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                        dt_update->dur_req, &req);
-               if (rc == 0) {
-                       args = ptlrpc_req_async_args(req);
-                       args->oaua_update = dt_update;
-                       args->oaua_flow_control = flow_control;
-                       req->rq_interpret_reply =
-                               osp_async_update_interpret;
-                       ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
-               } else {
-                       out_destroy_update_req(dt_update);
-               }
+               rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+                                        dt_update->dur_buf.ub_req, &req);
+               if (rc != 0)
+                       return rc;
+               down_read(&osp->opd_async_updates_rwsem);
+
+               args = ptlrpc_req_async_args(req);
+               args->oaua_update = dt_update;
+               args->oaua_count = &osp->opd_async_updates_count;
+               args->oaua_waitq = &osp->opd_syn_barrier_waitq;
+               args->oaua_flow_control = flow_control;
+               req->rq_interpret_reply =
+                       osp_async_update_interpret;
+
+               atomic_inc(args->oaua_count);
+               up_read(&osp->opd_async_updates_rwsem);
+
+               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
        } else {
-               th->th_sync = 1;
-               rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
-                                    dt_update, NULL);
+               rc = osp_remote_sync(env, osp, dt_update, NULL, true);
        }
 
        return rc;
 }
 
 /**
+ * Get local thandle for osp_thandle
+ *
+ * Get the local OSD thandle from the OSP thandle. Currently, there
+ * are a few OSP API (osp_object_create() and osp_sync_add()) needs
+ * to update the object on local OSD device.
+ *
+ * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
+ * we will get local thandle by thandle_get_sub_by_dt.
+ *
+ * If the osp_thandle is remote thandle (th_top == NULL, only used
+ * by LFSCK), then it will create a local thandle, and stop it in
+ * osp_trans_stop(). And this only happens on OSP for OST.
+ *
+ * These are temporary solution, once OSP accessing OSD object is
+ * being fixed properly, this function should be removed. XXX
+ *
+ * \param[in] env              pointer to the thread context
+ * \param[in] th               pointer to the transaction handler
+ * \param[in] dt               pointer to the OSP device
+ *
+ * \retval                     pointer to the local thandle
+ * \retval                     ERR_PTR(errno) if it fails.
+ **/
+struct thandle *osp_get_storage_thandle(const struct lu_env *env,
+                                       struct thandle *th,
+                                       struct osp_device *osp)
+{
+       struct osp_thandle      *oth;
+       struct thandle          *local_th;
+
+       if (th->th_top != NULL)
+               return thandle_get_sub_by_dt(env, th->th_top,
+                                            osp->opd_storage);
+
+       LASSERT(!osp->opd_connect_mdt);
+       oth = thandle_to_osp_thandle(th);
+       if (oth->ot_storage_th != NULL)
+               return oth->ot_storage_th;
+
+       local_th = dt_trans_create(env, osp->opd_storage);
+       if (IS_ERR(local_th))
+               return local_th;
+
+       oth->ot_storage_th = local_th;
+
+       return local_th;
+}
+
+/**
  * The OSP layer dt_device_operations::dt_trans_start() interface
  * to start the transaction.
  *
@@ -492,22 +643,42 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th)
 {
-       struct thandle_update           *tu = th->th_update;
-       struct dt_update_request        *dt_update;
-       int                              rc = 0;
-
-       if (tu == NULL)
-               return rc;
-
-       /* Check whether there are updates related with this OSP */
-       dt_update = out_find_update(tu, dt);
-       if (dt_update == NULL)
-               return rc;
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *dt_update;
+       int                      rc = 0;
 
-       if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
+       dt_update = oth->ot_dur;
+       LASSERT(dt_update != NULL);
+
+       /* return if there are no updates,  */
+       if (dt_update->dur_buf.ub_req == NULL ||
+           dt_update->dur_buf.ub_req->ourq_count == 0)
+               GOTO(out, rc = 0);
+
+       /* Note: some updates needs to send before local transaction,
+        * some needs to send after local transaction.
+        *
+        * If the transaction only includes remote updates, it will
+        * send updates to remote MDT in osp_trans_stop.
+        *
+        * If it is remote create, it will send the remote req after
+        * local transaction. i.e. create the object locally first,
+        * then insert the name entry.
+        *
+        * If it is remote unlink, it will send the remote req before
+        * the local transaction, i.e. delete the name entry remote
+        * first, then destroy the local object. */
+       if (!is_only_remote_trans(th) &&
+           !oth->ot_send_updates_after_local_trans)
                rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
                                       false);
 
+out:
+       /* For remote thandle, if there are local thandle, start it here*/
+       if (th->th_top == NULL && oth->ot_storage_th != NULL)
+               rc = dt_trans_start(env, oth->ot_storage_th->th_dev,
+                                   oth->ot_storage_th);
+
        return rc;
 }
 
@@ -534,61 +705,65 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th)
 {
-       struct thandle_update           *tu = th->th_update;
-       struct dt_update_request        *dt_update;
-       int                              rc = 0;
 
-       LASSERT(tu != NULL);
-       LASSERT(tu != LP_POISON);
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *dt_update;
+       int                      rc = 0;
+       bool                     keep_dt_update = false;
+       ENTRY;
 
-       /* Check whether there are updates related with this OSP */
-       dt_update = out_find_update(tu, dt);
-       if (dt_update == NULL) {
-               if (!is_only_remote_trans(th))
-                       return rc;
-               goto put;
-       }
+       dt_update = oth->ot_dur;
+       LASSERT(dt_update != NULL);
+       LASSERT(dt_update != LP_POISON);
 
-       if (dt_update->dur_req->ourq_count == 0) {
-               out_destroy_update_req(dt_update);
-               goto put;
+       /* For remote transaction, if there is local storage thandle,
+        * stop it first */
+       if (oth->ot_storage_th != NULL && th->th_top == NULL) {
+               dt_trans_stop(env, oth->ot_storage_th->th_dev,
+                             oth->ot_storage_th);
+               oth->ot_storage_th = NULL;
        }
+       /* If there are no updates, destroy dt_update and thandle */
+       if (dt_update->dur_buf.ub_req == NULL ||
+           dt_update->dur_buf.ub_req->ourq_count == 0)
+               GOTO(out, rc);
 
-       if (is_only_remote_trans(th)) {
-               if (th->th_result == 0) {
-                       struct osp_device *osp = dt2osp_dev(th->th_dev);
-                       struct client_obd *cli = &osp->opd_obd->u.cli;
-
-                       rc = obd_get_request_slot(cli);
-                       if (!osp->opd_imp_active || osp->opd_got_disconnected) {
-                               if (rc == 0)
-                                       obd_put_request_slot(cli);
-
-                               rc = -ENOTCONN;
-                       }
+       if (is_only_remote_trans(th) && !th->th_sync) {
+               struct osp_device *osp = dt2osp_dev(th->th_dev);
+               struct client_obd *cli = &osp->opd_obd->u.cli;
 
-                       if (rc != 0) {
-                               out_destroy_update_req(dt_update);
-                               goto put;
-                       }
+               if (th->th_result != 0) {
+                       rc = th->th_result;
+                       GOTO(out, rc);
+               }
 
-                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              dt_update, th, true);
-                       if (rc != 0)
+               rc = obd_get_request_slot(cli);
+               if (!osp->opd_imp_active || !osp->opd_imp_connected) {
+                       if (rc == 0)
                                obd_put_request_slot(cli);
-               } else {
-                       rc = th->th_result;
-                       out_destroy_update_req(dt_update);
+                       rc = -ENOTCONN;
                }
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = osp_trans_trigger(env, dt2osp_dev(dt),
+                                      dt_update, th, true);
+               if (rc != 0)
+                       obd_put_request_slot(cli);
+               else
+                       keep_dt_update = true;
        } else {
-               if (tu->tu_sent_after_local_trans)
-                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              dt_update, th, false);
+               if (oth->ot_send_updates_after_local_trans ||
+                  (is_only_remote_trans(th) && th->th_sync))
+                       rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
+                                              th, false);
                rc = dt_update->dur_rc;
-               out_destroy_update_req(dt_update);
        }
 
-put:
-       thandle_put(th);
-       return rc;
+out:
+       if (!keep_dt_update)
+               dt_update_request_destroy(dt_update);
+       OBD_FREE_PTR(oth);
+
+       RETURN(rc);
 }