Whamcloud - gitweb
LU-3536 lod: Separate thandle to different layers.
[fs/lustre-release.git] / lustre / osp / osp_trans.c
index d00dbe7..3daed73 100644 (file)
@@ -239,8 +239,6 @@ int osp_unplug_async_request(const struct lu_env *env,
                }
                dt_update_request_destroy(update);
        } else {
-               LASSERT(list_empty(&update->dur_list));
-
                args = ptlrpc_req_async_args(req);
                args->oaua_update = update;
                args->oaua_count = NULL;
@@ -385,37 +383,29 @@ out:
  */
 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 {
-       struct thandle          *th = NULL;
-       struct thandle_update   *tu = NULL;
-       int                      rc = 0;
+       struct osp_thandle              *oth;
+       struct thandle                  *th = NULL;
+       struct dt_update_request        *update;
+       ENTRY;
 
-       OBD_ALLOC_PTR(th);
-       if (unlikely(th == NULL))
-               GOTO(out, rc = -ENOMEM);
+       OBD_ALLOC_PTR(oth);
+       if (unlikely(oth == NULL))
+               RETURN(ERR_PTR(-ENOMEM));
 
+       th = &oth->ot_super;
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
-       atomic_set(&th->th_refc, 1);
-       th->th_alloc_size = sizeof(*th);
-
-       OBD_ALLOC_PTR(tu);
-       if (tu == NULL)
-               GOTO(out, rc = -ENOMEM);
 
-       INIT_LIST_HEAD(&tu->tu_remote_update_list);
-       tu->tu_only_remote_trans = 1;
-       th->th_update = tu;
-
-out:
-       if (rc != 0) {
-               if (tu != NULL)
-                       OBD_FREE_PTR(tu);
-               if (th != NULL)
-                       OBD_FREE_PTR(th);
-               th = ERR_PTR(rc);
+       update = dt_update_request_create(d);
+       if (IS_ERR(update)) {
+               OBD_FREE_PTR(oth);
+               RETURN(ERR_CAST(update));
        }
 
-       return th;
+       oth->ot_dur = update;
+       oth->ot_send_updates_after_local_trans = false;
+
+       RETURN(th);
 }
 
 /**
@@ -543,45 +533,31 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                             struct dt_update_request *dt_update,
                             struct thandle *th, bool flow_control)
 {
-       struct thandle_update   *tu = th->th_update;
-       int                      rc = 0;
-
-       LASSERT(tu != NULL);
+       int     rc = 0;
 
-       if (is_only_remote_trans(th)) {
+       if (is_only_remote_trans(th) && !th->th_sync) {
                struct osp_async_update_args    *args;
                struct ptlrpc_request           *req;
 
-               list_del_init(&dt_update->dur_list);
-               if (th->th_sync) {
-                       rc = osp_remote_sync(env, osp, dt_update, NULL, true);
-                       dt_update_request_destroy(dt_update);
-
-                       return rc;
-               }
-
                rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                         dt_update->dur_buf.ub_req, &req);
-               if (rc == 0) {
-                       down_read(&osp->opd_async_updates_rwsem);
+               if (rc != 0)
+                       return rc;
+               down_read(&osp->opd_async_updates_rwsem);
 
-                       args = ptlrpc_req_async_args(req);
-                       args->oaua_update = dt_update;
-                       args->oaua_count = &osp->opd_async_updates_count;
-                       args->oaua_waitq = &osp->opd_syn_barrier_waitq;
-                       args->oaua_flow_control = flow_control;
-                       req->rq_interpret_reply =
-                               osp_async_update_interpret;
+               args = ptlrpc_req_async_args(req);
+               args->oaua_update = dt_update;
+               args->oaua_count = &osp->opd_async_updates_count;
+               args->oaua_waitq = &osp->opd_syn_barrier_waitq;
+               args->oaua_flow_control = flow_control;
+               req->rq_interpret_reply =
+                       osp_async_update_interpret;
 
-                       atomic_inc(args->oaua_count);
-                       up_read(&osp->opd_async_updates_rwsem);
+               atomic_inc(args->oaua_count);
+               up_read(&osp->opd_async_updates_rwsem);
 
-                       ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
-               } else {
-                       dt_update_request_destroy(dt_update);
-               }
+               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
        } else {
-               th->th_sync = 1;
                rc = osp_remote_sync(env, osp, dt_update, NULL, true);
        }
 
@@ -589,6 +565,55 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 }
 
 /**
+ * Get local thandle for osp_thandle
+ *
+ * Get the local OSD thandle from the OSP thandle. Currently, there
+ * are a few OSP API (osp_object_create() and osp_sync_add()) needs
+ * to update the object on local OSD device.
+ *
+ * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
+ * we will get local thandle by thandle_get_sub_by_dt.
+ *
+ * If the osp_thandle is remote thandle (th_top == NULL, only used
+ * by LFSCK), then it will create a local thandle, and stop it in
+ * osp_trans_stop(). And this only happens on OSP for OST.
+ *
+ * These are temporary solution, once OSP accessing OSD object is
+ * being fixed properly, this function should be removed. XXX
+ *
+ * \param[in] env              pointer to the thread context
+ * \param[in] th               pointer to the transaction handler
+ * \param[in] dt               pointer to the OSP device
+ *
+ * \retval                     pointer to the local thandle
+ * \retval                     ERR_PTR(errno) if it fails.
+ **/
+struct thandle *osp_get_storage_thandle(const struct lu_env *env,
+                                       struct thandle *th,
+                                       struct osp_device *osp)
+{
+       struct osp_thandle      *oth;
+       struct thandle          *local_th;
+
+       if (th->th_top != NULL)
+               return thandle_get_sub_by_dt(env, th->th_top,
+                                            osp->opd_storage);
+
+       LASSERT(!osp->opd_connect_mdt);
+       oth = thandle_to_osp_thandle(th);
+       if (oth->ot_storage_th != NULL)
+               return oth->ot_storage_th;
+
+       local_th = dt_trans_create(env, osp->opd_storage);
+       if (IS_ERR(local_th))
+               return local_th;
+
+       oth->ot_storage_th = local_th;
+
+       return local_th;
+}
+
+/**
  * The OSP layer dt_device_operations::dt_trans_start() interface
  * to start the transaction.
  *
@@ -618,22 +643,42 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th)
 {
-       struct thandle_update           *tu = th->th_update;
-       struct dt_update_request        *dt_update;
-       int                              rc = 0;
-
-       if (tu == NULL)
-               return rc;
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *dt_update;
+       int                      rc = 0;
 
-       /* Check whether there are updates related with this OSP */
-       dt_update = out_find_update(tu, dt);
-       if (dt_update == NULL)
-               return rc;
+       dt_update = oth->ot_dur;
+       LASSERT(dt_update != NULL);
 
-       if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
+       /* return if there are no updates,  */
+       if (dt_update->dur_buf.ub_req == NULL ||
+           dt_update->dur_buf.ub_req->ourq_count == 0)
+               GOTO(out, rc = 0);
+
+       /* Note: some updates needs to send before local transaction,
+        * some needs to send after local transaction.
+        *
+        * If the transaction only includes remote updates, it will
+        * send updates to remote MDT in osp_trans_stop.
+        *
+        * If it is remote create, it will send the remote req after
+        * local transaction. i.e. create the object locally first,
+        * then insert the name entry.
+        *
+        * If it is remote unlink, it will send the remote req before
+        * the local transaction, i.e. delete the name entry remote
+        * first, then destroy the local object. */
+       if (!is_only_remote_trans(th) &&
+           !oth->ot_send_updates_after_local_trans)
                rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
                                       false);
 
+out:
+       /* For remote thandle, if there are local thandle, start it here*/
+       if (th->th_top == NULL && oth->ot_storage_th != NULL)
+               rc = dt_trans_start(env, oth->ot_storage_th->th_dev,
+                                   oth->ot_storage_th);
+
        return rc;
 }
 
