X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_trans.c;h=9686863b7abfa1ab1c9a60ad4d09c969143dcae0;hp=6d0067aecbf6961ff1be79376d2e5c428957f3d3;hb=f32fbf189fab57202810a9a47343a14c6c3ead95;hpb=2fe22edfe3c365b5c270050fdeed0a86fa74a919 diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index 6d0067a..9686863 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2014, Intel Corporation. + * Copyright (c) 2014, 2017, Intel Corporation. */ /* * lustre/osp/osp_trans.c @@ -66,6 +66,7 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include "osp_internal.h" /** @@ -76,6 +77,7 @@ struct osp_update_args { atomic_t *oaua_count; wait_queue_head_t *oaua_waitq; bool oaua_flow_control; + const struct lu_env *oaua_update_env; }; /** @@ -95,25 +97,72 @@ struct osp_update_callback { osp_update_interpreter_t ouc_interpreter; }; -static struct object_update_request *object_update_request_alloc(size_t size) +/** + * Allocate new update request + * + * Allocate new update request and insert it to the req_update_list. + * + * \param [in] our osp_udate_request where to create a new + * update request + * + * \retval 0 if creation succeeds. + * \retval negative errno if creation fails. + */ +int osp_object_update_request_create(struct osp_update_request *our, + size_t size) { + struct osp_update_request_sub *ours; struct object_update_request *ourq; + OBD_ALLOC_PTR(ours); + if (ours == NULL) + return -ENOMEM; + + /* The object update request will be added to an SG list for + * bulk transfer. Some IB HW cannot handle partial pages in SG + * lists (since they create gaps in memory regions) so we + * round the size up to the next multiple of PAGE_SIZE. See + * LU-9983. */ + LASSERT(size > 0); + size = round_up(size, PAGE_SIZE); OBD_ALLOC_LARGE(ourq, size); - if (ourq == NULL) - return ERR_PTR(-ENOMEM); + if (ourq == NULL) { + OBD_FREE_PTR(ours); + return -ENOMEM; + } ourq->ourq_magic = UPDATE_REQUEST_MAGIC; ourq->ourq_count = 0; + ours->ours_req = ourq; + ours->ours_req_size = size; + INIT_LIST_HEAD(&ours->ours_list); + list_add_tail(&ours->ours_list, &our->our_req_list); + our->our_req_nr++; - return ourq; + return 0; } -static void object_update_request_free(struct object_update_request *ourq, - size_t ourq_size) +/** + * Get current update request + * + * Get current object update request from our_req_list in + * osp_update_request, because we always insert the new update + * request in the last position, so the last update request + * in the list will be the current update req. + * + * \param[in] our osp update request where to get the + * current object update. + * + * \retval the current object update. + **/ +struct osp_update_request_sub * +osp_current_object_update_request(struct osp_update_request *our) { - if (ourq != NULL) - OBD_FREE_LARGE(ourq, ourq_size); + if (list_empty(&our->our_req_list)) + return NULL; + + return list_entry(our->our_req_list.prev, struct osp_update_request_sub, + ours_list); } /** @@ -130,35 +179,67 @@ static void object_update_request_free(struct object_update_request *ourq, */ struct osp_update_request *osp_update_request_create(struct dt_device *dt) { - struct osp_update_request *osp_update_req; - struct object_update_request *ourq; + struct osp_update_request *our; + int rc; - OBD_ALLOC_PTR(osp_update_req); - if (osp_update_req == NULL) + OBD_ALLOC_PTR(our); + if (our == NULL) return ERR_PTR(-ENOMEM); - ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE); - if (IS_ERR(ourq)) { - OBD_FREE_PTR(osp_update_req); - return ERR_CAST(ourq); - } - - osp_update_req->our_req = ourq; - osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE; + INIT_LIST_HEAD(&our->our_req_list); + INIT_LIST_HEAD(&our->our_cb_items); + INIT_LIST_HEAD(&our->our_list); + INIT_LIST_HEAD(&our->our_invalidate_cb_list); + spin_lock_init(&our->our_list_lock); - INIT_LIST_HEAD(&osp_update_req->our_cb_items); - INIT_LIST_HEAD(&osp_update_req->our_list); - - return osp_update_req; + rc = osp_object_update_request_create(our, PAGE_SIZE); + if (rc != 0) { + OBD_FREE_PTR(our); + return ERR_PTR(rc); + } + return our; } -void osp_update_request_destroy(struct osp_update_request *our) +void osp_update_request_destroy(const struct lu_env *env, + struct osp_update_request *our) { + struct osp_update_request_sub *ours; + struct osp_update_request_sub *tmp; + if (our == NULL) return; - object_update_request_free(our->our_req, - our->our_req_size); + list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) { + list_del(&ours->ours_list); + if (ours->ours_req != NULL) + OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size); + OBD_FREE_PTR(ours); + } + + if (!list_empty(&our->our_invalidate_cb_list)) { + struct lu_env lenv; + struct osp_object *obj; + struct osp_object *next; + + if (env == NULL) { + lu_env_init(&lenv, LCT_MD_THREAD | LCT_DT_THREAD); + env = &lenv; + } + + list_for_each_entry_safe(obj, next, + &our->our_invalidate_cb_list, + opo_invalidate_cb_list) { + spin_lock(&obj->opo_lock); + list_del_init(&obj->opo_invalidate_cb_list); + spin_unlock(&obj->opo_lock); + + dt_object_put(env, &obj->opo_obj); + } + + if (env == &lenv) + lu_env_fini(&lenv); + } + OBD_FREE_PTR(our); } @@ -175,12 +256,13 @@ object_update_request_dump(const struct object_update_request *ourq, update = object_update_request_get(ourq, i, &size); LASSERT(update != NULL); - CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u" - "params = %d batchid = "LPU64" size = %zu\n", + CDEBUG(mask, "i = %u fid = "DFID" op = %s " + "params = %d batchid = %llu size = %zu repsize %u\n", i, PFID(&update->ou_fid), update_op_str(update->ou_type), - update->ou_master_index, update->ou_params_count, - update->ou_batchid, size); + update->ou_params_count, + update->ou_batchid, size, + (unsigned)update->ou_result_size); total_size += size; } @@ -189,7 +271,263 @@ object_update_request_dump(const struct object_update_request *ourq, ourq->ourq_magic, ourq->ourq_count, total_size); } -static void osp_trans_stop_cb(struct osp_thandle *oth, int result) +/** + * Prepare inline update request + * + * Prepare OUT update ptlrpc inline request, and the request usually includes + * one update buffer, which does not need bulk transfer. + * + * \param[in] env execution environment + * \param[in] req ptlrpc request + * \param[in] ours sub osp_update_request to be packed + * + * \retval 0 if packing succeeds + * \retval negative errno if packing fails + */ +int osp_prep_inline_update_req(const struct lu_env *env, + struct ptlrpc_request *req, + struct osp_update_request *our, + int repsize) +{ + struct osp_update_request_sub *ours; + struct out_update_header *ouh; + __u32 update_req_size; + int rc; + + ours = list_entry(our->our_req_list.next, + struct osp_update_request_sub, ours_list); + update_req_size = object_update_request_size(ours->ours_req); + req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT, + update_req_size + sizeof(*ouh)); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE); + if (rc != 0) + RETURN(rc); + + ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER); + ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC; + ouh->ouh_count = 1; + ouh->ouh_inline_length = update_req_size; + ouh->ouh_reply_size = repsize; + + memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size); + + req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, + RCL_SERVER, repsize); + + ptlrpc_request_set_replen(req); + req->rq_request_portal = OUT_PORTAL; + req->rq_reply_portal = OSC_REPLY_PORTAL; + + RETURN(rc); +} + +/** + * Prepare update request. + * + * Prepare OUT update ptlrpc request, and the request usually includes + * all of updates (stored in \param ureq) from one operation. + * + * \param[in] env execution environment + * \param[in] imp import on which ptlrpc request will be sent + * \param[in] ureq hold all of updates which will be packed into the req + * \param[in] reqp request to be created + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ +int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, + struct osp_update_request *our, + struct ptlrpc_request **reqp) +{ + struct ptlrpc_request *req; + struct ptlrpc_bulk_desc *desc; + struct osp_update_request_sub *ours; + const struct object_update_request *ourq; + struct out_update_header *ouh; + struct out_update_buffer *oub; + __u32 buf_count = 0; + int page_count = 0; + int repsize = 0; + struct object_update_reply *reply; + int rc, i; + int total = 0; + ENTRY; + + list_for_each_entry(ours, &our->our_req_list, ours_list) { + object_update_request_dump(ours->ours_req, D_INFO); + + ourq = ours->ours_req; + for (i = 0; i < ourq->ourq_count; i++) { + struct object_update *update; + size_t size = 0; + + + /* XXX: it's very inefficient to lookup update + * this way, iterating from the beginning + * each time */ + update = object_update_request_get(ourq, i, &size); + LASSERT(update != NULL); + + repsize += sizeof(reply->ourp_lens[0]); + repsize += sizeof(struct object_update_result); + repsize += update->ou_result_size; + } + + buf_count++; + } + repsize += sizeof(*reply); + if (repsize < OUT_UPDATE_REPLY_SIZE) + repsize = OUT_UPDATE_REPLY_SIZE; + LASSERT(buf_count > 0); + + req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE); + if (req == NULL) + RETURN(-ENOMEM); + + if (buf_count == 1) { + ours = list_entry(our->our_req_list.next, + struct osp_update_request_sub, ours_list); + + /* Let's check if it can be packed inline */ + if (object_update_request_size(ours->ours_req) + + sizeof(struct out_update_header) < + OUT_UPDATE_MAX_INLINE_SIZE) { + rc = osp_prep_inline_update_req(env, req, our, repsize); + if (rc == 0) + *reqp = req; + GOTO(out_req, rc); + } + } + + req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT, + sizeof(struct osp_update_request)); + + req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT, + buf_count * sizeof(*oub)); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE); + if (rc != 0) + GOTO(out_req, rc); + + ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER); + ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC; + ouh->ouh_count = buf_count; + ouh->ouh_inline_length = 0; + ouh->ouh_reply_size = repsize; + oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF); + list_for_each_entry(ours, &our->our_req_list, ours_list) { + oub->oub_size = ours->ours_req_size; + oub++; + page_count += round_up(ours->ours_req_size, PAGE_SIZE) + 1; + } + + req->rq_bulk_write = 1; + desc = ptlrpc_prep_bulk_imp(req, page_count, + MD_MAX_BRW_SIZE >> LNET_MTU_BITS, + PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KIOV, + MDS_BULK_PORTAL, &ptlrpc_bulk_kiov_nopin_ops); + if (desc == NULL) + GOTO(out_req, rc = -ENOMEM); + + /* NB req now owns desc and will free it when it gets freed */ + list_for_each_entry(ours, &our->our_req_list, ours_list) { + desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req, + ours->ours_req_size); + total += ours->ours_req_size; + } + CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr); + + req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, + RCL_SERVER, repsize); + + ptlrpc_request_set_replen(req); + req->rq_request_portal = OUT_PORTAL; + req->rq_reply_portal = OSC_REPLY_PORTAL; + *reqp = req; + +out_req: + if (rc < 0) + ptlrpc_req_finished(req); + + RETURN(rc); +} + +/** + * Send update RPC. + * + * Send update request to the remote MDT synchronously. + * + * \param[in] env execution environment + * \param[in] imp import on which ptlrpc request will be sent + * \param[in] our hold all of updates which will be packed into the req + * \param[in] reqp request to be created + * + * \retval 0 if RPC succeeds. + * \retval negative errno if RPC fails. + */ +int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, + struct osp_update_request *our, + struct ptlrpc_request **reqp) +{ + struct obd_import *imp = osp->opd_obd->u.cli.cl_import; + struct ptlrpc_request *req = NULL; + int rc; + ENTRY; + + rc = osp_prep_update_req(env, imp, our, &req); + if (rc != 0) + RETURN(rc); + + osp_set_req_replay(osp, req); + req->rq_allow_intr = 1; + + /* Note: some dt index api might return non-zero result here, like + * osd_index_ea_lookup, so we should only check rc < 0 here */ + rc = ptlrpc_queue_wait(req); + our->our_rc = rc; + if (rc < 0 || reqp == NULL) + ptlrpc_req_finished(req); + else + *reqp = req; + + RETURN(rc); +} + +/** + * Invalidate all objects in the osp thandle + * + * invalidate all of objects in the update request, which will be called + * when the transaction is aborted. + * + * \param[in] oth osp thandle. + */ +static void osp_thandle_invalidate_object(const struct lu_env *env, + struct osp_thandle *oth, + int result) +{ + struct osp_update_request *our = oth->ot_our; + struct osp_object *obj; + struct osp_object *next; + + if (our == NULL) + return; + + list_for_each_entry_safe(obj, next, &our->our_invalidate_cb_list, + opo_invalidate_cb_list) { + if (result < 0) + osp_invalidate(env, &obj->opo_obj); + + spin_lock(&obj->opo_lock); + list_del_init(&obj->opo_invalidate_cb_list); + spin_unlock(&obj->opo_lock); + + dt_object_put(env, &obj->opo_obj); + } +} + +static void osp_trans_stop_cb(const struct lu_env *env, + struct osp_thandle *oth, int result) { struct dt_txn_commit_cb *dcb; struct dt_txn_commit_cb *tmp; @@ -203,6 +541,8 @@ static void osp_trans_stop_cb(struct osp_thandle *oth, int result) list_del_init(&dcb->dcb_linkage); dcb->dcb_func(NULL, &oth->ot_super, dcb, result); } + + osp_thandle_invalidate_object(env, oth, result); } /** @@ -264,23 +604,34 @@ static void osp_update_callback_fini(const struct lu_env *env, * \retval negative error number on failure */ static int osp_update_interpret(const struct lu_env *env, - struct ptlrpc_request *req, void *arg, int rc) + struct ptlrpc_request *req, void *args, int rc) { - struct object_update_reply *reply = NULL; - struct osp_update_args *oaua = arg; - struct osp_update_request *our = oaua->oaua_update; - struct osp_thandle *oth; - struct osp_update_callback *ouc; - struct osp_update_callback *next; - int count = 0; - int index = 0; - int rc1 = 0; + struct object_update_reply *reply = NULL; + struct osp_update_args *oaua = args; + struct osp_update_request *our = oaua->oaua_update; + struct osp_thandle *oth; + struct osp_update_callback *ouc; + struct osp_update_callback *next; + int count = 0; + int index = 0; + int rc1 = 0; ENTRY; if (our == NULL) RETURN(0); + /* Sigh env might be NULL in some cases, see + * this calling path. + * osp_send_update_thread() + * ptlrpc_set_wait() ----> null env. + * ptlrpc_check_set() + * osp_update_interpret() + * Let's use env in oaua for this case. + */ + if (env == NULL) + env = oaua->oaua_update_env; + oaua->oaua_update = NULL; oth = our->our_th; if (oaua->oaua_flow_control) { @@ -292,16 +643,16 @@ static int osp_update_interpret(const struct lu_env *env, } /* Unpack the results from the reply message. */ - if (req->rq_repmsg != NULL) { + if (req->rq_repmsg != NULL && req->rq_replied) { reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, OUT_UPDATE_REPLY_SIZE); - if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) - rc1 = -EPROTO; - else + if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) { + if (rc == 0) + rc = -EPROTO; + } else { count = reply->ourp_count; - } else { - rc1 = rc; + } } list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) { @@ -310,18 +661,21 @@ static int osp_update_interpret(const struct lu_env *env, /* The peer may only have handled some requests (indicated * by the 'count') in the packaged OUT RPC, we can only get * results for the handled part. */ - if (index < count && reply->ourp_lens[index] > 0) { + if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) { struct object_update_result *result; result = object_update_result_get(reply, index, NULL); if (result == NULL) - rc1 = -EPROTO; + rc1 = rc = -EPROTO; else - rc1 = result->our_rc; - } else { - rc1 = rc; - if (unlikely(rc1 == 0)) + rc1 = rc = result->our_rc; + } else if (rc1 >= 0) { + /* The peer did not handle these request, let's return + * -EINVAL to update interpret for now */ + if (rc >= 0) rc1 = -EINVAL; + else + rc1 = rc; } if (ouc->ouc_interpreter != NULL) @@ -338,13 +692,13 @@ static int osp_update_interpret(const struct lu_env *env, if (oth != NULL) { /* oth and osp_update_requests will be destoryed in * osp_thandle_put */ - osp_trans_stop_cb(oth, rc); - osp_thandle_put(oth); + osp_trans_stop_cb(env, oth, rc); + osp_thandle_put(env, oth); } else { - osp_update_request_destroy(our); + osp_update_request_destroy(env, our); } - RETURN(0); + RETURN(rc); } /** @@ -367,7 +721,7 @@ int osp_unplug_async_request(const struct lu_env *env, int rc; rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - our->our_req, &req); + our, &req); if (rc != 0) { struct osp_update_callback *ouc; struct osp_update_callback *next; @@ -381,15 +735,19 @@ int osp_unplug_async_request(const struct lu_env *env, ouc->ouc_data, 0, rc); osp_update_callback_fini(env, ouc); } - osp_update_request_destroy(our); + osp_update_request_destroy(env, our); } else { - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->oaua_update = our; args->oaua_count = NULL; args->oaua_waitq = NULL; + /* Note: this is asynchronous call for the request, so the + * interrupte cb and current function will be different + * thread, so we need use different env */ + args->oaua_update_env = NULL; args->oaua_flow_control = false; req->rq_interpret_reply = osp_update_interpret; - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + ptlrpcd_add_req(req); } return rc; @@ -474,6 +832,7 @@ int osp_insert_update_callback(const struct lu_env *env, * \param[in] lens buffer length array for the subsequent \a bufs * \param[in] bufs the buffers to compose the request * \param[in] data pointer to the data used by the interpreter + * \param[in] repsize how many bytes the caller allocated for \a data * \param[in] interpreter pointer to the interpreter function * * \retval 0 for success @@ -481,7 +840,8 @@ int osp_insert_update_callback(const struct lu_env *env, */ int osp_insert_async_request(const struct lu_env *env, enum update_type op, struct osp_object *obj, int count, - __u16 *lens, const void **bufs, void *data, + __u16 *lens, const void **bufs, + void *data, __u32 repsize, osp_update_interpreter_t interpreter) { struct osp_device *osp; @@ -489,6 +849,7 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op, struct object_update *object_update; size_t max_update_size; struct object_update_request *ureq; + struct osp_update_request_sub *ours; int rc = 0; ENTRY; @@ -498,12 +859,16 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op, RETURN(PTR_ERR(our)); again: - ureq = our->our_req; - max_update_size = our->our_req_size - object_update_request_size(ureq); + ours = osp_current_object_update_request(our); + + ureq = ours->ours_req; + max_update_size = ours->ours_req_size - + object_update_request_size(ureq); object_update = update_buffer_get_update(ureq, ureq->ourq_count); - rc = out_update_pack(env, object_update, max_update_size, op, - lu_object_fid(osp2lu_obj(obj)), count, lens, bufs); + rc = out_update_pack(env, object_update, &max_update_size, op, + lu_object_fid(osp2lu_obj(obj)), count, lens, bufs, + repsize); /* The queue is full. */ if (rc == -E2BIG) { osp->opd_async_requests = NULL; @@ -524,6 +889,7 @@ again: RETURN(rc); ureq->ourq_count++; + our->our_update_nr++; } rc = osp_insert_update_callback(env, our, obj, data, interpreter); @@ -545,21 +911,20 @@ int osp_trans_update_request_create(struct thandle *th) return PTR_ERR(our); } - if (dt2osp_dev(th->th_dev)->opd_connect_mdt) - our->our_flags = UPDATE_FL_SYNC; - oth->ot_our = our; our->our_th = oth; + return 0; } -void osp_thandle_destroy(struct osp_thandle *oth) +void osp_thandle_destroy(const struct lu_env *env, + struct osp_thandle *oth) { LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC); LASSERT(list_empty(&oth->ot_commit_dcb_list)); LASSERT(list_empty(&oth->ot_stop_dcb_list)); if (oth->ot_our != NULL) - osp_update_request_destroy(oth->ot_our); + osp_update_request_destroy(env, oth->ot_our); OBD_FREE_PTR(oth); } @@ -603,7 +968,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) oth->ot_magic = OSP_THANDLE_MAGIC; th = &oth->ot_super; th->th_dev = d; - th->th_tags = LCT_TX_HANDLE; atomic_set(&oth->ot_refcount, 1); INIT_LIST_HEAD(&oth->ot_commit_dcb_list); @@ -613,112 +977,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) } /** - * Prepare update request. - * - * Prepare OUT update ptlrpc request, and the request usually includes - * all of updates (stored in \param ureq) from one operation. - * - * \param[in] env execution environment - * \param[in] imp import on which ptlrpc request will be sent - * \param[in] ureq hold all of updates which will be packed into the req - * \param[in] reqp request to be created - * - * \retval 0 if preparation succeeds. - * \retval negative errno if preparation fails. - */ -int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, - const struct object_update_request *ureq, - struct ptlrpc_request **reqp) -{ - struct ptlrpc_request *req; - struct object_update_request *tmp; - size_t ureq_len; - int rc; - ENTRY; - - object_update_request_dump(ureq, D_INFO); - req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE); - if (req == NULL) - RETURN(-ENOMEM); - - ureq_len = object_update_request_size(ureq); - req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT, - ureq_len); - - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE); - if (rc != 0) { - ptlrpc_req_finished(req); - RETURN(rc); - } - - req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, - RCL_SERVER, OUT_UPDATE_REPLY_SIZE); - - tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE); - memcpy(tmp, ureq, ureq_len); - - ptlrpc_request_set_replen(req); - req->rq_request_portal = OUT_PORTAL; - req->rq_reply_portal = OSC_REPLY_PORTAL; - *reqp = req; - - RETURN(rc); -} - -/** - * Send update RPC. - * - * Send update request to the remote MDT synchronously. - * - * \param[in] env execution environment - * \param[in] imp import on which ptlrpc request will be sent - * \param[in] our hold all of updates which will be packed into the req - * \param[in] reqp request to be created - * - * \retval 0 if RPC succeeds. - * \retval negative errno if RPC fails. - */ - -int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, - struct osp_update_request *our, - struct ptlrpc_request **reqp) -{ - struct obd_import *imp = osp->opd_obd->u.cli.cl_import; - struct ptlrpc_request *req = NULL; - int rc; - ENTRY; - - rc = osp_prep_update_req(env, imp, our->our_req, &req); - if (rc != 0) - RETURN(rc); - - /* This will only be called with read-only update, and these updates - * might be used to retrieve update log during recovery process, so - * it will be allowed to send during recovery process */ - req->rq_allow_replay = 1; - - /* Note: some dt index api might return non-zero result here, like - * osd_index_ea_lookup, so we should only check rc < 0 here */ - rc = ptlrpc_queue_wait(req); - if (rc < 0) { - ptlrpc_req_finished(req); - our->our_rc = rc; - RETURN(rc); - } - - if (reqp != NULL) { - *reqp = req; - RETURN(rc); - } - - our->our_rc = rc; - - ptlrpc_req_finished(req); - - RETURN(rc); -} - -/** * Add commit callback to transaction. * * Add commit callback to the osp thandle, which will be called @@ -771,7 +1029,8 @@ static void osp_request_commit_cb(struct ptlrpc_request *req) RETURN_EXIT; oth = thandle_to_osp_thandle(th); - if (lustre_msg_get_last_committed(req->rq_repmsg)) + if (req->rq_repmsg != NULL && + lustre_msg_get_last_committed(req->rq_repmsg)) last_committed_transno = lustre_msg_get_last_committed(req->rq_repmsg); @@ -780,7 +1039,7 @@ static void osp_request_commit_cb(struct ptlrpc_request *req) last_committed_transno = req->rq_import->imp_peer_committed_transno; - CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n", + CDEBUG(D_HA, "trans no %llu committed transno %llu\n", req->rq_transno, last_committed_transno); /* If the transaction is not really committed, mark result = 1 */ @@ -790,7 +1049,7 @@ static void osp_request_commit_cb(struct ptlrpc_request *req) osp_trans_commit_cb(oth, result); req->rq_committed = 1; - osp_thandle_put(oth); + osp_thandle_put(NULL, oth); EXIT; } @@ -823,7 +1082,7 @@ void osp_trans_callback(const struct lu_env *env, osp_update_callback_fini(env, ouc); } } - osp_trans_stop_cb(oth, rc); + osp_trans_stop_cb(env, oth, rc); osp_trans_commit_cb(oth, rc); } @@ -849,35 +1108,36 @@ static int osp_send_update_req(const struct lu_env *env, { struct osp_update_args *args; struct ptlrpc_request *req; - struct lu_device *top_device; struct osp_thandle *oth = our->our_th; int rc = 0; ENTRY; LASSERT(oth != NULL); - LASSERT(our->our_req_sent == 0); rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - our->our_req, &req); + our, &req); if (rc != 0) { osp_trans_callback(env, oth, rc); RETURN(rc); } - args = ptlrpc_req_async_args(req); + args = ptlrpc_req_async_args(args, req); args->oaua_update = our; + /* set env to NULL, in case the interrupt cb and current function + * are in different thread */ + args->oaua_update_env = NULL; osp_thandle_get(oth); /* hold for update interpret */ req->rq_interpret_reply = osp_update_interpret; if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) { if (!osp->opd_imp_active || !osp->opd_imp_connected) { osp_trans_callback(env, oth, rc); - osp_thandle_put(oth); + osp_thandle_put(env, oth); GOTO(out, rc = -ENOTCONN); } rc = obd_get_request_slot(&osp->opd_obd->u.cli); if (rc != 0) { osp_trans_callback(env, oth, rc); - osp_thandle_put(oth); + osp_thandle_put(env, oth); GOTO(out, rc = -ENOTCONN); } args->oaua_flow_control = true; @@ -885,12 +1145,12 @@ static int osp_send_update_req(const struct lu_env *env, if (!osp->opd_connect_mdt) { down_read(&osp->opd_async_updates_rwsem); args->oaua_count = &osp->opd_async_updates_count; - args->oaua_waitq = &osp->opd_syn_barrier_waitq; + args->oaua_waitq = &osp->opd_sync_barrier_waitq; up_read(&osp->opd_async_updates_rwsem); atomic_inc(args->oaua_count); } - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + ptlrpcd_add_req(req); req = NULL; } else { osp_thandle_get(oth); /* hold for commit callback */ @@ -904,26 +1164,33 @@ static int osp_send_update_req(const struct lu_env *env, * status, in case the other target is being recoveried * at the same time, and if we wait here for the import * to be recoveryed, it might cause deadlock */ - top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev; - if (top_device->ld_obd->obd_recovering) - req->rq_allow_replay = 1; - - osp_get_rpc_lock(osp); + osp_set_req_replay(osp, req); + + /* Because this req will be synchronus, i.e. it will be called + * in the same thread, so it will be safe to use current + * env */ + args->oaua_update_env = env; + if (osp->opd_connect_mdt) + osp_get_rpc_lock(osp); rc = ptlrpc_queue_wait(req); - osp_put_rpc_lock(osp); - if ((rc == -ENOMEM && req->rq_set == NULL) || + if (osp->opd_connect_mdt) + osp_put_rpc_lock(osp); + + /* We use rq_queued_time to distinguish between local + * and remote -ENOMEM. */ + if ((rc == -ENOMEM && req->rq_queued_time == 0) || (req->rq_transno == 0 && !req->rq_committed)) { if (args->oaua_update != NULL) { /* If osp_update_interpret is not being called, * release the osp_thandle */ args->oaua_update = NULL; - osp_thandle_put(oth); + osp_thandle_put(env, oth); } req->rq_cb_data = NULL; rc = rc == 0 ? req->rq_status : rc; osp_trans_callback(env, oth, rc); - osp_thandle_put(oth); + osp_thandle_put(env, oth); GOTO(out, rc); } } @@ -938,7 +1205,7 @@ out: * Get local thandle for osp_thandle * * Get the local OSD thandle from the OSP thandle. Currently, there - * are a few OSP API (osp_object_create() and osp_sync_add()) needs + * are a few OSP API (osp_create() and osp_sync_add()) needs * to update the object on local OSD device. * * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then @@ -986,16 +1253,17 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env, /** * Set version for the transaction * - * Set the version for the transaction, then the osp RPC will be - * sent in the order of version, i.e. the transaction with lower - * version will be sent first. + * Set the version for the transaction and add the request to + * the sending list, then after transaction stop, the request + * will be sent in the order of version by the sending thread. * * \param [in] oth osp thandle to be set version. * * \retval 0 if set version succeeds * negative errno if set version fails. */ -int osp_check_and_set_rpc_version(struct osp_thandle *oth) +int osp_check_and_set_rpc_version(struct osp_thandle *oth, + struct osp_object *obj) { struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev); struct osp_updates *ou = osp->opd_update; @@ -1003,15 +1271,31 @@ int osp_check_and_set_rpc_version(struct osp_thandle *oth) if (ou == NULL) return -EIO; - if (oth->ot_version != 0) + if (oth->ot_our->our_version != 0) return 0; spin_lock(&ou->ou_lock); - oth->ot_version = ou->ou_version++; + spin_lock(&oth->ot_our->our_list_lock); + if (obj->opo_stale) { + spin_unlock(&oth->ot_our->our_list_lock); + spin_unlock(&ou->ou_lock); + return -ESTALE; + } + + /* Assign the version and add it to the sending list */ + osp_thandle_get(oth); + oth->ot_our->our_version = ou->ou_version++; + oth->ot_our->our_generation = ou->ou_generation; + list_add_tail(&oth->ot_our->our_list, + &osp->opd_update->ou_list); + oth->ot_our->our_req_ready = 0; + spin_unlock(&oth->ot_our->our_list_lock); spin_unlock(&ou->ou_lock); - CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n", - osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version); + LASSERT(oth->ot_super.th_wait_submit == 1); + CDEBUG(D_INFO, "%s: version %llu gen %llu oth:version %p:%llu\n", + osp->opd_obd->obd_name, ou->ou_version, ou->ou_generation, oth, + oth->ot_our->our_version); return 0; } @@ -1039,38 +1323,96 @@ osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp) spin_lock(&ou->ou_lock); list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { LASSERT(our->our_th != NULL); - CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n", - our, our->our_th->ot_version, ou->ou_rpc_version); - if (our->our_th->ot_version == 0) { - list_del_init(&our->our_list); - *ourp = our; - got_req = true; - break; - } - + CDEBUG(D_HA, "ou %p version %llu rpc_version %llu\n", + ou, our->our_version, ou->ou_rpc_version); + spin_lock(&our->our_list_lock); /* Find next osp_update_request in the list */ - if (our->our_th->ot_version == ou->ou_rpc_version) { + if (our->our_version == ou->ou_rpc_version && + our->our_req_ready) { list_del_init(&our->our_list); + spin_unlock(&our->our_list_lock); *ourp = our; got_req = true; break; } + spin_unlock(&our->our_list_lock); } spin_unlock(&ou->ou_lock); return got_req; } -static void osp_update_rpc_version(struct osp_updates *ou, - struct osp_thandle *oth) +/** + * Invalidate update request + * + * Invalidate update request in the OSP sending list, so all of + * requests in the sending list will return error, which happens + * when it finds one update (with writing llog) requests fails or + * the OSP is evicted by remote target. see osp_send_update_thread(). + * + * \param[in] osp OSP device whose update requests will be + * invalidated. + **/ +void osp_invalidate_request(struct osp_device *osp) { - if (oth->ot_version == 0) + struct lu_env env; + struct osp_updates *ou = osp->opd_update; + struct osp_update_request *our; + struct osp_update_request *tmp; + LIST_HEAD(list); + int rc; + ENTRY; + + if (ou == NULL) return; - LASSERT(oth->ot_version == ou->ou_rpc_version); + rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags); + if (rc < 0) { + CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name, + rc); + + spin_lock(&ou->ou_lock); + ou->ou_generation++; + spin_unlock(&ou->ou_lock); + + return; + } + spin_lock(&ou->ou_lock); - ou->ou_rpc_version++; + /* invalidate all of request in the sending list */ + list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) { + spin_lock(&our->our_list_lock); + if (our->our_req_ready) + list_move(&our->our_list, &list); + else + list_del_init(&our->our_list); + + if (our->our_th->ot_super.th_result == 0) + our->our_th->ot_super.th_result = -EIO; + + if (our->our_version >= ou->ou_rpc_version) + ou->ou_rpc_version = our->our_version + 1; + spin_unlock(&our->our_list_lock); + + CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name, + our); + } + + /* Increase the generation, then the update request with old generation + * will fail with -EIO. */ + ou->ou_generation++; spin_unlock(&ou->ou_lock); + + /* invalidate all of request in the sending list */ + list_for_each_entry_safe(our, tmp, &list, our_list) { + spin_lock(&our->our_list_lock); + list_del_init(&our->our_list); + spin_unlock(&our->our_list_lock); + osp_trans_callback(&env, our->our_th, + our->our_th->ot_super.th_result); + osp_thandle_put(&env, our->our_th); + } + lu_env_fini(&env); } /** @@ -1090,7 +1432,6 @@ int osp_send_update_thread(void *arg) { struct lu_env env; struct osp_device *osp = arg; - struct l_wait_info lwi = { 0 }; struct osp_updates *ou = osp->opd_update; struct ptlrpc_thread *thread = &osp->opd_update_thread; struct osp_update_request *our = NULL; @@ -1109,34 +1450,46 @@ int osp_send_update_thread(void *arg) wake_up(&thread->t_ctl_waitq); while (1) { our = NULL; - l_wait_event(ou->ou_waitq, - !osp_send_update_thread_running(osp) || - osp_get_next_request(ou, &our), - &lwi); + wait_event_idle(ou->ou_waitq, + !osp_send_update_thread_running(osp) || + osp_get_next_request(ou, &our)); if (!osp_send_update_thread_running(osp)) { - if (our != NULL && our->our_th != NULL) { + if (our != NULL) { osp_trans_callback(&env, our->our_th, -EINTR); - osp_thandle_put(our->our_th); + osp_thandle_put(&env, our->our_th); } break; } - if (our->our_req_sent == 0) { - if (our->our_th != NULL && - our->our_th->ot_super.th_result != 0) - osp_trans_callback(&env, our->our_th, - our->our_th->ot_super.th_result); - else - rc = osp_send_update_req(&env, osp, our); + LASSERT(our->our_th != NULL); + if (our->our_th->ot_super.th_result != 0) { + osp_trans_callback(&env, our->our_th, + our->our_th->ot_super.th_result); + rc = our->our_th->ot_super.th_result; + } else if (ou->ou_generation != our->our_generation || + OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) { + rc = -EIO; + osp_trans_callback(&env, our->our_th, rc); + } else { + rc = osp_send_update_req(&env, osp, our); } - if (our->our_th != NULL) { - /* Update the rpc version */ - osp_update_rpc_version(ou, our->our_th); - /* Balanced for thandle_get in osp_trans_trigger() */ - osp_thandle_put(our->our_th); - } + /* Update the rpc version */ + spin_lock(&ou->ou_lock); + if (our->our_version == ou->ou_rpc_version) + ou->ou_rpc_version++; + spin_unlock(&ou->ou_lock); + + /* If one update request fails, let's fail all of the requests + * in the sending list, because the request in the sending + * list are dependent on either other, continue sending these + * request might cause llog or filesystem corruption */ + if (rc < 0) + osp_invalidate_request(osp); + + /* Balanced for thandle_get in osp_check_and_set_rpc_version */ + osp_thandle_put(&env, our->our_th); } thread->t_flags = SVC_STOPPED; @@ -1147,36 +1500,6 @@ int osp_send_update_thread(void *arg) } /** - * Trigger the request for remote updates. - * - * Add the request to the sending list, and wake up osp update - * sending thread. - * - * \param[in] env pointer to the thread context - * \param[in] osp pointer to the OSP device - * \param[in] oth pointer to the transaction handler - * - */ -static void osp_trans_trigger(const struct lu_env *env, - struct osp_device *osp, - struct osp_thandle *oth) -{ - - CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n", - osp->opd_obd->obd_name, oth, oth->ot_version); - - LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC); - osp_thandle_get(oth); - LASSERT(oth->ot_our != NULL); - spin_lock(&osp->opd_update->ou_lock); - list_add_tail(&oth->ot_our->our_list, - &osp->opd_update->ou_list); - spin_unlock(&osp->opd_update->ou_lock); - - wake_up(&osp->opd_update->ou_waitq); -} - -/** * The OSP layer dt_device_operations::dt_trans_start() interface * to start the transaction. * @@ -1196,6 +1519,8 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt, { struct osp_thandle *oth = thandle_to_osp_thandle(th); + if (oth->ot_super.th_sync) + oth->ot_our->our_flags |= UPDATE_FL_SYNC; /* For remote thandle, if there are local thandle, start it here*/ if (is_only_remote_trans(th) && oth->ot_storage_th != NULL) return dt_trans_start(env, oth->ot_storage_th->th_dev, @@ -1208,7 +1533,7 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt, * to stop the transaction. * * If the transaction is a remote transaction, related remote - * updates will be triggered here via osp_trans_trigger(). + * updates will be triggered at the end of this function. * * For synchronous mode update or any failed update, the request * will be destroyed explicitly when the osp_trans_stop(). @@ -1239,13 +1564,13 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, oth->ot_storage_th = NULL; } - if (our == NULL || our->our_req == NULL || - our->our_req->ourq_count == 0) { + if (our == NULL || list_empty(&our->our_req_list)) { osp_trans_callback(env, oth, th->th_result); GOTO(out, rc = th->th_result); } if (!osp->opd_connect_mdt) { + osp_trans_callback(env, oth, th->th_result); rc = osp_send_update_req(env, osp, oth->ot_our); GOTO(out, rc); } @@ -1256,19 +1581,27 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, GOTO(out, rc = -EIO); } - if (th->th_sync) { - /* if th_sync is set, then it needs to be sent - * right away. Note: even thought the RPC has been - * sent, it still needs to be added to the sending - * list (see osp_trans_trigger()), so ou_rpc_version - * can be updated correctly. */ + CDEBUG(D_HA, "%s: add oth %p with version %llu\n", + osp->opd_obd->obd_name, oth, our->our_version); + + LASSERT(our->our_req_ready == 0); + spin_lock(&our->our_list_lock); + if (likely(!list_empty(&our->our_list))) { + /* notify sending thread */ + our->our_req_ready = 1; + wake_up(&osp->opd_update->ou_waitq); + spin_unlock(&our->our_list_lock); + } else if (th->th_result == 0) { + /* if the request does not needs to be serialized, + * read-only request etc, let's send it right away */ + spin_unlock(&our->our_list_lock); rc = osp_send_update_req(env, osp, our); - our->our_req_sent = 1; + } else { + spin_unlock(&our->our_list_lock); + osp_trans_callback(env, oth, th->th_result); } - - osp_trans_trigger(env, osp, oth); out: - osp_thandle_put(oth); + osp_thandle_put(env, oth); RETURN(rc); }