Whamcloud - gitweb
LU-3539 osp: Fix a series of UPDATE_OBJ endianness bugs
[fs/lustre-release.git] / lustre / osp / osp_trans.c
index 902de54..fd137d1 100644 (file)
@@ -34,7 +34,8 @@
 #include "osp_internal.h"
 
 struct osp_async_update_args {
-       struct update_request   *oaua_update;
+       struct dt_update_request *oaua_update;
+       unsigned int             oaua_fc:1;
 };
 
 struct osp_async_update_item {
@@ -76,76 +77,83 @@ static int osp_async_update_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *arg, int rc)
 {
-       struct update_reply             *reply  = NULL;
+       struct object_update_reply      *reply  = NULL;
        struct osp_async_update_args    *oaua   = arg;
-       struct update_request           *update = oaua->oaua_update;
+       struct dt_update_request        *dt_update = oaua->oaua_update;
        struct osp_async_update_item    *oaui;
        struct osp_async_update_item    *next;
+       struct osp_device               *osp    = dt2osp_dev(dt_update->dur_dt);
        int                              count  = 0;
        int                              index  = 0;
        int                              rc1    = 0;
 
+       if (oaua->oaua_fc)
+               up(&osp->opd_async_fc_sem);
+
        if (rc == 0 || req->rq_repmsg != NULL) {
                reply = req_capsule_server_sized_get(&req->rq_pill,
-                                                    &RMF_UPDATE_REPLY,
-                                                    UPDATE_BUFFER_SIZE);
-               if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
+                                                    &RMF_OUT_UPDATE_REPLY,
+                                                    OUT_UPDATE_REPLY_SIZE);
+               if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
                        rc1 = -EPROTO;
                else
-                       count = reply->ur_count;
+                       count = reply->ourp_count;
        } else {
                rc1 = rc;
        }
 
-       list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
+       list_for_each_entry_safe(oaui, next, &dt_update->dur_cb_items,
+                                oaui_list) {
                list_del_init(&oaui->oaui_list);
-               if (index < count && reply->ur_lens[index] > 0) {
-                       char *ptr = update_get_buf_internal(reply, index, NULL);
-
-                       LASSERT(ptr != NULL);
-
-                       rc1 = le32_to_cpu(*(int *)ptr);
+               if (index < count && reply->ourp_lens[index] > 0) {
+                       struct object_update_result *result;
+
+                       result = object_update_result_get(reply, index, NULL);
+                       if (result == NULL)
+                               rc1 = -EPROTO;
+                       else
+                               rc1 = result->our_rc;
                } else {
                        rc1 = rc;
                        if (unlikely(rc1 == 0))
                                rc1 = -EINVAL;
                }
 
-               oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
+               oaui->oaui_interpterer(env, reply, req, oaui->oaui_obj,
                                       oaui->oaui_data, index, rc1);
                osp_async_update_item_fini(env, oaui);
                index++;
        }
 
-       out_destroy_update_req(update);
+       out_destroy_update_req(dt_update);
 
        return 0;
 }
 
 int osp_unplug_async_update(const struct lu_env *env,
                            struct osp_device *osp,
-                           struct update_request *update)
+                           struct dt_update_request *update)
 {
        struct osp_async_update_args    *args;
        struct ptlrpc_request           *req = NULL;
        int                              rc;
 
        rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                update->ur_buf, UPDATE_BUFFER_SIZE, &req);