@@ -660,66 +705,65 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th)
 {
-       struct thandle_update           *tu = th->th_update;
-       struct dt_update_request        *dt_update;
-       int                              rc = 0;
-       ENTRY;
 
-       LASSERT(tu != NULL);
-       LASSERT(tu != LP_POISON);
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *dt_update;
+       int                      rc = 0;
+       bool                     keep_dt_update = false;
+       ENTRY;
 
-       /* Check whether there are updates related with this OSP */
-       dt_update = out_find_update(tu, dt);
-       if (dt_update == NULL) {
-               if (!is_only_remote_trans(th))
-                       RETURN(rc);
+       dt_update = oth->ot_dur;
+       LASSERT(dt_update != NULL);
+       LASSERT(dt_update != LP_POISON);
 
-               GOTO(put, rc);
+       /* For remote transaction, if there is local storage thandle,
+        * stop it first */
+       if (oth->ot_storage_th != NULL && th->th_top == NULL) {
+               dt_trans_stop(env, oth->ot_storage_th->th_dev,
+                             oth->ot_storage_th);
+               oth->ot_storage_th = NULL;
        }
-
+       /* If there are no updates, destroy dt_update and thandle */
        if (dt_update->dur_buf.ub_req == NULL ||
-           dt_update->dur_buf.ub_req->ourq_count == 0) {
-               dt_update_request_destroy(dt_update);
-               GOTO(put, rc);
-       }
-
-       if (is_only_remote_trans(th)) {
-               if (th->th_result == 0) {
-                       struct osp_device *osp = dt2osp_dev(th->th_dev);
-                       struct client_obd *cli = &osp->opd_obd->u.cli;
+           dt_update->dur_buf.ub_req->ourq_count == 0)
+               GOTO(out, rc);
 
-                       rc = obd_get_request_slot(cli);
-                       if (!osp->opd_imp_active || !osp->opd_imp_connected) {
-                               if (rc == 0)
-                                       obd_put_request_slot(cli);
+       if (is_only_remote_trans(th) && !th->th_sync) {
+               struct osp_device *osp = dt2osp_dev(th->th_dev);
+               struct client_obd *cli = &osp->opd_obd->u.cli;
 
-                               rc = -ENOTCONN;
-                       }
-
-                       if (rc != 0) {
-                               dt_update_request_destroy(dt_update);
-                               GOTO(put, rc);
-                       }
+               if (th->th_result != 0) {
+                       rc = th->th_result;
+                       GOTO(out, rc);
+               }
 
-                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              dt_update, th, true);
-                       if (rc != 0)
+               rc = obd_get_request_slot(cli);
+               if (!osp->opd_imp_active || !osp->opd_imp_connected) {
+                       if (rc == 0)
                                obd_put_request_slot(cli);
-               } else {
-                       rc = th->th_result;
-                       dt_update_request_destroy(dt_update);
+                       rc = -ENOTCONN;
                }
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               rc = osp_trans_trigger(env, dt2osp_dev(dt),
+                                      dt_update, th, true);
+               if (rc != 0)
+                       obd_put_request_slot(cli);
+               else
+                       keep_dt_update = true;
        } else {
-               if (tu->tu_sent_after_local_trans)
-                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              dt_update, th, false);
+               if (oth->ot_send_updates_after_local_trans ||
+                  (is_only_remote_trans(th) && th->th_sync))
+                       rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
+                                              th, false);
                rc = dt_update->dur_rc;
-               dt_update_request_destroy(dt_update);
        }
 
-       GOTO(put, rc);
+out:
+       if (!keep_dt_update)
+               dt_update_request_destroy(dt_update);
+       OBD_FREE_PTR(oth);
 
-put:
-       thandle_put(th);
-       return rc;
+       RETURN(rc);
 }