From: Gregoire Pichon Date: Tue, 31 Mar 2015 12:47:53 +0000 (+0200) Subject: LU-5319 mdt: support multiple modify RPCs in parallel X-Git-Tag: 2.7.56~12 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=refs%2Fchanges%2F60%2F14860%2F13;p=fs%2Flustre-release.git LU-5319 mdt: support multiple modify RPCs in parallel This patch implements the server part of the feature that allows support of multiple modify RPCs in parallel on MDT targets. Each target export is able to store several in-memory reply data so that it can reconstruct several requests issued by the client in parallel. Additionally, a new internal file REPLY_DATA is created on the target to store on-disk reply data. The reply data slots in that file are managed by a bitmap (lut_reply_bitmap) and can be used to store reply data of any client. When target recovers, the on-disk reply data is used to restore the in-memory reply data and ensure reconstruction of committed operations. Signed-off-by: Alex Zhuravlev Signed-off-by: Gregoire Pichon Change-Id: I8f91666f5b0b4f7b9445a01c520d73f56d059ff3 Reviewed-on: http://review.whamcloud.com/14860 Tested-by: Jenkins Reviewed-by: Andreas Dilger Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 78800cf..4428e69 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -138,6 +138,35 @@ struct lu_target { spinlock_t lut_client_bitmap_lock; /** Bitmap of known clients */ unsigned long *lut_client_bitmap; + /* Number of clients supporting multiple modify RPCs + * recorded in the bitmap */ + atomic_t lut_num_clients; + /* Client generation to identify client slot reuse */ + atomic_t lut_client_generation; + /** reply_data file */ + struct dt_object *lut_reply_data; + /** Bitmap of used slots in the reply data file */ + unsigned long **lut_reply_bitmap; +}; + +/* number of slots in reply bitmap */ +#define LUT_REPLY_SLOTS_PER_CHUNK (1<<20) +#define LUT_REPLY_SLOTS_MAX_CHUNKS 16 + 
+/** + * Target reply data + */ +struct tg_reply_data { + /** chain of reply data anchored in tg_export_data */ + struct list_head trd_list; + /** copy of on-disk reply data */ + struct lsd_reply_data trd_reply; + /** versions for Version Based Recovery */ + __u64 trd_pre_versions[4]; + /** slot index in reply_data file */ + int trd_index; + /** tag the client used */ + __u16 trd_tag; }; extern struct lu_context_key tgt_session_key; @@ -300,6 +329,12 @@ static inline int req_is_replay(struct ptlrpc_request *req) return !!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY); } +static inline bool tgt_is_multimodrpcs_client(struct obd_export *exp) +{ + return exp_connect_flags(exp) & OBD_CONNECT_MULTIMODRPCS; +} + + /* target/tgt_handler.c */ int tgt_request_handle(struct ptlrpc_request *req); char *tgt_name(struct lu_target *tgt); @@ -350,6 +385,7 @@ void tgt_register_lfsck_in_notify(int (*notify)(const struct lu_env *, void tgt_register_lfsck_query(int (*query)(const struct lu_env *, struct dt_device *, struct lfsck_request *)); +bool req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd); extern struct tgt_handler tgt_lfsck_handlers[]; extern struct tgt_handler tgt_obd_handlers[]; @@ -394,6 +430,8 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg, int sync); int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg, loff_t off); +int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt); +bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd); /* target/update_trans.c */ int distribute_txn_init(const struct lu_env *env, diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index ebd526e..91ae410 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1417,7 +1417,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \ OBD_CONNECT_OPEN_BY_FID | 
\ OBD_CONNECT_DIR_STRIPE | \ - OBD_CONNECT_BULK_MBITS) + OBD_CONNECT_BULK_MBITS | \ + OBD_CONNECT_MULTIMODRPCS) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index b0dec84..ba40f2b 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -59,6 +59,7 @@ /** Persistent mount data are stored on the disk in this file. */ #define MOUNT_DATA_FILE MOUNT_CONFIGS_DIR"/"CONFIGS_FILE #define LAST_RCVD "last_rcvd" +#define REPLY_DATA "reply_data" #define LOV_OBJID "lov_objid" #define LOV_OBJSEQ "lov_objseq" #define HEALTH_CHECK "health_check" @@ -317,6 +318,8 @@ struct lustre_mount_data { #define OBD_INCOMPAT_LMM_VER 0x00000100 /** multiple OI files for MDT */ #define OBD_INCOMPAT_MULTI_OI 0x00000200 +/** multiple RPCs in flight */ +#define OBD_INCOMPAT_MULTI_RPCS 0x00000400 /* Data stored per server at the head of the last_rcvd file. In le32 order. This should be common to filter_internal.h, lustre_mds.h */ @@ -361,11 +364,35 @@ struct lsd_client_data { /* VBR: last versions */ __u64 lcd_pre_versions[4]; __u32 lcd_last_epoch; - /** orphans handling for delayed export rely on that */ - __u32 lcd_first_epoch; - __u8 lcd_padding[LR_CLIENT_SIZE - 128]; + /* generation counter of client slot in last_rcvd */ + __u32 lcd_generation; + __u8 lcd_padding[LR_CLIENT_SIZE - 128]; }; + +/* Data stored in each slot of the reply_data file. + * + * The lrd_client_gen field is assigned with lcd_generation value + * to allow identify which client the reply data belongs to. 
+ */ +struct lsd_reply_data { + __u64 lrd_transno; /* transaction number */ + __u64 lrd_xid; /* transmission id */ + __u64 lrd_data; /* per-operation data */ + __u32 lrd_result; /* request result */ + __u32 lrd_client_gen; /* client generation */ +}; + +/* Header of the reply_data file */ +#define LRH_MAGIC 0xbdabda01 +struct lsd_reply_header { + __u32 lrh_magic; + __u32 lrh_header_size; + __u32 lrh_reply_size; + __u8 lrh_pad[sizeof(struct lsd_reply_data) - 12]; +}; + + /* bug20354: the lcd_uuid for export of clients may be wrong */ static inline void check_lcd(char *obd_name, int index, struct lsd_client_data *lcd) @@ -452,7 +479,7 @@ static inline void lcd_le_to_cpu(struct lsd_client_data *buf, lcd->lcd_pre_versions[2] = le64_to_cpu(buf->lcd_pre_versions[2]); lcd->lcd_pre_versions[3] = le64_to_cpu(buf->lcd_pre_versions[3]); lcd->lcd_last_epoch = le32_to_cpu(buf->lcd_last_epoch); - lcd->lcd_first_epoch = le32_to_cpu(buf->lcd_first_epoch); + lcd->lcd_generation = le32_to_cpu(buf->lcd_generation); } static inline void lcd_cpu_to_le(struct lsd_client_data *lcd, @@ -472,7 +499,7 @@ static inline void lcd_cpu_to_le(struct lsd_client_data *lcd, buf->lcd_pre_versions[2] = cpu_to_le64(lcd->lcd_pre_versions[2]); buf->lcd_pre_versions[3] = cpu_to_le64(lcd->lcd_pre_versions[3]); buf->lcd_last_epoch = cpu_to_le32(lcd->lcd_last_epoch); - buf->lcd_first_epoch = cpu_to_le32(lcd->lcd_first_epoch); + buf->lcd_generation = cpu_to_le32(lcd->lcd_generation); } static inline __u64 lcd_last_transno(struct lsd_client_data *lcd) diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index fe41b2b..d6e5c11 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -59,7 +59,8 @@ struct mdt_idmap_table; * Target-specific export data */ struct tg_export_data { - /** Protects led_lcd below */ + /** Protects ted_lcd, ted_reply_* and + * ted_release_* fields below */ struct mutex ted_lcd_lock; /** Per-client data for each export */ struct 
lsd_client_data *ted_lcd; @@ -71,6 +72,18 @@ struct tg_export_data { /** nodemap this export is a member of */ struct lu_nodemap *ted_nodemap; struct hlist_node ted_nodemap_member; + + /* Every reply data fields below are + * protected by ted_lcd_lock */ + /** List of reply data */ + struct list_head ted_reply_list; + int ted_reply_cnt; + /** Reply data with highest transno is retained */ + struct tg_reply_data *ted_reply_last; + /* Statistics */ + int ted_reply_max; /* high water mark */ + int ted_release_xid; + int ted_release_tag; }; /** @@ -182,6 +195,7 @@ struct obd_export { struct list_head exp_obd_chain; struct hlist_node exp_uuid_hash; /** uuid-export hash*/ struct hlist_node exp_nid_hash; /** nid-export hash */ + struct hlist_node exp_gen_hash; /** last_rcvd clt gen hash */ /** * All exports eligible for ping evictor are linked into a list * through this field in "most time since last request on this export" diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index e5ef46a..ead5bb9 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -225,10 +225,7 @@ enum local_oid { LFSCK_BOOKMARK_OID = 17UL, OTABLE_IT_OID = 18UL, OSD_LPF_OID = 19UL, - /* These two definitions are obsolete - * OFD_GROUP0_LAST_OID = 20UL, - * OFD_GROUP4K_LAST_OID = 20UL+4096, - */ + REPLY_DATA_OID = 21UL, OFD_LAST_GROUP_OID = 4117UL, LLOG_CATALOGS_OID = 4118UL, MGS_CONFIGS_OID = 4119UL, diff --git a/lustre/include/obd.h b/lustre/include/obd.h index fad6b03..ec275fa 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -548,6 +548,8 @@ struct obd_device { struct cfs_hash *obd_nid_hash; /* nid stats body */ struct cfs_hash *obd_nid_stats_hash; + /* client_generation-export hash body */ + struct cfs_hash *obd_gen_hash; struct list_head obd_nid_stats; atomic_t obd_refcount; struct list_head obd_exports; diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index e07e87d..a4d400a 100644 --- a/lustre/include/obd_support.h 
+++ b/lustre/include/obd_support.h @@ -96,6 +96,9 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define HASH_NID_STATS_BKT_BITS 5 #define HASH_NID_STATS_CUR_BITS 7 #define HASH_NID_STATS_MAX_BITS 12 +#define HASH_GEN_BKT_BITS 5 +#define HASH_GEN_CUR_BITS 7 +#define HASH_GEN_MAX_BITS 12 #define HASH_LQE_BKT_BITS 5 #define HASH_LQE_CUR_BITS 7 #define HASH_LQE_MAX_BITS 12 diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 1bb8a0e..a8b9c48 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -71,6 +71,10 @@ #include #include +static unsigned int max_mod_rpcs_per_client = 8; +CFS_MODULE_PARM(max_mod_rpcs_per_client, "i", uint, 0644, + "maximum number of modify RPCs in flight allowed per client"); + mdl_mode_t mdt_mdl_lock_modes[] = { [LCK_MINMODE] = MDL_MINMODE, [LCK_EX] = MDL_EX, @@ -1754,7 +1758,10 @@ static int mdt_reint_internal(struct mdt_thread_info *info, if (rc != 0) GOTO(out_ucred, rc = err_serious(rc)); - if (mdt_check_resent(info, mdt_reconstruct, lhc)) { + rc = mdt_check_resent(info, mdt_reconstruct, lhc); + if (rc < 0) { + GOTO(out_ucred, rc); + } else if (rc == 1) { DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt."); rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg); GOTO(out_ucred, rc); @@ -3019,12 +3026,12 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info, return; } - /* - * If the xid matches, then we know this is a resent request, and allow - * it. (It's probably an OPEN, for which we don't send a lock. - */ - if (req_xid_is_last(req)) - return; + /* + * If the xid matches, then we know this is a resent request, and allow + * it. (It's probably an OPEN, for which we don't send a lock. 
+ */ + if (req_can_reconstruct(req, NULL)) + return; /* * This remote handle isn't enqueued, so we never received or processed @@ -4846,6 +4853,18 @@ static int mdt_connect_internal(struct obd_export *exp, data->ocd_max_easize = mdt->mdt_max_ea_size; + /* NB: Disregard the rule against updating + * exp_connect_data.ocd_connect_flags in this case, since + * tgt_client_new() needs to know if this client supports + * multiple modify RPCs, and it is safe to expose this flag before + * connection processing completes. */ + if (data->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) { + data->ocd_maxmodrpcs = max_mod_rpcs_per_client; + spin_lock(&exp->exp_lock); + *exp_connect_flags_ptr(exp) |= OBD_CONNECT_MULTIMODRPCS; + spin_unlock(&exp->exp_lock); + } + return 0; } diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 639a9ef..eb1ad98 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -65,14 +65,6 @@ #include #include -/* check if request's xid is equal to last one or not*/ -static inline int req_xid_is_last(struct ptlrpc_request *req) -{ - struct lsd_client_data *lcd = req->rq_export->exp_target_data.ted_lcd; - return (req->rq_xid == lcd->lcd_last_xid || - req->rq_xid == lcd->lcd_last_close_xid); -} - struct mdt_object; /* file data for open files on MDS */ @@ -431,6 +423,7 @@ struct mdt_thread_info { /* should be enough to fit lustre_mdt_attrs */ char mti_xattr_buf[128]; struct ldlm_enqueue_info mti_einfo; + struct tg_reply_data *mti_reply_data; }; extern struct lu_context_key mdt_thread_key; @@ -776,7 +769,7 @@ __u32 mdt_identity_get_perm(struct md_identity *, __u32, lnet_nid_t); int mdt_pack_remote_perm(struct mdt_thread_info *, struct mdt_object *, void *); /* mdt/mdt_recovery.c */ -void mdt_req_from_lcd(struct ptlrpc_request *req, struct lsd_client_data *lcd); +__u64 mdt_req_from_lrd(struct ptlrpc_request *req, struct tg_reply_data *trd); /* mdt/mdt_hsm.c */ int mdt_hsm_state_get(struct tgt_session_info *tsi); @@ -911,18 
+904,27 @@ static inline int mdt_check_resent(struct mdt_thread_info *info, mdt_reconstruct_t reconstruct, struct mdt_lock_handle *lhc) { - struct ptlrpc_request *req = mdt_info_req(info); - ENTRY; - - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) { - if (req_xid_is_last(req)) { - reconstruct(info, lhc); - RETURN(1); - } - DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")", - req->rq_export->exp_target_data.ted_lcd->lcd_last_xid); - } - RETURN(0); + struct ptlrpc_request *req = mdt_info_req(info); + int rc = 0; + ENTRY; + + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) { + OBD_ALLOC_PTR(info->mti_reply_data); + if (info->mti_reply_data == NULL) + RETURN(-ENOMEM); + + if (req_can_reconstruct(req, info->mti_reply_data)) { + reconstruct(info, lhc); + rc = 1; + } else { + DEBUG_REQ(D_HA, req, + "no reply data found for RESENT req"); + rc = 0; + } + OBD_FREE_PTR(info->mti_reply_data); + info->mti_reply_data = NULL; + } + RETURN(rc); } struct lu_ucred *mdt_ucred(const struct mdt_thread_info *info); diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 36d96d6..4af70ff 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -206,6 +206,19 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc) if (lustre_msg_get_transno(req->rq_repmsg) != 0) RETURN_EXIT; + if (tgt_is_multimodrpcs_client(req->rq_export)) { + struct thandle *th; + + /* generate an empty transaction to get a transno + * and reply data */ + th = dt_trans_create(info->mti_env, mdt->mdt_bottom); + if (!IS_ERR(th)) { + rc = dt_trans_start(info->mti_env, mdt->mdt_bottom, th); + dt_trans_stop(info->mti_env, mdt->mdt_bottom, th); + } + RETURN_EXIT; + } + spin_lock(&mdt->mdt_lut.lut_translock); if (rc != 0) { if (info->mti_transno != 0) { @@ -614,8 +627,6 @@ void mdt_reconstruct_open(struct mdt_thread_info *info, struct mdt_device *mdt = info->mti_mdt; struct req_capsule *pill = info->mti_pill; struct ptlrpc_request *req = mdt_info_req(info); - struct 
tg_export_data *ted = &req->rq_export->exp_target_data; - struct lsd_client_data *lcd = ted->ted_lcd; struct md_attr *ma = &info->mti_attr; struct mdt_reint_record *rr = &info->mti_rr; __u64 flags = info->mti_spec.sp_cr_flags; @@ -624,17 +635,18 @@ void mdt_reconstruct_open(struct mdt_thread_info *info, struct mdt_object *child; struct mdt_body *repbody; int rc; - ENTRY; + __u64 opdata; + ENTRY; LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN); ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP); repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); ma->ma_need = MA_INODE | MA_HSM; - ma->ma_valid = 0; + ma->ma_valid = 0; - mdt_req_from_lcd(req, lcd); - mdt_set_disposition(info, ldlm_rep, lcd->lcd_last_data); + opdata = mdt_req_from_lrd(req, info->mti_reply_data); + mdt_set_disposition(info, ldlm_rep, opdata); CDEBUG(D_INODE, "This is reconstruct open: disp="LPX64", result=%d\n", ldlm_rep->lock_policy_res1, req->rq_status); diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index e162279..859c40d 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -177,50 +177,43 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req) spin_unlock(&exp->exp_lock); } -/** - * VBR: restore versions - */ -static void mdt_vbr_reconstruct(struct ptlrpc_request *req, - struct lsd_client_data *lcd) +__u64 mdt_req_from_lrd(struct ptlrpc_request *req, + struct tg_reply_data *trd) { - __u64 pre_versions[4] = {0}; - pre_versions[0] = lcd->lcd_pre_versions[0]; - pre_versions[1] = lcd->lcd_pre_versions[1]; - pre_versions[2] = lcd->lcd_pre_versions[2]; - pre_versions[3] = lcd->lcd_pre_versions[3]; - lustre_msg_set_versions(req->rq_repmsg, pre_versions); -} + struct lsd_reply_data *lrd; -void mdt_req_from_lcd(struct ptlrpc_request *req, struct lsd_client_data *lcd) -{ - DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d", - lcd->lcd_last_transno, lcd->lcd_last_result); - - if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) { - req->rq_transno = 
lcd->lcd_last_close_transno; - req->rq_status = lcd->lcd_last_close_result; - } else { - req->rq_transno = lcd->lcd_last_transno; - req->rq_status = lcd->lcd_last_result; - mdt_vbr_reconstruct(req, lcd); - } - if (req->rq_status != 0) - req->rq_transno = 0; - lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); - lustre_msg_set_status(req->rq_repmsg, req->rq_status); - DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d", - req->rq_transno, req->rq_status); - - mdt_steal_ack_locks(req); + LASSERT(trd != NULL); + lrd = &trd->trd_reply; + + DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d", + lrd->lrd_transno, lrd->lrd_result); + + req->rq_transno = lrd->lrd_transno; + req->rq_status = lrd->lrd_result; + + lustre_msg_set_versions(req->rq_repmsg, trd->trd_pre_versions); + + if (req->rq_status != 0) + req->rq_transno = 0; + lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); + lustre_msg_set_status(req->rq_repmsg, req->rq_status); + + DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d", + req->rq_transno, req->rq_status); + + mdt_steal_ack_locks(req); + + return lrd->lrd_data; } + void mdt_reconstruct_generic(struct mdt_thread_info *mti, - struct mdt_lock_handle *lhc) + struct mdt_lock_handle *lhc) { - struct ptlrpc_request *req = mdt_info_req(mti); - struct tg_export_data *ted = &req->rq_export->exp_target_data; + struct ptlrpc_request *req = mdt_info_req(mti); - return mdt_req_from_lcd(req, ted->ted_lcd); + mdt_req_from_lrd(req, mti->mti_reply_data); + return; } /** @@ -246,17 +239,16 @@ static void mdt_fake_ma(struct md_attr *ma) static void mdt_reconstruct_create(struct mdt_thread_info *mti, struct mdt_lock_handle *lhc) { - struct ptlrpc_request *req = mdt_info_req(mti); - struct obd_export *exp = req->rq_export; - struct tg_export_data *ted = &exp->exp_target_data; - struct mdt_device *mdt = mti->mti_mdt; - struct mdt_object *child; - struct mdt_body *body; - int rc; + struct ptlrpc_request *req = mdt_info_req(mti); + 
struct obd_export *exp = req->rq_export; + struct mdt_device *mdt = mti->mti_mdt; + struct mdt_object *child; + struct mdt_body *body; + int rc; - mdt_req_from_lcd(req, ted->ted_lcd); - if (req->rq_status) - return; + mdt_req_from_lrd(req, mti->mti_reply_data); + if (req->rq_status) + return; /* if no error, so child was created with requested fid */ child = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid2); @@ -296,15 +288,14 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti, { struct ptlrpc_request *req = mdt_info_req(mti); struct obd_export *exp = req->rq_export; - struct mdt_export_data *med = &exp->exp_mdt_data; struct mdt_device *mdt = mti->mti_mdt; struct mdt_object *obj; struct mdt_body *body; int rc; - mdt_req_from_lcd(req, med->med_ted.ted_lcd); - if (req->rq_status) - return; + mdt_req_from_lrd(req, mti->mti_reply_data); + if (req->rq_status) + return; body = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY); obj = mdt_object_find(mti->mti_env, mdt, mti->mti_rr.rr_fid1); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 344bdd9..257cf03 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -862,6 +862,7 @@ struct obd_export *class_new_export(struct obd_device *obd, spin_lock_init(&export->exp_rpc_lock); INIT_HLIST_NODE(&export->exp_uuid_hash); INIT_HLIST_NODE(&export->exp_nid_hash); + INIT_HLIST_NODE(&export->exp_gen_hash); spin_lock_init(&export->exp_bl_list_lock); INIT_LIST_HEAD(&export->exp_bl_list); diff --git a/lustre/obdclass/lprocfs_status_server.c b/lustre/obdclass/lprocfs_status_server.c index c06964d..86e03ff 100644 --- a/lustre/obdclass/lprocfs_status_server.c +++ b/lustre/obdclass/lprocfs_status_server.c @@ -228,6 +228,36 @@ static int lprocfs_exp_hash_seq_show(struct seq_file *m, void *data) } LPROC_SEQ_FOPS_RO(lprocfs_exp_hash); +int lprocfs_exp_print_replydata_seq(struct cfs_hash *hs, struct cfs_hash_bd *bd, + struct hlist_node *hnode, void *cb_data) + +{ + struct obd_export 
*exp = cfs_hash_object(hs, hnode); + struct seq_file *m = cb_data; + struct tg_export_data *ted = &exp->exp_target_data; + + seq_printf(m, "reply_cnt: %d\n" + "reply_max: %d\n" + "reply_released_by_xid: %d\n" + "reply_released_by_tag: %d\n\n", + ted->ted_reply_cnt, + ted->ted_reply_max, + ted->ted_release_xid, + ted->ted_release_tag); + return 0; +} + +int lprocfs_exp_replydata_seq_show(struct seq_file *m, void *data) +{ + struct nid_stat *stats = m->private; + struct obd_device *obd = stats->nid_obd; + + cfs_hash_for_each_key(obd->obd_nid_hash, &stats->nid, + lprocfs_exp_print_replydata_seq, m); + return 0; +} +LPROC_SEQ_FOPS_RO(lprocfs_exp_replydata); + int lprocfs_nid_stats_clear_seq_show(struct seq_file *m, void *data) { return seq_printf(m, "%s\n", "Write into this file to clear all nid " @@ -373,6 +403,15 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid) GOTO(destroy_new_ns, rc); } + entry = lprocfs_add_simple(new_stat->nid_proc, "reply_data", new_stat, + &lprocfs_exp_replydata_fops); + if (IS_ERR(entry)) { + rc = PTR_ERR(entry); + CWARN("%s: Error adding the reply_data file: rc = %d\n", + obd->obd_name, rc); + GOTO(destroy_new_ns, rc); + } + spin_lock(&exp->exp_lock); exp->exp_nid_stats = new_stat; spin_unlock(&exp->exp_lock); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 28af8f8..bbedbdc 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -52,6 +52,7 @@ static struct cfs_hash_ops uuid_hash_ops; static struct cfs_hash_ops nid_hash_ops; static struct cfs_hash_ops nid_stat_hash_ops; +static struct cfs_hash_ops gen_hash_ops; /*********** string parsing utils *********/ @@ -478,6 +479,7 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg) obd->obd_uuid_hash = NULL; obd->obd_nid_hash = NULL; obd->obd_nid_stats_hash = NULL; + obd->obd_gen_hash = NULL; spin_unlock(&obd->obd_dev_lock); /* create an uuid-export lustre hash */ @@ -513,6 +515,17 @@ int class_setup(struct 
obd_device *obd, struct lustre_cfg *lcfg) if (!obd->obd_nid_stats_hash) GOTO(err_hash, err = -ENOMEM); + /* create a client_generation-export lustre hash */ + obd->obd_gen_hash = cfs_hash_create("UUID_HASH", + HASH_GEN_CUR_BITS, + HASH_GEN_MAX_BITS, + HASH_GEN_BKT_BITS, 0, + CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, + &gen_hash_ops, CFS_HASH_DEFAULT); + if (!obd->obd_gen_hash) + GOTO(err_hash, err = -ENOMEM); + exp = class_new_export(obd, &obd->obd_uuid); if (IS_ERR(exp)) GOTO(err_hash, err = PTR_ERR(exp)); @@ -554,6 +567,10 @@ err_hash: cfs_hash_putref(obd->obd_nid_stats_hash); obd->obd_nid_stats_hash = NULL; } + if (obd->obd_gen_hash) { + cfs_hash_putref(obd->obd_gen_hash); + obd->obd_gen_hash = NULL; + } obd->obd_starting = 0; CERROR("setup %s failed (%d)\n", obd->obd_name, err); return err; @@ -682,6 +699,12 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg) obd->obd_nid_stats_hash = NULL; } + /* destroy a client_generation-export hash body */ + if (obd->obd_gen_hash) { + cfs_hash_putref(obd->obd_gen_hash); + obd->obd_gen_hash = NULL; + } + class_decref(obd, "setup", obd); obd->obd_set_up = 0; @@ -2105,3 +2128,73 @@ static struct cfs_hash_ops nid_stat_hash_ops = { .hs_get = nidstats_get, .hs_put_locked = nidstats_put_locked, }; + + +/* + * client_generation<->export hash operations + */ + +static unsigned +gen_hash(struct cfs_hash *hs, const void *key, unsigned mask) +{ + return cfs_hash_djb2_hash(key, sizeof(__u32), mask); +} + +static void * +gen_key(struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_gen_hash); + + RETURN(&exp->exp_target_data.ted_lcd->lcd_generation); +} + +/* + * NOTE: It is impossible to find an export that is in failed + * state with this function + */ +static int +gen_kepcmp(const void *key, struct hlist_node *hnode) +{ + struct obd_export *exp; + + LASSERT(key); + exp = hlist_entry(hnode, struct obd_export, exp_gen_hash); + + 
RETURN(exp->exp_target_data.ted_lcd->lcd_generation == *(__u32 *)key && + !exp->exp_failed); +} + +static void * +gen_export_object(struct hlist_node *hnode) +{ + return hlist_entry(hnode, struct obd_export, exp_gen_hash); +} + +static void +gen_export_get(struct cfs_hash *hs, struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_gen_hash); + class_export_get(exp); +} + +static void +gen_export_put_locked(struct cfs_hash *hs, struct hlist_node *hnode) +{ + struct obd_export *exp; + + exp = hlist_entry(hnode, struct obd_export, exp_gen_hash); + class_export_put(exp); +} + +static struct cfs_hash_ops gen_hash_ops = { + .hs_hash = gen_hash, + .hs_key = gen_key, + .hs_keycmp = gen_kepcmp, + .hs_object = gen_export_object, + .hs_get = gen_export_get, + .hs_put_locked = gen_export_put_locked, +}; diff --git a/lustre/osd-ldiskfs/osd_scrub.c b/lustre/osd-ldiskfs/osd_scrub.c index 458977f..909967a 100644 --- a/lustre/osd-ldiskfs/osd_scrub.c +++ b/lustre/osd-ldiskfs/osd_scrub.c @@ -1594,6 +1594,10 @@ static const struct osd_lf_map osd_lf_maps[] = { { LAST_RCVD, { FID_SEQ_LOCAL_FILE, LAST_RECV_OID, 0 }, OLF_SHOW_NAME, sizeof(LAST_RCVD) - 1, NULL, NULL }, + /* reply_data */ + { REPLY_DATA, { FID_SEQ_LOCAL_FILE, REPLY_DATA_OID, 0 }, OLF_SHOW_NAME, + sizeof(REPLY_DATA) - 1, NULL, NULL }, + /* lov_objid */ { LOV_OBJID, { FID_SEQ_LOCAL_FILE, MDD_LOV_OBJ_OID, 0 }, OLF_SHOW_NAME, sizeof(LOV_OBJID) - 1, NULL, NULL }, diff --git a/lustre/osd-zfs/osd_oi.c b/lustre/osd-zfs/osd_oi.c index 435e654..22f6ff0 100644 --- a/lustre/osd-zfs/osd_oi.c +++ b/lustre/osd-zfs/osd_oi.c @@ -94,6 +94,7 @@ static const struct named_oid oids[] = { { OFD_HEALTH_CHECK_OID, HEALTH_CHECK }, { ACCT_USER_OID, "acct_usr_inode" }, { ACCT_GROUP_OID, "acct_grp_inode" }, + { REPLY_DATA_OID, REPLY_DATA }, { 0, NULL } }; diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index a8091d5..2e369a6 100644 --- a/lustre/target/tgt_handler.c +++ 
b/lustre/target/tgt_handler.c @@ -526,11 +526,11 @@ static int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id) /* sanity check: if the xid matches, the request must be marked as a * resent or replayed */ - if (req_xid_is_last(req)) { + if (req_can_reconstruct(req, NULL)) { if (!(lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY))) { DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches " - "last_xid, expected REPLAY or RESENT flag " + "saved xid, expected REPLAY or RESENT flag " "(%x)", req->rq_xid, lustre_msg_get_flags(req->rq_reqmsg)); req->rq_status = -ENOTCONN; @@ -689,6 +689,16 @@ int tgt_request_handle(struct ptlrpc_request *req) request_fail_id = tgt->lut_request_fail_id; tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id; + /* try to release in-memory reply data */ + if (tgt_is_multimodrpcs_client(req->rq_export)) { + tgt_handle_received_xid(req->rq_export, + lustre_msg_get_last_xid(req->rq_reqmsg)); + if (!(lustre_msg_get_flags(req->rq_reqmsg) & + (MSG_RESENT | MSG_REPLAY))) + tgt_handle_tag(req->rq_export, + lustre_msg_get_tag(req->rq_reqmsg)); + } + h = tgt_handler_find_check(req); if (IS_ERR(h)) { req->rq_status = PTR_ERR(h); @@ -2172,3 +2182,44 @@ out: RETURN(rc); } EXPORT_SYMBOL(tgt_brw_write); + +/* Check if request can be reconstructed from saved reply data + * A copy of the reply data is returned in @trd if the pointer is not NULL + */ +bool req_can_reconstruct(struct ptlrpc_request *req, + struct tg_reply_data *trd) +{ + struct tg_export_data *ted = &req->rq_export->exp_target_data; + struct lsd_client_data *lcd = ted->ted_lcd; + bool found; + + if (tgt_is_multimodrpcs_client(req->rq_export)) + return tgt_lookup_reply(req, trd); + + mutex_lock(&ted->ted_lcd_lock); + found = req->rq_xid == lcd->lcd_last_xid || + req->rq_xid == lcd->lcd_last_close_xid; + + if (found && trd != NULL) { + if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) { + trd->trd_reply.lrd_xid = lcd->lcd_last_close_xid; + trd->trd_reply.lrd_transno = + 
lcd->lcd_last_close_transno; + trd->trd_reply.lrd_result = lcd->lcd_last_close_result; + } else { + trd->trd_reply.lrd_xid = lcd->lcd_last_xid; + trd->trd_reply.lrd_transno = lcd->lcd_last_transno; + trd->trd_reply.lrd_result = lcd->lcd_last_result; + trd->trd_reply.lrd_data = lcd->lcd_last_data; + trd->trd_pre_versions[0] = lcd->lcd_pre_versions[0]; + trd->trd_pre_versions[1] = lcd->lcd_pre_versions[1]; + trd->trd_pre_versions[2] = lcd->lcd_pre_versions[2]; + trd->trd_pre_versions[3] = lcd->lcd_pre_versions[3]; + } + } + mutex_unlock(&ted->ted_lcd_lock); + + return found; +} +EXPORT_SYMBOL(req_can_reconstruct); + diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index 603a4c2..ace6af9 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -55,6 +55,7 @@ struct tgt_thread_info { /* server and client data buffers */ struct lr_server_data tti_lsd; struct lsd_client_data tti_lcd; + struct lsd_reply_data tti_lrd; struct lu_buf tti_buf; loff_t tti_off; @@ -245,6 +246,8 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th, void *cookie); int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th, void *cookie); +int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid); +int tgt_handle_tag(struct obd_export *exp, __u16 tag); void update_records_dump(const struct update_records *records, unsigned int mask, bool dump_updates); diff --git a/lustre/target/tgt_lastrcvd.c b/lustre/target/tgt_lastrcvd.c index 1aa891f..558ee6b 100644 --- a/lustre/target/tgt_lastrcvd.c +++ b/lustre/target/tgt_lastrcvd.c @@ -44,6 +44,317 @@ #include "tgt_internal.h" + +/* Allocate a bitmap for a chunk of reply data slots */ +static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk) +{ + unsigned long *bm; + + OBD_ALLOC(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * sizeof(long)); + if (bm == NULL) + return -ENOMEM; + + spin_lock(&lut->lut_client_bitmap_lock); + + if (lut->lut_reply_bitmap[chunk] != NULL) { + 
/* someone else already allocated the bitmap for this chunk */ + spin_unlock(&lut->lut_client_bitmap_lock); + OBD_FREE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * + sizeof(long)); + return 0; + } + + lut->lut_reply_bitmap[chunk] = bm; + + spin_unlock(&lut->lut_client_bitmap_lock); + + return 0; +} + +/* Look for an available reply data slot in the bitmap + * of the target @lut + * Allocate bitmap chunk when first used + * XXX algo could be improved if this routine limits performance + */ +static int tgt_find_free_reply_slot(struct lu_target *lut) +{ + unsigned long *bmp; + int chunk = 0; + int rc; + int b; + + for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) { + /* allocate the bitmap chunk if necessary */ + if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) { + rc = tgt_bitmap_chunk_alloc(lut, chunk); + if (rc != 0) + return rc; + } + bmp = lut->lut_reply_bitmap[chunk]; + + /* look for an available slot in this chunk */ + do { + b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK); + if (b >= LUT_REPLY_SLOTS_PER_CHUNK) + break; + + /* found one */ + if (test_and_set_bit(b, bmp) == 0) + return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b; + } while (true); + } + + return -ENOSPC; +} + +/* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk + * of the target @lut + * Allocate the bitmap chunk if necessary + */ +static int tgt_set_reply_slot(struct lu_target *lut, int idx) +{ + int chunk; + int b; + int rc; + + chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK; + b = idx % LUT_REPLY_SLOTS_PER_CHUNK; + + LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS); + LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK); + + /* allocate the bitmap chunk if necessary */ + if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) { + rc = tgt_bitmap_chunk_alloc(lut, chunk); + if (rc != 0) + return rc; + } + + /* mark the slot 'used' in this chunk */ + if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) { + CERROR("%s: slot %d already set in bitmap\n", + tgt_name(lut), idx); + 
return -EALREADY; + } + + return 0; +} + + +/* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk + * of the target @lut + */ +static int tgt_clear_reply_slot(struct lu_target *lut, int idx) +{ + int chunk; + int b; + + chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK; + b = idx % LUT_REPLY_SLOTS_PER_CHUNK; + + LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS); + LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK); + + if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) { + CERROR("%s: slot %d already clear in bitmap\n", + tgt_name(lut), idx); + return -EALREADY; + } + + return 0; +} + + +/* Read header of reply_data file of target @tgt into structure @lrh */ +static int tgt_reply_header_read(const struct lu_env *env, + struct lu_target *tgt, + struct lsd_reply_header *lrh) +{ + int rc; + struct lsd_reply_header buf; + struct tgt_thread_info *tti = tgt_th_info(env); + + tti->tti_off = 0; + tti->tti_buf.lb_buf = &buf; + tti->tti_buf.lb_len = sizeof(buf); + + rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf, + &tti->tti_off); + if (rc != 0) + return rc; + + lrh->lrh_magic = le32_to_cpu(buf.lrh_magic); + lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size); + lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size); + + CDEBUG(D_HA, "%s: read %s header. magic=0x%08x " + "header_size=%d reply_size=%d\n", + tgt->lut_obd->obd_name, REPLY_DATA, + lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size); + + return 0; +} + +/* Write header into reply_data file of target @tgt from structure @lrh */ +static int tgt_reply_header_write(const struct lu_env *env, + struct lu_target *tgt, + struct lsd_reply_header *lrh) +{ + int rc; + struct lsd_reply_header buf; + struct tgt_thread_info *tti = tgt_th_info(env); + struct thandle *th; + struct dt_object *dto; + + CDEBUG(D_HA, "%s: write %s header. 
magic=0x%08x " + "header_size=%d reply_size=%d\n", + tgt->lut_obd->obd_name, REPLY_DATA, + lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size); + + buf.lrh_magic = cpu_to_le32(lrh->lrh_magic); + buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size); + buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size); + + th = dt_trans_create(env, tgt->lut_bottom); + if (IS_ERR(th)) + return PTR_ERR(th); + th->th_sync = 1; + + tti->tti_off = 0; + tti->tti_buf.lb_buf = &buf; + tti->tti_buf.lb_len = sizeof(buf); + + rc = dt_declare_record_write(env, tgt->lut_reply_data, + &tti->tti_buf, tti->tti_off, th); + if (rc) + GOTO(out, rc); + + rc = dt_trans_start(env, tgt->lut_bottom, th); + if (rc) + GOTO(out, rc); + + dto = dt_object_locate(tgt->lut_reply_data, th->th_dev); + rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th); +out: + dt_trans_stop(env, tgt->lut_bottom, th); + return rc; +} + +/* Write the reply data @lrd into reply_data file of target @tgt + * at offset @off + */ +static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt, + struct lsd_reply_data *lrd, loff_t off, + struct thandle *th) +{ + struct tgt_thread_info *tti = tgt_th_info(env); + struct dt_object *dto; + struct lsd_reply_data *buf = &tti->tti_lrd; + + lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result); + + buf->lrd_transno = cpu_to_le64(lrd->lrd_transno); + buf->lrd_xid = cpu_to_le64(lrd->lrd_xid); + buf->lrd_data = cpu_to_le64(lrd->lrd_data); + buf->lrd_result = cpu_to_le32(lrd->lrd_result); + buf->lrd_client_gen = cpu_to_le32(lrd->lrd_client_gen); + + lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result); + + tti->tti_off = off; + tti->tti_buf.lb_buf = buf; + tti->tti_buf.lb_len = sizeof(*buf); + + dto = dt_object_locate(tgt->lut_reply_data, th->th_dev); + return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th); +} + +/* Read the reply data from reply_data file of target @tgt at offset @off + * into structure @lrd + */ +static int 
tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt, + struct lsd_reply_data *lrd, loff_t off) +{ + int rc; + struct tgt_thread_info *tti = tgt_th_info(env); + struct lsd_reply_data *buf = &tti->tti_lrd; + + tti->tti_off = off; + tti->tti_buf.lb_buf = buf; + tti->tti_buf.lb_len = sizeof(*buf); + + rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf, + &tti->tti_off); + if (rc != 0) + return rc; + + lrd->lrd_transno = le64_to_cpu(buf->lrd_transno); + lrd->lrd_xid = le64_to_cpu(buf->lrd_xid); + lrd->lrd_data = le64_to_cpu(buf->lrd_data); + lrd->lrd_result = le32_to_cpu(buf->lrd_result); + lrd->lrd_client_gen = le32_to_cpu(buf->lrd_client_gen); + + return 0; +} + + +/* Free the in-memory reply data structure @trd and release + * the corresponding slot in the reply_data file of target @lut + * Called with ted_lcd_lock held + */ +static void tgt_free_reply_data(struct lu_target *lut, + struct tg_export_data *ted, + struct tg_reply_data *trd) +{ + CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, " + "client gen %u, slot idx %d\n", + tgt_name(lut), trd, trd->trd_reply.lrd_xid, + trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen, + trd->trd_index); + + LASSERT(mutex_is_locked(&ted->ted_lcd_lock)); + + list_del(&trd->trd_list); + ted->ted_reply_cnt--; + tgt_clear_reply_slot(lut, trd->trd_index); + OBD_FREE_PTR(trd); +} + +/* Release the reply data @trd from target @lut + * The reply data with the highest transno for this export + * is retained to ensure correctness of target recovery + * Called with ted_lcd_lock held + */ +static void tgt_release_reply_data(struct lu_target *lut, + struct tg_export_data *ted, + struct tg_reply_data *trd) +{ + CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, " + "client gen %u, slot idx %d\n", + tgt_name(lut), trd, trd->trd_reply.lrd_xid, + trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen, + trd->trd_index); + + LASSERT(mutex_is_locked(&ted->ted_lcd_lock)); + + /* Do 
not free the reply data corresponding to the + * highest transno of this export. + * This ensures on-disk reply data is kept and + * last committed transno can be restored from disk in case + * of target recovery + */ + if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) { + /* free previous retained reply */ + if (ted->ted_reply_last != NULL) + tgt_free_reply_data(lut, ted, ted->ted_reply_last); + /* retain the reply */ + list_del_init(&trd->trd_list); + ted->ted_reply_last = trd; + } else { + tgt_free_reply_data(lut, ted, trd); + } +} + static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti) { tti->tti_buf.lb_buf = &tti->tti_lsd; @@ -71,6 +382,7 @@ int tgt_client_alloc(struct obd_export *exp) RETURN(-ENOMEM); /* Mark that slot is not yet valid, 0 doesn't work here */ exp->exp_target_data.ted_lr_idx = -1; + INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list); RETURN(0); } EXPORT_SYMBOL(tgt_client_alloc); @@ -82,9 +394,26 @@ void tgt_client_free(struct obd_export *exp) { struct tg_export_data *ted = &exp->exp_target_data; struct lu_target *lut = class_exp2tgt(exp); + struct tg_reply_data *trd, *tmp; LASSERT(exp != exp->exp_obd->obd_self_export); + /* free reply data */ + mutex_lock(&ted->ted_lcd_lock); + list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) { + tgt_release_reply_data(lut, ted, trd); + } + if (ted->ted_reply_last != NULL) { + tgt_free_reply_data(lut, ted, ted->ted_reply_last); + ted->ted_reply_last = NULL; + } + mutex_unlock(&ted->ted_lcd_lock); + + if (!hlist_unhashed(&exp->exp_gen_hash)) + cfs_hash_del(exp->exp_obd->obd_gen_hash, + &ted->ted_lcd->lcd_generation, + &exp->exp_gen_hash); + OBD_FREE_PTR(ted->ted_lcd); ted->ted_lcd = NULL; @@ -98,6 +427,9 @@ void tgt_client_free(struct obd_export *exp) exp->exp_obd->obd_name, ted->ted_lr_idx); LBUG(); } + + if (tgt_is_multimodrpcs_client(exp) && !exp->exp_obd->obd_stopping) + atomic_dec(&lut->lut_num_clients); } EXPORT_SYMBOL(tgt_client_free); @@ -396,6 +728,16 
@@ void tgt_boot_epoch_update(struct lu_target *tgt) spin_lock(&tgt->lut_obd->obd_recovery_task_lock); list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue); spin_unlock(&tgt->lut_obd->obd_recovery_task_lock); + + /** Clear MULTI RPCS incompatibility flag if + * - target is MDT and + * - there is no client to recover or the recovery was aborted + */ + if (!strncmp(tgt->lut_obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) && + (tgt->lut_obd->obd_max_recoverable_clients == 0 || + tgt->lut_obd->obd_abort_recovery)) + tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS; + /** update server epoch */ tgt_server_data_update(&env, tgt, 1); lu_env_fini(&env); @@ -566,18 +908,40 @@ repeat: goto repeat; } - CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n", - tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid); - ted->ted_lr_idx = idx; ted->ted_lr_off = tgt->lut_lsd.lsd_client_start + idx * tgt->lut_lsd.lsd_client_size; LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off); - CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n", + if (tgt_is_multimodrpcs_client(exp)) { + /* Set MULTI RPCS incompatibility flag to prevent previous + * Lustre versions to mount a target with reply_data file */ + atomic_inc(&tgt->lut_num_clients); + if (!(tgt->lut_lsd.lsd_feature_incompat & + OBD_INCOMPAT_MULTI_RPCS)) { + tgt->lut_lsd.lsd_feature_incompat |= + OBD_INCOMPAT_MULTI_RPCS; + rc = tgt_server_data_update(env, tgt, 1); + if (rc < 0) { + CERROR("%s: unable to set MULTI RPCS " + "incompatibility flag\n", + exp->exp_obd->obd_name); + RETURN(rc); + } + } + + /* assign client slot generation */ + ted->ted_lcd->lcd_generation = + atomic_inc_return(&tgt->lut_client_generation); + } else { + ted->ted_lcd->lcd_generation = 0; + } + + CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' " + "generation %d\n", tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off, - ted->ted_lcd->lcd_uuid); + ted->ted_lcd->lcd_uuid, 
ted->ted_lcd->lcd_generation); if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD)) RETURN(-ENOSPC); @@ -591,10 +955,9 @@ repeat: } EXPORT_SYMBOL(tgt_client_new); -/* Add client data to the MDS. We use a bitmap to locate a free space - * in the last_rcvd file if cl_off is -1 (i.e. a new client). - * Otherwise, we just have to read the data from the last_rcvd file and - * we know its offset. +/* Add an existing client to the MDS in-memory state based on + * a client that was previously found in the last_rcvd file and + * already has an assigned slot (idx >= 0). * * It should not be possible to fail adding an existing client - otherwise * mdt_init_server_data() callsite needs to be fixed. @@ -618,9 +981,12 @@ int tgt_client_add(const struct lu_env *env, struct obd_export *exp, int idx) tgt->lut_obd->obd_name, idx); LBUG(); } + atomic_inc(&tgt->lut_num_clients); - CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n", - tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid); + CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, " + "generation %d\n", + tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid, + ted->ted_lcd->lcd_generation); ted->ted_lr_idx = idx; ted->ted_lr_off = tgt->lut_lsd.lsd_client_start + @@ -779,6 +1145,74 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt, GOTO(srv_update, rc = 0); } + /* Target that supports multiple reply data */ + if (tgt_is_multimodrpcs_client(req->rq_export)) { + struct tg_reply_data *trd; + struct lsd_reply_data *lrd; + __u64 *pre_versions; + int i; + loff_t off; + + OBD_ALLOC_PTR(trd); + if (unlikely(trd == NULL)) + GOTO(srv_update, rc = -ENOMEM); + + /* update export last transno */ + mutex_lock(&ted->ted_lcd_lock); + if (tti->tti_transno > ted->ted_lcd->lcd_last_transno) + ted->ted_lcd->lcd_last_transno = tti->tti_transno; + mutex_unlock(&ted->ted_lcd_lock); + + /* fill reply data information */ + lrd = &trd->trd_reply; + lrd->lrd_transno = tti->tti_transno; + lrd->lrd_xid = 
req->rq_xid; + lrd->lrd_result = th->th_result; + lrd->lrd_data = opdata; + lrd->lrd_client_gen = ted->ted_lcd->lcd_generation; + trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg); + pre_versions = lustre_msg_get_versions(req->rq_repmsg); + if (pre_versions) { + trd->trd_pre_versions[0] = pre_versions[0]; + trd->trd_pre_versions[1] = pre_versions[1]; + trd->trd_pre_versions[2] = pre_versions[2]; + trd->trd_pre_versions[3] = pre_versions[3]; + } + + /* find an empty slot */ + i = tgt_find_free_reply_slot(tgt); + if (unlikely(i < 0)) { + CERROR("%s: couldn't find a slot for reply data: " + "rc = %d\n", tgt_name(tgt), i); + GOTO(srv_update, rc = i); + } + trd->trd_index = i; + + /* write reply data to disk */ + off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i; + rc = tgt_reply_data_write(env, tgt, lrd, off, th); + if (unlikely(rc != 0)) { + CERROR("%s: can't update %s file: rc = %d\n", + tgt_name(tgt), REPLY_DATA, rc); + RETURN(rc); + } + + /* add reply data to target export's reply list */ + mutex_lock(&ted->ted_lcd_lock); + list_add(&trd->trd_list, &ted->ted_reply_list); + ted->ted_reply_cnt++; + if (ted->ted_reply_cnt > ted->ted_reply_max) + ted->ted_reply_max = ted->ted_reply_cnt; + mutex_unlock(&ted->ted_lcd_lock); + + CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, " + "tag %hu, client gen %u, slot idx %d\n", + trd, lrd->lrd_xid, lrd->lrd_transno, + trd->trd_tag, lrd->lrd_client_gen, i); + + GOTO(srv_update, rc = 0); + } + mutex_lock(&ted->ted_lcd_lock); LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0)); if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) { @@ -894,6 +1328,8 @@ static int tgt_clients_data_init(const struct lu_env *env, int cl_idx; int rc = 0; loff_t off = lsd->lsd_client_start; + __u32 generation = 0; + struct cfs_hash *hash = NULL; ENTRY; @@ -904,6 +1340,10 @@ static int tgt_clients_data_init(const struct lu_env *env, if (lcd == NULL) RETURN(-ENOMEM); + hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash); + if (hash == 
NULL) + GOTO(err_out, rc = -ENODEV); + for (cl_idx = 0; off < last_size; cl_idx++) { struct obd_export *exp; __u64 last_transno; @@ -933,8 +1373,9 @@ static int tgt_clients_data_init(const struct lu_env *env, * need to be set up like real exports as connect does. */ CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 - " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx, - last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd)); + " srv lr: "LPU64" lx: "LPU64" gen %u\n", lcd->lcd_uuid, + cl_idx, last_transno, lsd->lsd_last_transno, + lcd_last_xid(lcd), lcd->lcd_generation); exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid); if (IS_ERR(exp)) { @@ -959,6 +1400,25 @@ static int tgt_clients_data_init(const struct lu_env *env, exp->exp_in_recovery = 0; spin_unlock(&exp->exp_lock); obd->obd_max_recoverable_clients++; + + if (tgt->lut_lsd.lsd_feature_incompat & + OBD_INCOMPAT_MULTI_RPCS && + lcd->lcd_generation != 0) { + /* compute the highest valid client generation */ + generation = max(generation, lcd->lcd_generation); + + /* fill client_generation <-> export hash table */ + rc = cfs_hash_add_unique(hash, &lcd->lcd_generation, + &exp->exp_gen_hash); + if (rc != 0) { + CERROR("%s: duplicate export for client " + "generation %u\n", + tgt_name(tgt), lcd->lcd_generation); + class_export_put(exp); + GOTO(err_out, rc); + } + } + class_export_put(exp); /* Need to check last_rcvd even for duplicated exports. 
*/ @@ -971,7 +1431,12 @@ static int tgt_clients_data_init(const struct lu_env *env, spin_unlock(&tgt->lut_translock); } + /* record highest valid client generation */ + atomic_set(&tgt->lut_client_generation, generation); + err_out: + if (hash != NULL) + cfs_hash_putref(hash); OBD_FREE_PTR(lcd); RETURN(rc); } @@ -988,7 +1453,8 @@ static struct server_compat_data tgt_scd[] = { .rocompat = OBD_ROCOMPAT_LOVOBJID, .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR | OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR | - OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI, + OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI | + OBD_INCOMPAT_MULTI_RPCS, .rocinit = OBD_ROCOMPAT_LOVOBJID, .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR | OBD_INCOMPAT_MULTI_OI, @@ -1263,3 +1729,228 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th, tgt_ses_req(tsi)); return rc; } + +int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt) +{ + struct tgt_thread_info *tti = tgt_th_info(env); + struct lsd_reply_data *lrd = &tti->tti_lrd; + unsigned long reply_data_size; + int rc; + struct lsd_reply_header *lrh = NULL; + struct lsd_client_data *lcd = NULL; + struct tg_reply_data *trd = NULL; + int idx; + loff_t off; + struct cfs_hash *hash = NULL; + struct obd_export *exp; + struct obd_export *tmp; + struct tg_export_data *ted; + int reply_data_recovered = 0; + + rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr); + if (rc) + GOTO(out, rc); + reply_data_size = (unsigned long)tti->tti_attr.la_size; + + OBD_ALLOC_PTR(lrh); + if (lrh == NULL) + GOTO(out, rc = -ENOMEM); + + if (reply_data_size == 0) { + CDEBUG(D_INFO, "%s: new reply_data file, initializing\n", + tgt_name(tgt)); + lrh->lrh_magic = LRH_MAGIC; + lrh->lrh_header_size = sizeof(struct lsd_reply_header); + lrh->lrh_reply_size = sizeof(struct lsd_reply_data); + rc = tgt_reply_header_write(env, tgt, lrh); + if (rc) { + CERROR("%s: error writing %s: rc = %d\n", + tgt_name(tgt), REPLY_DATA, rc); + GOTO(out, rc); + 
} + } else { + rc = tgt_reply_header_read(env, tgt, lrh); + if (rc) { + CERROR("%s: error reading %s: rc = %d\n", + tgt_name(tgt), REPLY_DATA, rc); + GOTO(out, rc); + } + if (lrh->lrh_magic != LRH_MAGIC || + lrh->lrh_header_size != sizeof(struct lsd_reply_header) || + lrh->lrh_reply_size != sizeof(struct lsd_reply_data)) { + CERROR("%s: invalid header in %s\n", + tgt_name(tgt), REPLY_DATA); + GOTO(out, rc = -EINVAL); + } + + hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash); + if (hash == NULL) + GOTO(out, rc = -ENODEV); + + OBD_ALLOC_PTR(lcd); + if (lcd == NULL) + GOTO(out, rc = -ENOMEM); + + OBD_ALLOC_PTR(trd); + if (trd == NULL) + GOTO(out, rc = -ENOMEM); + + /* Load reply_data from disk */ + for (idx = 0, off = sizeof(struct lsd_reply_header); + off < reply_data_size; + idx++, off += sizeof(struct lsd_reply_data)) { + rc = tgt_reply_data_read(env, tgt, lrd, off); + if (rc) { + CERROR("%s: error reading %s: rc = %d\n", + tgt_name(tgt), REPLY_DATA, rc); + GOTO(out, rc); + } + + exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen); + if (exp == NULL) { + /* old reply data from a disconnected client */ + continue; + } + ted = &exp->exp_target_data; + mutex_lock(&ted->ted_lcd_lock); + + /* create in-memory reply_data and link it to + * target export's reply list */ + tgt_set_reply_slot(tgt, idx); + trd->trd_reply = *lrd; + trd->trd_pre_versions[0] = 0; + trd->trd_pre_versions[1] = 0; + trd->trd_pre_versions[2] = 0; + trd->trd_pre_versions[3] = 0; + trd->trd_index = idx; + trd->trd_tag = 0; + list_add(&trd->trd_list, &ted->ted_reply_list); + ted->ted_reply_cnt++; + if (ted->ted_reply_cnt > ted->ted_reply_max) + ted->ted_reply_max = ted->ted_reply_cnt; + + CDEBUG(D_HA, "%s: restore reply %p: xid %llu, " + "transno %llu, client gen %u, slot idx %d\n", + tgt_name(tgt), trd, lrd->lrd_xid, + lrd->lrd_transno, lrd->lrd_client_gen, + trd->trd_index); + + /* update export last committed transaction */ + exp->exp_last_committed = max(exp->exp_last_committed, + lrd->lrd_transno); 
+ + mutex_unlock(&ted->ted_lcd_lock); + class_export_put(exp); + + /* update target last committed transaction */ + spin_lock(&tgt->lut_translock); + tgt->lut_last_transno = max(tgt->lut_last_transno, + lrd->lrd_transno); + spin_unlock(&tgt->lut_translock); + + reply_data_recovered++; + + OBD_ALLOC_PTR(trd); + if (trd == NULL) + GOTO(out, rc = -ENOMEM); + } + CDEBUG(D_INFO, "%s: %d reply data have been recovered\n", + tgt_name(tgt), reply_data_recovered); + + /* delete entries from client_generation<->export hash */ + spin_lock(&tgt->lut_obd->obd_dev_lock); + list_for_each_entry_safe(exp, tmp, + &tgt->lut_obd->obd_exports, + exp_obd_chain) { + struct tg_export_data *ted = &exp->exp_target_data; + + if (!hlist_unhashed(&exp->exp_gen_hash)) + cfs_hash_del(hash, + &ted->ted_lcd->lcd_generation, + &exp->exp_gen_hash); + } + spin_unlock(&tgt->lut_obd->obd_dev_lock); + } + + rc = 0; + +out: + if (hash != NULL) + cfs_hash_putref(hash); + if (lcd != NULL) + OBD_FREE_PTR(lcd); + if (trd != NULL) + OBD_FREE_PTR(trd); + if (lrh != NULL) + OBD_FREE_PTR(lrh); + return rc; +} + +/* Look for a reply data matching specified request @req + * A copy is returned in @trd if the pointer is not NULL + */ +bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd) +{ + struct tg_export_data *ted = &req->rq_export->exp_target_data; + struct tg_reply_data *reply, *tmp; + bool found = false; + + mutex_lock(&ted->ted_lcd_lock); + list_for_each_entry_safe(reply, tmp, &ted->ted_reply_list, trd_list) { + if (reply->trd_reply.lrd_xid == req->rq_xid) { + found = true; + break; + } + } + if (found && trd != NULL) + *trd = *reply; + mutex_unlock(&ted->ted_lcd_lock); + + CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n", + tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, + found ? 
1 : 0); + + return found; +} +EXPORT_SYMBOL(tgt_lookup_reply); + +int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid) +{ + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *lut = class_exp2tgt(exp); + struct tg_reply_data *trd, *tmp; + + mutex_lock(&ted->ted_lcd_lock); + list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) { + if (trd->trd_reply.lrd_xid > rcvd_xid) + continue; + ted->ted_release_xid++; + tgt_release_reply_data(lut, ted, trd); + } + mutex_unlock(&ted->ted_lcd_lock); + + return 0; +} + +int tgt_handle_tag(struct obd_export *exp, __u16 tag) +{ + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *lut = class_exp2tgt(exp); + struct tg_reply_data *trd, *tmp; + + if (tag == 0) + return 0; + + mutex_lock(&ted->ted_lcd_lock); + list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) { + if (trd->trd_tag != tag) + continue; + ted->ted_release_tag++; + tgt_release_reply_data(lut, ted, trd); + break; + } + mutex_unlock(&ted->ted_lcd_lock); + + return 0; +} + diff --git a/lustre/target/tgt_main.c b/lustre/target/tgt_main.c index 269d896..ea628fd 100644 --- a/lustre/target/tgt_main.c +++ b/lustre/target/tgt_main.c @@ -56,6 +56,10 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, lut->lut_bottom = dt; lut->lut_last_rcvd = NULL; lut->lut_client_bitmap = NULL; + atomic_set(&lut->lut_num_clients, 0); + atomic_set(&lut->lut_client_generation, 0); + lut->lut_reply_data = NULL; + lut->lut_reply_bitmap = NULL; obd->u.obt.obt_lut = lut; obd->u.obt.obt_magic = OBT_MAGIC; @@ -93,13 +97,13 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, rc = PTR_ERR(o); CERROR("%s: cannot open LAST_RCVD: rc = %d\n", tgt_name(lut), rc); - GOTO(out_bitmap, rc); + GOTO(out, rc); } lut->lut_last_rcvd = o; rc = tgt_server_data_init(env, lut); if (rc < 0) - GOTO(out_obj, rc); + GOTO(out, rc); /* prepare transactions callbacks */ lut->lut_txn_cb.dtc_txn_start = tgt_txn_start_cb; @@ 
-112,23 +116,89 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut, dt_txn_callback_add(lut->lut_bottom, &lut->lut_txn_cb); lut->lut_bottom->dd_lu_dev.ld_site->ls_tgt = lut; + /* reply_data is supported by MDT targets only for now */ + if (strncmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) != 0) + RETURN(0); + + OBD_ALLOC(lut->lut_reply_bitmap, + LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *)); + if (lut->lut_reply_bitmap == NULL) + GOTO(out, rc); + + memset(&attr, 0, sizeof(attr)); + attr.la_valid = LA_MODE; + attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; + dof.dof_type = dt_mode_to_dft(S_IFREG); + + lu_local_obj_fid(&fid, REPLY_DATA_OID); + + o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr); + if (IS_ERR(o)) { + rc = PTR_ERR(o); + CERROR("%s: cannot open REPLY_DATA: rc = %d\n", tgt_name(lut), + rc); + GOTO(out, rc); + } + lut->lut_reply_data = o; + + rc = tgt_reply_data_init(env, lut); + if (rc < 0) + GOTO(out, rc); + RETURN(0); -out_obj: - lu_object_put(env, &lut->lut_last_rcvd->do_lu); +out: + if (lut->lut_last_rcvd != NULL) + lu_object_put(env, &lut->lut_last_rcvd->do_lu); lut->lut_last_rcvd = NULL; -out_bitmap: - OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); + if (lut->lut_client_bitmap != NULL) + OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); lut->lut_client_bitmap = NULL; + if (lut->lut_reply_data != NULL) + lu_object_put(env, &lut->lut_reply_data->do_lu); + lut->lut_reply_data = NULL; + if (lut->lut_reply_bitmap != NULL) + OBD_FREE(lut->lut_reply_bitmap, + LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *)); + lut->lut_reply_bitmap = NULL; return rc; } EXPORT_SYMBOL(tgt_init); void tgt_fini(const struct lu_env *env, struct lu_target *lut) { + int i; + int rc; ENTRY; + if (lut->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS && + atomic_read(&lut->lut_num_clients) == 0) { + /* Clear MULTI RPCS incompatibility flag that prevents previous + * Lustre versions to mount a target with reply_data file */ + 
lut->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS; + rc = tgt_server_data_update(env, lut, 1); + if (rc < 0) + CERROR("%s: unable to clear MULTI RPCS " + "incompatibility flag\n", + lut->lut_obd->obd_name); + } + sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset); + if (lut->lut_reply_data != NULL) + lu_object_put(env, &lut->lut_reply_data->do_lu); + lut->lut_reply_data = NULL; + if (lut->lut_reply_bitmap != NULL) { + for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) { + if (lut->lut_reply_bitmap[i] != NULL) + OBD_FREE(lut->lut_reply_bitmap[i], + BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) * + sizeof(long)); + lut->lut_reply_bitmap[i] = NULL; + } + OBD_FREE(lut->lut_reply_bitmap, + LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *)); + } + lut->lut_reply_bitmap = NULL; if (lut->lut_client_bitmap) { OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); lut->lut_client_bitmap = NULL;