+                                update->dur_req, &req);
        if (rc != 0) {
                struct osp_async_update_item *oaui;
                struct osp_async_update_item *next;
 
                list_for_each_entry_safe(oaui, next,
-                                        &update->ur_cb_items, oaui_list) {
+                                        &update->dur_cb_items, oaui_list) {
                        list_del_init(&oaui->oaui_list);
-                       oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
+                       oaui->oaui_interpterer(env, NULL, NULL, oaui->oaui_obj,
                                               oaui->oaui_data, 0, rc);
                        osp_async_update_item_fini(env, oaui);
                }
                out_destroy_update_req(update);
        } else {
-               LASSERT(list_empty(&update->ur_list));
+               LASSERT(list_empty(&update->dur_list));
 
                args = ptlrpc_req_async_args(req);
                args->oaua_update = update;
@@ -157,10 +165,10 @@ int osp_unplug_async_update(const struct lu_env *env,
 }
 
 /* with osp::opd_async_requests_mutex held */
-struct update_request *
+struct dt_update_request *
 osp_find_or_create_async_update_request(struct osp_device *osp)
 {
-       struct update_request *update = osp->opd_async_requests;
+       struct dt_update_request *update = osp->opd_async_requests;
 
        if (update != NULL)
                return update;
@@ -174,7 +182,7 @@ osp_find_or_create_async_update_request(struct osp_device *osp)
 
 /* with osp::opd_async_requests_mutex held */
 int osp_insert_async_update(const struct lu_env *env,
-                           struct update_request *update, int op,
+                           struct dt_update_request *update, int op,
                            struct osp_object *obj, int count,
                            int *lens, const char **bufs, void *data,
                            osp_async_update_interpterer_t interpterer)
@@ -208,7 +216,7 @@ again:
        }
 
        if (rc == 0)
-               list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
+               list_add_tail(&oaui->oaui_list, &update->dur_cb_items);
 
        GOTO(out, rc);
 
@@ -228,7 +236,7 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 {
        struct thandle *th = NULL;
        struct thandle_update *tu = NULL;
-       int rc;
+       int rc = 0;
 
        OBD_ALLOC_PTR(th);
        if (unlikely(th == NULL))
@@ -245,6 +253,8 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 
        INIT_LIST_HEAD(&tu->tu_remote_update_list);
        tu->tu_only_remote_trans = 1;
+       th->th_update = tu;
+
 out:
        if (rc != 0) {
                if (tu != NULL)
@@ -258,7 +268,8 @@ out:
 }
 
 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
-                            struct update_request *update, struct thandle *th)
+                            struct dt_update_request *dt_update,
+                            struct thandle *th, bool fc)
 {
        struct thandle_update   *tu = th->th_update;
        int                     rc = 0;
@@ -267,29 +278,29 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 
        /* If the transaction only includes remote update, it should
         * still be asynchronous */
-       if (tu->tu_only_remote_trans) {
+       if (is_only_remote_trans(th)) {
                struct osp_async_update_args    *args;
                struct ptlrpc_request           *req;
 
-               list_del_init(&update->ur_list);
+               list_del_init(&dt_update->dur_list);
                rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                        update->ur_buf,
-                                        UPDATE_BUFFER_SIZE, &req);
+                                        dt_update->dur_req, &req);
                if (rc == 0) {
                        args = ptlrpc_req_async_args(req);
-                       args->oaua_update = update;
+                       args->oaua_update = dt_update;
+                       args->oaua_fc = !!fc;
                        req->rq_interpret_reply =
                                osp_async_update_interpret;
                        ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
                } else {
-                       out_destroy_update_req(update);
+                       out_destroy_update_req(dt_update);
                }
        } else {
                /* Before we support async update, the cross MDT transaction
                 * has to been synchronized */
                th->th_sync = 1;
                rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
-                                    update, NULL);
+                                    dt_update, NULL);
        }
 
        return rc;
@@ -299,15 +310,15 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th)
 {
        struct thandle_update *tu = th->th_update;
-       struct update_request *update;
+       struct dt_update_request *dt_update;
        int rc = 0;
 
        if (tu == NULL)
                return rc;
 
        /* Check whether there are updates related with this OSP */
-       update = out_find_update(tu, dt);
-       if (update == NULL)
+       dt_update = out_find_update(tu, dt);
+       if (dt_update == NULL)
                return rc;
 
        /* Note: some updates needs to send before local transaction,
@@ -323,8 +334,9 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
         * If it is remote unlink, it will send the remote req before
         * the local transaction, i.e. delete the name entry remote
         * first, then destroy the local object. */
-       if (!tu->tu_only_remote_trans && !tu->tu_sent_after_local_trans)
-               rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th);
+       if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
+               rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
+                                      false);
 
        return rc;
 }
@@ -332,33 +344,59 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th)
 {
-       struct thandle_update   *tu = th->th_update;
-       struct update_request   *update;
+       struct thandle_update           *tu = th->th_update;
+       struct dt_update_request        *dt_update;
        int rc = 0;
 
        LASSERT(tu != NULL);
+        LASSERT(tu != LP_POISON);
        /* Check whether there are updates related with this OSP */
-       update = out_find_update(tu, dt);
-       if (update == NULL)
-               return rc;
+       dt_update = out_find_update(tu, dt);
+       if (dt_update == NULL) {
+               if (!is_only_remote_trans(th))
+                       return rc;
+               goto put;
+       }
 
-       if (update->ur_buf->ub_count == 0)
-               GOTO(free, rc);
+       if (dt_update->dur_req->ourq_count == 0) {
+               out_destroy_update_req(dt_update);
+               goto put;
+       }
+
+       if (is_only_remote_trans(th)) {
+               if (th->th_result == 0) {
+                       struct osp_device *osp = dt2osp_dev(th->th_dev);
+
+                       do {
+                               if (!osp->opd_imp_active ||
+                                   osp->opd_got_disconnected) {
+                                       out_destroy_update_req(dt_update);
+                                       GOTO(put, rc = -ENOTCONN);
+                               }
+
+                               /* Get the semaphore to guarantee it has
+                                * free slot, which will be released via
+                                * osp_async_update_interpret(). */
+                               rc = down_timeout(&osp->opd_async_fc_sem, HZ);
+                       } while (rc != 0);
 
-       if (tu->tu_only_remote_trans) {
-               if (th->th_result == 0)
                        rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              update, th);
-               else
+                                              dt_update, th, true);
+                       if (rc != 0)
+                               up(&osp->opd_async_fc_sem);
+               } else {
                        rc = th->th_result;
+                       out_destroy_update_req(dt_update);
+               }
        } else {
                if (tu->tu_sent_after_local_trans)
                        rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                              update, th);
-               rc = update->ur_rc;
+                                              dt_update, th, false);
+               rc = dt_update->dur_rc;
+               out_destroy_update_req(dt_update);
        }
-free:
-       out_destroy_update_req(update);
+
+put:
        thandle_put(th);
        return rc;
 }