struct lfsck_request *,
struct lfsck_reply *,
struct lfsck_query *));
-bool req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd);
+int req_can_reconstruct(struct ptlrpc_request *req, struct tg_reply_data *trd);
extern struct tgt_handler tgt_sec_ctx_handlers[];
extern struct tgt_handler tgt_lfsck_handlers[];
int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
int sync);
int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt);
-bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
+int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
struct tg_export_data *ted, struct tg_reply_data *trd,
+ struct ptlrpc_request *req,
struct thandle *th, bool update_lrd_file);
struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
__u64 xid);
/** highest XID received by export client that has no
* unreceived lower-numbered XID
*/
- __u64 exp_last_xid;
+ __u64 exp_last_xid;
+ long *exp_used_slots;
};
#define exp_target_data u.eu_target_data
opc = lustre_msg_get_opc(req->rq_reqmsg);
tag = obd_get_mod_rpc_slot(cli, opc, it);
lustre_msg_set_tag(req->rq_reqmsg, tag);
+ ptlrpc_reassign_next_xid(req);
}
static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req,
unsigned int
rq_hp:1, /**< high priority RPC */
rq_at_linked:1, /**< link into service's srv_at_array */
- rq_packed_final:1; /**< packed final reply */
+ rq_packed_final:1, /**< packed final reply */
+ rq_obsolete:1; /* aborted by a signal on a client */
/** @} */
/** one of RQ_PHASE_* */
__u64 ptlrpc_next_xid(void);
__u64 ptlrpc_sample_next_xid(void);
__u64 ptlrpc_req_xid(struct ptlrpc_request *request);
+void ptlrpc_reassign_next_xid(struct ptlrpc_request *req);
/* Set of routines to run a function in ptlrpcd context */
void *ptlrpcd_alloc_work(struct obd_import *imp,
#define OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK 0x51c
#define OBD_FAIL_PTLRPC_CLIENT_BULK_CB3 0x520
#define OBD_FAIL_PTLRPC_BULK_ATTACH 0x521
+#define OBD_FAIL_PTLRPC_RESEND_RACE 0x525
#define OBD_FAIL_PTLRPC_ROUND_XID 0x530
#define OBD_FAIL_PTLRPC_CONNECT_RACE 0x531
OBD_CONNECT2_FLR |\
OBD_CONNECT2_LOCK_CONVERT | \
OBD_CONNECT2_ARCHIVE_ID_ARRAY | \
+ OBD_CONNECT2_INC_XID | \
OBD_CONNECT2_SELINUX_POLICY | \
OBD_CONNECT2_LSOM | \
OBD_CONNECT2_ASYNC_DISCARD | \
OBD_CONNECT_GRANT_PARAM | \
OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2)
-#define OST_CONNECT_SUPPORTED2 OBD_CONNECT2_LOCKAHEAD
+#define OST_CONNECT_SUPPORTED2 (OBD_CONNECT2_LOCKAHEAD | OBD_CONNECT2_INC_XID)
#define ECHO_CONNECT_SUPPORTED (OBD_CONNECT_FID)
#define ECHO_CONNECT_SUPPORTED2 0
OBD_CONNECT2_FLR |
OBD_CONNECT2_LOCK_CONVERT |
OBD_CONNECT2_ARCHIVE_ID_ARRAY |
+ OBD_CONNECT2_INC_XID |
OBD_CONNECT2_LSOM |
OBD_CONNECT2_ASYNC_DISCARD |
OBD_CONNECT2_PCC;
data->ocd_connect_flags |= OBD_CONNECT_LOCKAHEAD_OLD;
#endif
- data->ocd_connect_flags2 = OBD_CONNECT2_LOCKAHEAD;
+ data->ocd_connect_flags2 = OBD_CONNECT2_LOCKAHEAD |
+ OBD_CONNECT2_INC_XID;
if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_GRANT_PARAM))
data->ocd_connect_flags |= OBD_CONNECT_GRANT_PARAM;
* If the xid matches, then we know this is a resent request, and allow
* it. (It's probably an OPEN, for which we don't send a lock.
*/
- if (req_can_reconstruct(req, NULL))
+ if (req_can_reconstruct(req, NULL) == 1)
return;
/*
exp->exp_connecting = 1;
spin_unlock(&exp->exp_lock);
+ OBD_ALLOC(exp->exp_used_slots,
+ BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+ if (exp->exp_used_slots == NULL)
+ RETURN(-ENOMEM);
+
/* self-export doesn't need client data and ldlm initialization */
if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
&exp->exp_client_uuid)))
err_free:
tgt_client_free(exp);
err:
+ OBD_FREE(exp->exp_used_slots,
+ BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+ exp->exp_used_slots = NULL;
+
CERROR("%s: Failed to initialize export: rc = %d\n",
exp->exp_obd->obd_name, rc);
return rc;
ENTRY;
target_destroy_export(exp);
+ if (exp->exp_used_slots)
+ OBD_FREE(exp->exp_used_slots,
+ BITS_TO_LONGS(OBD_MAX_RIF_MAX) * sizeof(long));
+
/* destroy can be called from failed obd_setup, so
* checking uuid is safer than obd_self_export */
if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
if (info->mti_reply_data == NULL)
RETURN(-ENOMEM);
- if (req_can_reconstruct(req, info->mti_reply_data)) {
+ rc = req_can_reconstruct(req, info->mti_reply_data);
+ if (rc == 1) {
reconstruct(info, lhc);
- rc = 1;
} else {
DEBUG_REQ(D_HA, req,
- "no reply data found for RESENT req");
- rc = 0;
+ "no reply data found for RESENT req, rc = %d",
+ rc);
}
OBD_FREE_PTR(info->mti_reply_data);
info->mti_reply_data = NULL;
if (info->mti_dlm_req)
ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
+ OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+
repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
if (IS_ERR(mo))
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
RETURN(err_serious(-ENOENT));
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+ req->rq_no_reply = 1;
+ RETURN(err_serious(-ENOENT));
+ }
+
if (info->mti_dlm_req)
ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
spin_unlock(&cli->cl_mod_rpcs_lock);
/* tag 0 is reserved for non-modify RPCs */
+
+ CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
+ "opc %u, max %hu\n",
+ cli->cl_import->imp_obd->obd_name,
+ i + 1, opc, max);
+
return i + 1;
}
spin_unlock(&cli->cl_mod_rpcs_lock);
static atomic64_t ptlrpc_last_xid;
+void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+ spin_lock(&req->rq_import->imp_lock);
+ list_del_init(&req->rq_unreplied_list);
+ ptlrpc_assign_next_xid_nolock(req);
+ spin_unlock(&req->rq_import->imp_lock);
+ DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+EXPORT_SYMBOL(ptlrpc_reassign_next_xid);
+
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
struct ptlrpc_cli_ctx *ctx)
#include <lu_object.h>
#include <uapi/linux/lnet/lnet-types.h>
#include "ptlrpc_internal.h"
+#include <linux/delay.h>
/* The following are visible and mutable through /sys/module/ptlrpc */
int test_req_buffer_pressure = 0;
}
}
+static void ptlrpc_add_exp_list_nolock(struct ptlrpc_request *req,
+ struct obd_export *export, bool hp)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ if (hp)
+ list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+ else
+ list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+ if (tag && export->exp_used_slots)
+ set_bit(tag - 1, export->exp_used_slots);
+}
+
+static void ptlrpc_del_exp_list(struct ptlrpc_request *req)
+{
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+ spin_lock(&req->rq_export->exp_rpc_lock);
+ list_del_init(&req->rq_exp_list);
+ if (tag && !req->rq_obsolete && req->rq_export->exp_used_slots)
+ clear_bit(tag - 1, req->rq_export->exp_used_slots);
+ spin_unlock(&req->rq_export->exp_rpc_lock);
+}
+
/** Change request export and move hp request from old export to new */
void ptlrpc_request_change_export(struct ptlrpc_request *req,
struct obd_export *export)
if (req->rq_export != NULL) {
LASSERT(!list_empty(&req->rq_exp_list));
/* remove rq_exp_list from last export */
- spin_lock(&req->rq_export->exp_rpc_lock);
- list_del_init(&req->rq_exp_list);
- spin_unlock(&req->rq_export->exp_rpc_lock);
- /*
- * export has one reference already, so it`s safe to
+ ptlrpc_del_exp_list(req);
+ /* export has one reference already, so it's safe to
* add req to export queue here and get another
* reference for request later
*/
spin_lock(&export->exp_rpc_lock);
- if (req->rq_ops != NULL) /* hp request */
- list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
- else
- list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+ ptlrpc_add_exp_list_nolock(req, export, req->rq_ops != NULL);
spin_unlock(&export->exp_rpc_lock);
class_export_rpc_dec(req->rq_export);
return tmp;
}
+#ifdef HAVE_SERVER_SUPPORT
+static void ptlrpc_server_mark_obsolete(struct ptlrpc_request *req)
+{
+ req->rq_obsolete = 1;
+}
+
+static void
+ptlrpc_server_mark_in_progress_obsolete(struct ptlrpc_request *req)
+{
+ struct ptlrpc_request *tmp = NULL;
+ __u16 tag;
+
+ if (!tgt_is_increasing_xid_client(req->rq_export) ||
+ req->rq_export->exp_used_slots == NULL)
+ return;
+
+ tag = lustre_msg_get_tag(req->rq_reqmsg);
+ if (tag == 0)
+ return;
+
+ if (!test_bit(tag - 1, req->rq_export->exp_used_slots))
+ return;
+
+ /* This list should not be longer than max_requests in
+ * flights on the client, so it is not all that long.
+ * Also we only hit this codepath in case of a resent
+ * request which makes it even more rarely hit */
+ list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs, rq_exp_list) {
+ if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+ req->rq_xid > tmp->rq_xid)
+ ptlrpc_server_mark_obsolete(tmp);
+
+ }
+ list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs, rq_exp_list) {
+ if (tag == lustre_msg_get_tag(tmp->rq_reqmsg) &&
+ req->rq_xid > tmp->rq_xid)
+ ptlrpc_server_mark_obsolete(tmp);
+ }
+}
+#endif
+
/**
* Check if a request should be assigned with a high priority.
*
if (req->rq_ops && req->rq_ops->hpreq_fini)
req->rq_ops->hpreq_fini(req);
- spin_lock(&req->rq_export->exp_rpc_lock);
- list_del_init(&req->rq_exp_list);
- spin_unlock(&req->rq_export->exp_rpc_lock);
+ ptlrpc_del_exp_list(req);
}
EXIT;
}
hp = rc > 0;
ptlrpc_nrs_req_initialize(svcpt, req, hp);
- if (req->rq_export != NULL) {
+ while (req->rq_export != NULL) {
struct obd_export *exp = req->rq_export;
/*
* atomically
*/
spin_lock_bh(&exp->exp_rpc_lock);
+#ifdef HAVE_SERVER_SUPPORT
+ ptlrpc_server_mark_in_progress_obsolete(req);
+#endif
orig = ptlrpc_server_check_resend_in_progress(req);
+ if (orig && OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE)) {
+ spin_unlock_bh(&exp->exp_rpc_lock);
+
+ OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+ msleep(4 * MSEC_PER_SEC);
+ continue;
+ }
+
if (orig && likely(atomic_inc_not_zero(&orig->rq_refcount))) {
bool linked;
ptlrpc_at_add_timed(orig);
ptlrpc_server_drop_request(orig);
ptlrpc_nrs_req_finalize(req);
+
+ /* don't mark slot unused for resend in progress */
+ req->rq_obsolete = 1;
+
RETURN(-EBUSY);
}
- if (hp || req->rq_ops != NULL)
- list_add(&req->rq_exp_list, &exp->exp_hp_rpcs);
- else
- list_add(&req->rq_exp_list, &exp->exp_reg_rpcs);
+ ptlrpc_add_exp_list_nolock(req, exp, hp || req->rq_ops != NULL);
+
spin_unlock_bh(&exp->exp_rpc_lock);
+ break;
}
/*
/* sanity check: if the xid matches, the request must be marked as a
* resent or replayed */
- if (req_can_reconstruct(req, NULL)) {
+ if (req_can_reconstruct(req, NULL) == 1) {
if (!(lustre_msg_get_flags(req->rq_reqmsg) &
(MSG_RESENT | MSG_REPLAY))) {
DEBUG_REQ(D_WARNING, req,
static int process_req_last_xid(struct ptlrpc_request *req)
{
__u64 last_xid;
+ int rc = 0;
+ struct obd_export *exp = req->rq_export;
+ struct tg_export_data *ted = &exp->exp_target_data;
+ bool need_lock = tgt_is_multimodrpcs_client(exp);
ENTRY;
+ if (need_lock)
+ mutex_lock(&ted->ted_lcd_lock);
/* check request's xid is consistent with export's last_xid */
last_xid = lustre_msg_get_last_xid(req->rq_reqmsg);
- if (last_xid > req->rq_export->exp_last_xid)
- req->rq_export->exp_last_xid = last_xid;
+ if (last_xid > exp->exp_last_xid)
+ exp->exp_last_xid = last_xid;
- if (req->rq_xid == 0 ||
- (req->rq_xid <= req->rq_export->exp_last_xid)) {
- DEBUG_REQ(D_WARNING, req,
- "unexpected rq_xid=%llx != exp_last_xid=%llx",
- req->rq_xid, req->rq_export->exp_last_xid);
+ if (req->rq_xid == 0 || req->rq_xid <= exp->exp_last_xid) {
/* Some request is allowed to be sent during replay,
* such as OUT update requests, FLD requests, so it
* is possible that replay requests has smaller XID
* - The former RPC got chance to be processed;
*/
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
- RETURN(-EPROTO);
+ rc = -EPROTO;
+
+ DEBUG_REQ(D_WARNING, req,
+ "unexpected xid=%llx != exp_last_xid=%llx, rc = %d",
+ req->rq_xid, exp->exp_last_xid, rc);
+ if (rc)
+ GOTO(out, rc);
}
/* The "last_xid" is the minimum xid among unreplied requests,
*/
if (req->rq_export->exp_conn_cnt >
lustre_msg_get_conn_cnt(req->rq_reqmsg))
- RETURN(-ESTALE);
+ GOTO(out, rc = -ESTALE);
/* try to release in-memory reply data */
- if (tgt_is_multimodrpcs_client(req->rq_export)) {
- tgt_handle_received_xid(req->rq_export,
- lustre_msg_get_last_xid(req->rq_reqmsg));
- if (!(lustre_msg_get_flags(req->rq_reqmsg) &
- (MSG_RESENT | MSG_REPLAY)))
- tgt_handle_tag(req->rq_export,
- lustre_msg_get_tag(req->rq_reqmsg));
+ if (tgt_is_multimodrpcs_client(exp)) {
+ tgt_handle_received_xid(exp, last_xid);
+ rc = tgt_handle_tag(req);
}
- RETURN(0);
+
+out:
+ if (need_lock)
+ mutex_unlock(&ted->ted_lcd_lock);
+
+ RETURN(rc);
}
int tgt_request_handle(struct ptlrpc_request *req)
/* Check if request can be reconstructed from saved reply data
* A copy of the reply data is returned in @trd if the pointer is not NULL
*/
-bool req_can_reconstruct(struct ptlrpc_request *req,
+int req_can_reconstruct(struct ptlrpc_request *req,
struct tg_reply_data *trd)
{
struct tg_export_data *ted = &req->rq_export->exp_target_data;
struct lsd_client_data *lcd = ted->ted_lcd;
- bool found;
+ int found;
if (tgt_is_multimodrpcs_client(req->rq_export))
return tgt_lookup_reply(req, trd);
int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
void *cookie);
int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid);
-int tgt_handle_tag(struct obd_export *exp, __u16 tag);
+int tgt_handle_tag(struct ptlrpc_request *req);
void update_records_dump(const struct update_records *records,
unsigned int mask, bool dump_updates);
}
EXPORT_SYMBOL(tgt_client_del);
+static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
+{
+ struct tg_export_data *ted = &exp->exp_target_data;
+ struct lu_target *lut = class_exp2tgt(exp);
+ struct tg_reply_data *trd, *tmp;
+
+ if (tag == 0)
+ return;
+
+ list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
+ if (trd->trd_tag != tag)
+ continue;
+
+ LASSERT(ergo(tgt_is_increasing_xid_client(exp),
+ trd->trd_reply.lrd_xid <= xid));
+
+ ted->ted_release_tag++;
+ tgt_release_reply_data(lut, ted, trd);
+ }
+}
+
int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
struct tg_export_data *ted, struct tg_reply_data *trd,
+ struct ptlrpc_request *req,
struct thandle *th, bool update_lrd_file)
{
struct lsd_reply_data *lrd;
}
/* add reply data to target export's reply list */
mutex_lock(&ted->ted_lcd_lock);
+ if (req != NULL) {
+ int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
+ MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
+
+ if (req->rq_obsolete) {
+ mutex_unlock(&ted->ted_lcd_lock);
+ RETURN(-EALREADY);
+ }
+
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
+ tgt_clean_by_tag(req->rq_export, req->rq_xid,
+ trd->trd_tag);
+ }
list_add(&trd->trd_list, &ted->ted_reply_list);
ted->ted_reply_cnt++;
if (ted->ted_reply_cnt > ted->ted_reply_max)
CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
"tag %hu, client gen %u, slot idx %d\n",
trd, lrd->lrd_xid, lrd->lrd_transno,
- trd->trd_tag, lrd->lrd_client_gen, i);
+ trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
+
RETURN(0);
}
EXPORT_SYMBOL(tgt_add_reply_data);
trd->trd_pre_versions[3] = pre_versions[3];
}
- rc = tgt_add_reply_data(env, tgt, ted, trd, th, write_update);
+ rc = tgt_add_reply_data(env, tgt, ted, trd, req,
+ th, write_update);
if (rc < 0)
OBD_FREE_PTR(trd);
return rc;
return rc;
}
-struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
- __u64 xid)
+static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
+ struct tg_reply_data *trd)
{
- struct tg_reply_data *found = NULL;
- struct tg_reply_data *reply;
+ struct tg_export_data *ted = &req->rq_export->exp_target_data;
+ struct lu_target *lut = class_exp2tgt(req->rq_export);
+ __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+ int rc = 0;
+ struct tg_reply_data *reply;
+ bool check_increasing;
+
+ if (tag == 0)
+ return 0;
+
+ check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
+ !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+ if (!lookup && !check_increasing)
+ return 0;
- mutex_lock(&ted->ted_lcd_lock);
list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
- if (reply->trd_reply.lrd_xid == xid) {
- found = reply;
+ if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
+ rc = 1;
+ if (trd != NULL)
+ *trd = *reply;
+ break;
+ } else if (check_increasing && reply->trd_tag == tag &&
+ reply->trd_reply.lrd_xid > req->rq_xid) {
+ rc = -EPROTO;
+ CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
+ tgt_name(lut), tag, req->rq_xid, trd,
+ reply->trd_reply.lrd_xid,
+ reply->trd_reply.lrd_transno,
+ reply->trd_reply.lrd_client_gen,
+ reply->trd_index, rc);
break;
}
}
- mutex_unlock(&ted->ted_lcd_lock);
- return found;
+
+ return rc;
}
-EXPORT_SYMBOL(tgt_lookup_reply_by_xid);
/* Look for a reply data matching specified request @req
* A copy is returned in @trd if the pointer is not NULL
*/
-bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
+int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
{
- struct tg_export_data *ted = &req->rq_export->exp_target_data;
- struct tg_reply_data *reply;
- bool found = false;
-
- reply = tgt_lookup_reply_by_xid(ted, req->rq_xid);
- if (reply != NULL) {
- found = true;
- if (trd != NULL)
- *trd = *reply;
+ struct tg_export_data *ted = &req->rq_export->exp_target_data;
+ int found = 0;
+ bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
+
+ mutex_lock(&ted->ted_lcd_lock);
+ if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
+ /* A check for the last_xid is needed here in case there is
+ * no reply data is left in the list. It may happen if another
+ * RPC on another slot increased the last_xid between our
+ * process_req_last_xid & tgt_lookup_reply calls */
+ found = -EPROTO;
+ } else {
+ found = tgt_check_lookup_req(req, 1, trd);
}
+ mutex_unlock(&ted->ted_lcd_lock);
- CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d\n",
- tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid,
- found ? 1 : 0);
+ CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
+ tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
+ req->rq_export->exp_last_xid);
return found;
}
struct lu_target *lut = class_exp2tgt(exp);
struct tg_reply_data *trd, *tmp;
- mutex_lock(&ted->ted_lcd_lock);
+
list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
if (trd->trd_reply.lrd_xid > rcvd_xid)
continue;
ted->ted_release_xid++;
tgt_release_reply_data(lut, ted, trd);
}
- mutex_unlock(&ted->ted_lcd_lock);
return 0;
}
-int tgt_handle_tag(struct obd_export *exp, __u16 tag)
+int tgt_handle_tag(struct ptlrpc_request *req)
{
- struct tg_export_data *ted = &exp->exp_target_data;
- struct lu_target *lut = class_exp2tgt(exp);
- struct tg_reply_data *trd, *tmp;
-
- if (tag == 0)
- return 0;
-
- mutex_lock(&ted->ted_lcd_lock);
- list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
- if (trd->trd_tag != tag)
- continue;
- ted->ted_release_tag++;
- tgt_release_reply_data(lut, ted, trd);
- break;
- }
- mutex_unlock(&ted->ted_lcd_lock);
-
- return 0;
+ return tgt_check_lookup_req(req, 0, NULL);
}
}
run_test 136 "changelog_deregister leaving pending records"
+test_137() {
+ df $DIR
+ mkdir -p $DIR/d1
+ mkdir -p $DIR/d2
+ dd if=/dev/zero of=$DIR/d1/$tfile bs=4096 count=1
+ dd if=/dev/zero of=$DIR/d2/$tfile bs=4096 count=1
+ cancel_lru_locks osc
+
+ #define OBD_FAIL_PTLRPC_RESEND_RACE 0x525
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000525"
+
+ # RPC1: any reply is to be delayed to disable last_xid logic
+ ln $DIR/d1/$tfile $DIR/d1/f2 &
+ sleep 1
+
+ # RPC2: setattr1 reply is delayed & resent
+ # original reply comes to client; the resend get asleep
+ chmod 666 $DIR/d2/$tfile
+
+ # RPC3: setattr2 on the same file; run ahead of RPC2 resend
+ chmod 777 $DIR/d2/$tfile
+
+ # RPC2 resend wakes up
+ sleep 5
+ [ $(stat -c "%a" $DIR/d2/$tfile) == 777 ] || error "resend got applied"
+}
+run_test 137 "late resend must be skipped if already applied"
+
complete $SECONDS
check_and_cleanup_lustre
exit_status
}
run_test 421g "rmfid to return errors properly"
+test_422() {
+ test_mkdir -i 0 -c 1 -p $DIR/$tdir/d1
+ test_mkdir -i 0 -c 1 -p $DIR/$tdir/d2
+ test_mkdir -i 0 -c 1 -p $DIR/$tdir/d3
+ dd if=/dev/zero of=$DIR/$tdir/d1/file1 bs=1k count=1
+ dd if=/dev/zero of=$DIR/$tdir/d2/file1 bs=1k count=1
+
+ local amc=$(at_max_get client)
+ local amo=$(at_max_get mds1)
+ local timeout=`lctl get_param -n timeout`
+
+ at_max_set 0 client
+ at_max_set 0 mds1
+
+#define OBD_FAIL_PTLRPC_PAUSE_REQ 0x50a
+ do_facet mds1 $LCTL set_param fail_loc=0x8000050a \
+ fail_val=$(((2*timeout + 10)*1000))
+ touch $DIR/$tdir/d3/file &
+ sleep 2
+#define OBD_FAIL_TGT_REPLY_DATA_RACE 0x722
+ do_facet mds1 $LCTL set_param fail_loc=0x80000722 \
+ fail_val=$((2*timeout + 5))
+ mv $DIR/$tdir/d1/file1 $DIR/$tdir/d1/file2 &
+ local pid=$!
+ sleep 1
+ kill -9 $pid
+ sleep $((2 * timeout))
+ echo kill $pid
+ kill -9 $pid
+ lctl mark touch
+ touch $DIR/$tdir/d2/file3
+ touch $DIR/$tdir/d2/file4
+ touch $DIR/$tdir/d2/file5
+
+ wait
+ at_max_set $amc client
+ at_max_set $amo mds1
+}
+run_test 422 "kill a process with RPC in progress"
+
prep_801() {
[[ $(lustre_version_code mds1) -lt $(version_code 2.9.55) ]] ||
[[ $OST1_VERSION -lt $(version_code 2.9.55) ]] &&