#include "osp_internal.h"
struct osp_async_update_args {
- struct update_request *oaua_update;
+ struct dt_update_request *oaua_update;
+ bool oaua_flow_control;
};
struct osp_async_update_item {
struct ptlrpc_request *req,
void *arg, int rc)
{
- struct update_reply *reply = NULL;
+ struct object_update_reply *reply = NULL;
struct osp_async_update_args *oaua = arg;
- struct update_request *update = oaua->oaua_update;
+ struct dt_update_request *dt_update = oaua->oaua_update;
struct osp_async_update_item *oaui;
struct osp_async_update_item *next;
int count = 0;
int index = 0;
int rc1 = 0;
+ if (oaua->oaua_flow_control)
+ obd_put_request_slot(
+ &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
+
if (rc == 0 || req->rq_repmsg != NULL) {
reply = req_capsule_server_sized_get(&req->rq_pill,
- &RMF_UPDATE_REPLY,
- UPDATE_BUFFER_SIZE);
- if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
+ &RMF_OUT_UPDATE_REPLY,
+ OUT_UPDATE_REPLY_SIZE);
+ if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
rc1 = -EPROTO;
else
- count = reply->ur_count;
+ count = reply->ourp_count;
} else {
rc1 = rc;
}
- list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
+ list_for_each_entry_safe(oaui, next, &dt_update->dur_cb_items,
+ oaui_list) {
list_del_init(&oaui->oaui_list);
- if (index < count && reply->ur_lens[index] > 0) {
- char *ptr = update_get_buf_internal(reply, index, NULL);
-
- LASSERT(ptr != NULL);
-
- rc1 = le32_to_cpu(*(int *)ptr);
+ if (index < count && reply->ourp_lens[index] > 0) {
+ struct object_update_result *result;
+
+ result = object_update_result_get(reply, index, NULL);
+ if (result == NULL)
+ rc1 = -EPROTO;
+ else
+ rc1 = result->our_rc;
} else {
rc1 = rc;
if (unlikely(rc1 == 0))
rc1 = -EINVAL;
}
- oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
+ oaui->oaui_interpterer(env, reply, req, oaui->oaui_obj,
oaui->oaui_data, index, rc1);
osp_async_update_item_fini(env, oaui);
index++;
}
- out_destroy_update_req(update);
+ out_destroy_update_req(dt_update);
return 0;
}
int osp_unplug_async_update(const struct lu_env *env,
struct osp_device *osp,
- struct update_request *update)
+ struct dt_update_request *update)
{
struct osp_async_update_args *args;
struct ptlrpc_request *req = NULL;
int rc;
rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- update->ur_buf, UPDATE_BUFFER_SIZE, &req);
+ update->dur_req, &req);
if (rc != 0) {
struct osp_async_update_item *oaui;
struct osp_async_update_item *next;
list_for_each_entry_safe(oaui, next,
- &update->ur_cb_items, oaui_list) {
+ &update->dur_cb_items, oaui_list) {
list_del_init(&oaui->oaui_list);
- oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
+ oaui->oaui_interpterer(env, NULL, NULL, oaui->oaui_obj,
oaui->oaui_data, 0, rc);
osp_async_update_item_fini(env, oaui);
}
out_destroy_update_req(update);
} else {
- LASSERT(list_empty(&update->ur_list));
+ LASSERT(list_empty(&update->dur_list));
args = ptlrpc_req_async_args(req);
args->oaua_update = update;
}
/* with osp::opd_async_requests_mutex held */
-struct update_request *
+struct dt_update_request *
osp_find_or_create_async_update_request(struct osp_device *osp)
{
- struct update_request *update = osp->opd_async_requests;
+ struct dt_update_request *update = osp->opd_async_requests;
if (update != NULL)
return update;
/* with osp::opd_async_requests_mutex held */
int osp_insert_async_update(const struct lu_env *env,
- struct update_request *update, int op,
+ struct dt_update_request *update, int op,
struct osp_object *obj, int count,
int *lens, const char **bufs, void *data,
osp_async_update_interpterer_t interpterer)
}
if (rc == 0)
- list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
+ list_add_tail(&oaui->oaui_list, &update->dur_cb_items);
GOTO(out, rc);
{
struct thandle *th = NULL;
struct thandle_update *tu = NULL;
- int rc;
+ int rc = 0;
OBD_ALLOC_PTR(th);
if (unlikely(th == NULL))
INIT_LIST_HEAD(&tu->tu_remote_update_list);
tu->tu_only_remote_trans = 1;
+ th->th_update = tu;
+
out:
if (rc != 0) {
if (tu != NULL)
}
static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
- struct update_request *update, struct thandle *th)
+ struct dt_update_request *dt_update,
+ struct thandle *th, bool flow_control)
{
struct thandle_update *tu = th->th_update;
int rc = 0;
/* If the transaction only includes remote update, it should
* still be asynchronous */
- if (tu->tu_only_remote_trans) {
+ if (is_only_remote_trans(th)) {
struct osp_async_update_args *args;
struct ptlrpc_request *req;
- list_del_init(&update->ur_list);
+ list_del_init(&dt_update->dur_list);
rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
- update->ur_buf,
- UPDATE_BUFFER_SIZE, &req);
+ dt_update->dur_req, &req);
if (rc == 0) {
args = ptlrpc_req_async_args(req);
- args->oaua_update = update;
+ args->oaua_update = dt_update;
+ args->oaua_flow_control = flow_control;
req->rq_interpret_reply =
osp_async_update_interpret;
ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
} else {
- out_destroy_update_req(update);
+ out_destroy_update_req(dt_update);
}
} else {
/* Before we support async update, the cross MDT transaction
* has to been synchronized */
th->th_sync = 1;
rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
- update, NULL);
+ dt_update, NULL);
}
return rc;
struct thandle *th)
{
struct thandle_update *tu = th->th_update;
- struct update_request *update;
+ struct dt_update_request *dt_update;
int rc = 0;
if (tu == NULL)
return rc;
/* Check whether there are updates related with this OSP */
- update = out_find_update(tu, dt);
- if (update == NULL)
+ dt_update = out_find_update(tu, dt);
+ if (dt_update == NULL)
return rc;
/* Note: some updates needs to send before local transaction,
* If it is remote unlink, it will send the remote req before
* the local transaction, i.e. delete the name entry remote
* first, then destroy the local object. */
- if (!tu->tu_only_remote_trans && !tu->tu_sent_after_local_trans)
- rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th);
+ if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
+ rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
+ false);
return rc;
}
int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
struct thandle *th)
{
- struct thandle_update *tu = th->th_update;
- struct update_request *update;
+ struct thandle_update *tu = th->th_update;
+ struct dt_update_request *dt_update;
int rc = 0;
LASSERT(tu != NULL);
+ LASSERT(tu != LP_POISON);
/* Check whether there are updates related with this OSP */
- update = out_find_update(tu, dt);
- if (update == NULL)
- return rc;
+ dt_update = out_find_update(tu, dt);
+ if (dt_update == NULL) {
+ if (!is_only_remote_trans(th))
+ return rc;
+ goto put;
+ }
- if (update->ur_buf->ub_count == 0)
- GOTO(free, rc);
+ if (dt_update->dur_req->ourq_count == 0) {
+ out_destroy_update_req(dt_update);
+ goto put;
+ }
+
+ if (is_only_remote_trans(th)) {
+ if (th->th_result == 0) {
+ struct osp_device *osp = dt2osp_dev(th->th_dev);
+ struct client_obd *cli = &osp->opd_obd->u.cli;
+
+ rc = obd_get_request_slot(cli);
+ if (!osp->opd_imp_active || osp->opd_got_disconnected) {
+ if (rc == 0)
+ obd_put_request_slot(cli);
+
+ rc = -ENOTCONN;
+ }
+
+ if (rc != 0) {
+ out_destroy_update_req(dt_update);
+ goto put;
+ }
- if (tu->tu_only_remote_trans) {
- if (th->th_result == 0)
rc = osp_trans_trigger(env, dt2osp_dev(dt),
- update, th);
- else
+ dt_update, th, true);
+ if (rc != 0)
+ obd_put_request_slot(cli);
+ } else {
rc = th->th_result;
+ out_destroy_update_req(dt_update);
+ }
} else {
if (tu->tu_sent_after_local_trans)
rc = osp_trans_trigger(env, dt2osp_dev(dt),
- update, th);
- rc = update->ur_rc;
+ dt_update, th, false);
+ rc = dt_update->dur_rc;
+ out_destroy_update_req(dt_update);
}
-free:
- out_destroy_update_req(update);
+
+put:
thandle_put(th);
return rc;
}