From: Andriy Skulysh Date: Thu, 9 Aug 2018 14:58:17 +0000 (+0300) Subject: LU-12828 ldlm: FLOCK request can be processed twice X-Git-Tag: 2.13.51~116 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=85a12c6c8d7a6d40fc81c73f9b900475b58e3e98 LU-12828 ldlm: FLOCK request can be processed twice Original request can be processed after resend request, so it can create a lock on MDT without client lock or unlock other lock. Make flock enqueue to use modify RPC slot. Change-Id: Icfee202fe2e389beda1116f78f8b933c7ea182fb Cray-bug-id: LUS-5739 Signed-off-by: Andriy Skulysh Signed-off-by: Vitaly Fertman Reviewed-by: Alexander Boyko Reviewed-by: Andrew Perepechko Reviewed-on: https://review.whamcloud.com/36340 Tested-by: jenkins Reviewed-by: Alexandr Boyko Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 6a8afdd..c3fe060 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -220,6 +220,8 @@ struct lu_target { #define LUT_REPLY_SLOTS_PER_CHUNK (1<<20) #define LUT_REPLY_SLOTS_MAX_CHUNKS 16 +#define TRD_INDEX_MEMORY -1 + /** * Target reply data */ @@ -494,10 +496,10 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg, int sync); int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt); int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd); -int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, - struct tg_export_data *ted, struct tg_reply_data *trd, - struct ptlrpc_request *req, - struct thandle *th, bool update_lrd_file); +int tgt_mk_reply_data(const struct lu_env *env, struct lu_target *tgt, + struct tg_export_data *ted, struct ptlrpc_request *req, + __u64 opdata, struct thandle *th, bool write_update, + __u64 transno); struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted, __u64 xid); int tgt_tunables_init(struct lu_target *lut); diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 08d9093..181518d 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -1200,6 +1200,7 @@ struct ldlm_enqueue_info { void *ei_namespace; /** lock namespace **/ u64 ei_inodebits; /** lock inode bits **/ unsigned int ei_enq_slave:1; /** whether enqueue slave stripes */ + unsigned int ei_enq_slot:1; /** whether acquire rpc slot */ }; #define ei_res_id ei_cb_gl diff --git a/lustre/include/lustre_mdc.h b/lustre/include/lustre_mdc.h index fef3fa0..3883993 100644 --- a/lustre/include/lustre_mdc.h +++ b/lustre/include/lustre_mdc.h @@ -63,32 +63,6 @@ struct obd_export; struct ptlrpc_request; struct obd_device; -static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req, - struct lookup_intent *it) -{ - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - __u32 opc; - __u16 tag; - - opc = lustre_msg_get_opc(req->rq_reqmsg); - tag = obd_get_mod_rpc_slot(cli, opc, it); - lustre_msg_set_tag(req->rq_reqmsg, tag); - ptlrpc_reassign_next_xid(req); -} - -static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req, - struct lookup_intent *it) -{ - struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - __u32 opc; - __u16 tag; - - opc = lustre_msg_get_opc(req->rq_reqmsg); - tag = lustre_msg_get_tag(req->rq_reqmsg); - obd_put_mod_rpc_slot(cli, opc, it, tag); -} - - /** * Update the maximum possible easize. * diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 8bee050..a758cb1 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -2167,7 +2167,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, __u64 ptlrpc_next_xid(void); __u64 ptlrpc_sample_next_xid(void); __u64 ptlrpc_req_xid(struct ptlrpc_request *request); -void ptlrpc_reassign_next_xid(struct ptlrpc_request *req); +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req); +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req); /* Set of routines to run a function in ptlrpcd context */ void *ptlrpcd_alloc_work(struct obd_import *imp, diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 11ddb0e..ad0f5e8 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -129,10 +129,8 @@ __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli); int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max); int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq); -__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc, - struct lookup_intent *it); -void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, - struct lookup_intent *it, __u16 tag); +__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc); +void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag); struct llog_handle; struct llog_rec_hdr; diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 66304f1..3bdf02b 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1759,6 +1759,9 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env, int local = ns_is_client(ldlm_res_to_ns(res)); enum ldlm_error rc = ELDLM_OK; struct ldlm_interval *node = NULL; +#ifdef HAVE_SERVER_SUPPORT + bool reconstruct = false; +#endif ENTRY; /* policies are not executed on the client or during replay */ @@ -1814,6 +1817,19 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env, if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT) OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS); +#ifdef HAVE_SERVER_SUPPORT + reconstruct = !local && res->lr_type == LDLM_FLOCK && + !(*flags & LDLM_FL_TEST_LOCK); + if (reconstruct) { + rc = req_can_reconstruct(cookie, NULL); + if (rc != 0) { + if (rc == 1) + rc = 0; + RETURN(rc); + } + } +#endif + lock_res_and_lock(lock); if (local && ldlm_is_granted(lock)) { /* The server returned a blocked lock, but it was granted @@ -1887,6 +1903,16 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env, out: unlock_res_and_lock(lock); + +#ifdef HAVE_SERVER_SUPPORT + if (reconstruct) { + struct ptlrpc_request *req = cookie; + + tgt_mk_reply_data(NULL, NULL, + &req->rq_export->exp_target_data, + req, 0, NULL, false, 0); + } +#endif if (node) OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node)); return rc; diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index b337009..16ad0e7 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1349,18 +1349,11 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, lock->l_req_extent = lock->l_policy_data.l_extent; existing_lock: - if (flags & LDLM_FL_HAS_INTENT) { - /* - * In this case, the reply buffer is allocated deep in - * local_lock_enqueue by the policy function. - */ - cookie = req; - } else { - /* - * based on the assumption that lvb size never changes during + cookie = req; + if (!(flags & LDLM_FL_HAS_INTENT)) { + /* based on the assumption that lvb size never changes during * resource life time otherwise it need resource->lr_lock's - * protection - */ + * protection */ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, ldlm_lvbo_size(lock)); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index fa05c94..ac40691 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -583,6 +583,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns, } } +static bool ldlm_request_slot_needed(enum ldlm_type type) +{ + return type == LDLM_FLOCK || type == LDLM_IBITS; +} + /** * Finishing portion of client lock enqueue code. * @@ -603,6 +608,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, ENTRY; + if (ldlm_request_slot_needed(type)) + obd_put_request_slot(&req->rq_import->imp_obd->u.cli); + + ptlrpc_put_mod_rpc_slot(req); + if (req && req->rq_svc_thread) env = req->rq_svc_thread->t_env; @@ -1072,6 +1082,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_GLIMPSE_ENQUEUE); } + /* It is important to obtain modify RPC slot first (if applicable), so + * that threads that are waiting for a modify RPC slot are not polluting + * our rpcs in flight counter. */ + + if (einfo->ei_enq_slot) + ptlrpc_get_mod_rpc_slot(req); + + if (ldlm_request_slot_needed(einfo->ei_type)) { + rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli); + if (rc) { + if (einfo->ei_enq_slot) + ptlrpc_put_mod_rpc_slot(req); + failed_lock_cleanup(ns, lock, einfo->ei_mode); + LDLM_LOCK_RELEASE(lock); + GOTO(out, rc); + } + } + if (async) { LASSERT(reqp != NULL); RETURN(0); @@ -1094,6 +1122,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, else rc = err; +out: if (!req_passed_in && req != NULL) { ptlrpc_req_finished(req); if (reqp) diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 7923428..7bceb53 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -860,6 +860,16 @@ out_lock: RETURN(rc); } +static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it) +{ + if (it != NULL && + (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || + it->it_op == IT_READDIR || + (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE)))) + return true; + return false; +} + /* We always reserve enough space in the reply packet for a stripe MD, because * we don't know in advance the file type. */ static int mdc_enqueue_base(struct obd_export *exp, @@ -871,7 +881,7 @@ static int mdc_enqueue_base(struct obd_export *exp, __u64 extra_lock_flags) { struct obd_device *obddev = class_exp2obd(exp); - struct ptlrpc_request *req = NULL; + struct ptlrpc_request *req; __u64 flags, saved_flags = extra_lock_flags; struct ldlm_res_id res_id; static const union ldlm_policy_data lookup_policy = { @@ -922,6 +932,7 @@ resend: LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n", einfo->ei_type); res_id.name[3] = LDLM_FLOCK; + req = ldlm_enqueue_pack(exp, 0); } else if (it->it_op & IT_OPEN) { req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize); } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { @@ -949,20 +960,7 @@ resend: req->rq_sent = ktime_get_real_seconds() + resends; } - /* It is important to obtain modify RPC slot first (if applicable), so - * that threads that are waiting for a modify RPC slot are not polluting - * our rpcs in flight counter. - * We do not do flock request limiting, though */ - if (it) { - mdc_get_mod_rpc_slot(req, it); - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - mdc_put_mod_rpc_slot(req, it); - mdc_clear_replay_flag(req, 0); - ptlrpc_req_finished(req); - RETURN(rc); - } - } + einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it); /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() @@ -973,6 +971,7 @@ resend: rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, 0, lvb_type, lockh, 0); + if (!it) { /* For flock requests we immediatelly return without further delay and let caller deal with the rest, since rest of @@ -986,12 +985,10 @@ resend: (einfo->ei_type == LDLM_FLOCK) && (einfo->ei_mode == LCK_NL)) goto resend; + ptlrpc_req_finished(req); RETURN(rc); } - obd_put_request_slot(&obddev->u.cli); - mdc_put_mod_rpc_slot(req, it); - if (rc < 0) { CDEBUG(D_INFO, "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n", @@ -1335,19 +1332,15 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env, struct obd_export *exp = ga->ga_exp; struct md_enqueue_info *minfo = ga->ga_minfo; struct ldlm_enqueue_info *einfo = &minfo->mi_einfo; - struct lookup_intent *it; - struct lustre_handle *lockh; - struct obd_device *obddev; - struct ldlm_reply *lockrep; - __u64 flags = LDLM_FL_HAS_INTENT; + struct lookup_intent *it; + struct lustre_handle *lockh; + struct ldlm_reply *lockrep; + __u64 flags = LDLM_FL_HAS_INTENT; ENTRY; it = &minfo->mi_it; lockh = &minfo->mi_lockh; - obddev = class_exp2obd(exp); - - obd_put_request_slot(&obddev->u.cli); if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE)) rc = -ETIMEDOUT; @@ -1384,7 +1377,6 @@ int mdc_intent_getattr_async(struct obd_export *exp, struct lookup_intent *it = &minfo->mi_it; struct ptlrpc_request *req; struct mdc_getattr_args *ga; - struct obd_device *obddev = class_exp2obd(exp); struct ldlm_res_id res_id; union ldlm_policy_data policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP | @@ -1405,12 +1397,6 @@ int mdc_intent_getattr_async(struct obd_export *exp, if (IS_ERR(req)) RETURN(PTR_ERR(req)); - rc = obd_get_request_slot(&obddev->u.cli); - if (rc != 0) { - ptlrpc_req_finished(req); - RETURN(rc); - } - /* With Data-on-MDT the glimpse callback is needed too. * It is set here in advance but not in mdc_finish_enqueue() * to avoid possible races. It is safe to have glimpse handler @@ -1421,7 +1407,6 @@ int mdc_intent_getattr_async(struct obd_export *exp, rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy, &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) { - obd_put_request_slot(&obddev->u.cli); ptlrpc_req_finished(req); RETURN(rc); } diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index cc48828..b733474 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -46,9 +46,9 @@ static int mdc_reint(struct ptlrpc_request *request, int level) request->rq_send_state = level; - mdc_get_mod_rpc_slot(request, NULL); + ptlrpc_get_mod_rpc_slot(request); rc = ptlrpc_queue_wait(request); - mdc_put_mod_rpc_slot(request, NULL); + ptlrpc_put_mod_rpc_slot(request); if (rc) CDEBUG(D_INFO, "error in handling %d\n", rc); else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) { diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 8b3eefc..a46b44b 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -413,12 +413,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, /* make rpc */ if (opcode == MDS_REINT) - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); if (opcode == MDS_REINT) - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); if (rc) ptlrpc_req_finished(req); @@ -980,9 +980,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); if (req->rq_repmsg == NULL) { CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req, @@ -1757,9 +1757,9 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); out: @@ -1960,9 +1960,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); out: @@ -2020,9 +2020,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 436e5c7..af218a1 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -2286,15 +2286,6 @@ static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli, return avail; } -static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it) -{ - if (it != NULL && - (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || - it->it_op == IT_READDIR || - (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE)))) - return true; - return false; -} /* Get a modify RPC slot from the obd client @cli according * to the kind of operation @opc that is going to be sent @@ -2304,18 +2295,11 @@ static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it) * Returns the tag to be set in the request message. Tag 0 * is reserved for non-modifying requests. */ -__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc, - struct lookup_intent *it) +__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc) { bool close_req = false; __u16 i, max; - /* read-only metadata RPCs don't consume a slot on MDT - * for reply reconstruction - */ - if (obd_skip_mod_rpc_slot(it)) - return 0; - if (opc == MDS_CLOSE) close_req = true; @@ -2358,15 +2342,13 @@ __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc, EXPORT_SYMBOL(obd_get_mod_rpc_slot); /* Put a modify RPC slot from the obd client @cli according - * to the kind of operation @opc that has been sent and the - * intent @it of the operation if it applies. + * to the kind of operation @opc that has been sent. */ -void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, - struct lookup_intent *it, __u16 tag) +void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag) { bool close_req = false; - if (obd_skip_mod_rpc_slot(it)) + if (tag == 0) return; if (opc == MDS_CLOSE) diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index fbe09e9..94bae92 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -737,7 +737,7 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req) static atomic64_t ptlrpc_last_xid; -void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) +static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) { spin_lock(&req->rq_import->imp_lock); list_del_init(&req->rq_unreplied_list); @@ -745,7 +745,32 @@ void ptlrpc_reassign_next_xid(struct ptlrpc_request *req) spin_unlock(&req->rq_import->imp_lock); DEBUG_REQ(D_RPCTRACE, req, "reassign xid"); } -EXPORT_SYMBOL(ptlrpc_reassign_next_xid); + +void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc; + __u16 tag; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + tag = obd_get_mod_rpc_slot(cli, opc); + lustre_msg_set_tag(req->rq_reqmsg, tag); + ptlrpc_reassign_next_xid(req); +} +EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot); + +void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req) +{ + __u16 tag = lustre_msg_get_tag(req->rq_reqmsg); + + if (tag != 0) { + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + + obd_put_mod_rpc_slot(cli, opc, tag); + } +} +EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot); int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, __u32 version, int opcode, char **bufs, diff --git a/lustre/target/tgt_lastrcvd.c b/lustre/target/tgt_lastrcvd.c index 99a39c3..398a60a 100644 --- a/lustre/target/tgt_lastrcvd.c +++ b/lustre/target/tgt_lastrcvd.c @@ -331,7 +331,7 @@ static void tgt_free_reply_data(struct lu_target *lut, list_del(&trd->trd_list); ted->ted_reply_cnt--; - if (lut != NULL) + if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY) tgt_clear_reply_slot(lut, trd->trd_index); OBD_FREE_PTR(trd); } @@ -1166,7 +1166,7 @@ static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag) } } -int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, +static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, struct tg_export_data *ted, struct tg_reply_data *trd, struct ptlrpc_request *req, struct thandle *th, bool update_lrd_file) @@ -1181,28 +1181,33 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, ted->ted_lcd->lcd_last_transno = lrd->lrd_transno; mutex_unlock(&ted->ted_lcd_lock); - /* find a empty slot */ - i = tgt_find_free_reply_slot(tgt); - if (unlikely(i < 0)) { - CERROR("%s: couldn't find a slot for reply data: " - "rc = %d\n", tgt_name(tgt), i); - RETURN(i); - } - trd->trd_index = i; + if (tgt != NULL) { + /* find a empty slot */ + i = tgt_find_free_reply_slot(tgt); + if (unlikely(i < 0)) { + CERROR("%s: couldn't find a slot for reply data: " + "rc = %d\n", tgt_name(tgt), i); + RETURN(i); + } + trd->trd_index = i; - if (update_lrd_file) { - loff_t off; - int rc; + if (update_lrd_file) { + loff_t off; + int rc; - /* write reply data to disk */ - off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i; - rc = tgt_reply_data_write(env, tgt, lrd, off, th); - if (unlikely(rc != 0)) { - CERROR("%s: can't update %s file: rc = %d\n", - tgt_name(tgt), REPLY_DATA, rc); - RETURN(rc); + /* write reply data to disk */ + off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i; + rc = tgt_reply_data_write(env, tgt, lrd, off, th); + if (unlikely(rc != 0)) { + CERROR("%s: can't update %s file: rc = %d\n", + tgt_name(tgt), REPLY_DATA, rc); + RETURN(rc); + } } + } else { + trd->trd_index = TRD_INDEX_MEMORY; } + /* add reply data to target export's reply list */ mutex_lock(&ted->ted_lcd_lock); if (req != NULL) { @@ -1231,7 +1236,64 @@ int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt, RETURN(0); } -EXPORT_SYMBOL(tgt_add_reply_data); + +int tgt_mk_reply_data(const struct lu_env *env, + struct lu_target *tgt, + struct tg_export_data *ted, + struct ptlrpc_request *req, + __u64 opdata, + struct thandle *th, + bool write_update, + __u64 transno) +{ + struct tg_reply_data *trd; + struct lsd_reply_data *lrd; + __u64 *pre_versions = NULL; + int rc; + + OBD_ALLOC_PTR(trd); + if (unlikely(trd == NULL)) + RETURN(-ENOMEM); + + /* fill reply data information */ + lrd = &trd->trd_reply; + lrd->lrd_transno = transno; + if (req != NULL) { + lrd->lrd_xid = req->rq_xid; + trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg); + lrd->lrd_client_gen = ted->ted_lcd->lcd_generation; + if (write_update) { + pre_versions = lustre_msg_get_versions(req->rq_repmsg); + lrd->lrd_result = th->th_result; + } + } else { + struct tgt_session_info *tsi; + + LASSERT(env != NULL); + tsi = tgt_ses_info(env); + LASSERT(tsi->tsi_xid != 0); + + lrd->lrd_xid = tsi->tsi_xid; + lrd->lrd_result = tsi->tsi_result; + lrd->lrd_client_gen = tsi->tsi_client_gen; + } + + lrd->lrd_data = opdata; + if (pre_versions) { + trd->trd_pre_versions[0] = pre_versions[0]; + trd->trd_pre_versions[1] = pre_versions[1]; + trd->trd_pre_versions[2] = pre_versions[2]; + trd->trd_pre_versions[3] = pre_versions[3]; + } + + rc = tgt_add_reply_data(env, tgt, ted, trd, req, + th, write_update); + if (rc < 0) + OBD_FREE_PTR(trd); + return rc; + +} +EXPORT_SYMBOL(tgt_mk_reply_data); /* * last_rcvd & last_committed update callbacks @@ -1322,48 +1384,8 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt, /* Target that supports multiple reply data */ if (tgt_is_multimodrpcs_client(exp)) { - struct tg_reply_data *trd; - struct lsd_reply_data *lrd; - __u64 *pre_versions; - bool write_update; - - OBD_ALLOC_PTR(trd); - if (unlikely(trd == NULL)) - RETURN(-ENOMEM); - - /* fill reply data information */ - lrd = &trd->trd_reply; - lrd->lrd_transno = tti->tti_transno; - if (req != NULL) { - lrd->lrd_xid = req->rq_xid; - trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg); - pre_versions = lustre_msg_get_versions(req->rq_repmsg); - lrd->lrd_result = th->th_result; - lrd->lrd_client_gen = ted->ted_lcd->lcd_generation; - write_update = true; - } else { - LASSERT(tsi->tsi_xid != 0); - lrd->lrd_xid = tsi->tsi_xid; - lrd->lrd_result = tsi->tsi_result; - lrd->lrd_client_gen = tsi->tsi_client_gen; - trd->trd_tag = 0; - pre_versions = NULL; - write_update = false; - } - - lrd->lrd_data = opdata; - if (pre_versions) { - trd->trd_pre_versions[0] = pre_versions[0]; - trd->trd_pre_versions[1] = pre_versions[1]; - trd->trd_pre_versions[2] = pre_versions[2]; - trd->trd_pre_versions[3] = pre_versions[3]; - } - - rc = tgt_add_reply_data(env, tgt, ted, trd, req, - th, write_update); - if (rc < 0) - OBD_FREE_PTR(trd); - return rc; + return tgt_mk_reply_data(env, tgt, ted, req, opdata, th, + !!(req != NULL), tti->tti_transno); } /* Enough for update replay, let's return */