#define OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK 0x51c
#define OBD_FAIL_PTLRPC_CLIENT_BULK_CB3 0x520
#define OBD_FAIL_PTLRPC_BULK_ATTACH 0x521
+#define OBD_FAIL_PTLRPC_ROUND_XID 0x530
#define OBD_FAIL_PTLRPC_CONNECT_RACE 0x531
#define OBD_FAIL_OBD_PING_NET 0x600
struct distribute_txn_replay_req *dtrq;
dtrq = distribute_txn_lookup_finish_list(tdtd,
- req->rq_xid);
+ transno);
LASSERT(dtrq != NULL);
spin_lock(&tdtd->tdtd_replay_list_lock);
list_del_init(&dtrq->dtrq_list);
}
LASSERT(trd->trd_processing_task == current_pid());
- DEBUG_REQ(D_HA, req, "processing t%lld from %s",
+ DEBUG_REQ(D_HA, req, "processing x%llu t%lld from %s",
+ req->rq_xid,
lustre_msg_get_transno(req->rq_reqmsg),
libcfs_nid2str(req->rq_peer.nid));
spin_unlock(&req->rq_import->imp_lock);
}
+static __u64 ptlrpc_last_xid; /* written under ptlrpc_last_xid_lock in the OBD_FAIL_PTLRPC_ROUND_XID path */
+static spinlock_t ptlrpc_last_xid_lock; /* held around the ptlrpc_last_xid update; NOTE(review): confirm other users */
+
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
struct ptlrpc_cli_ctx *ctx)
ptlrpc_at_set_req_timeout(request);
lustre_msg_set_opc(request->rq_reqmsg, opcode);
- ptlrpc_assign_next_xid(request);
/* Let's setup deadline for req/reply/bulk unlink for opcode. */
if (cfs_fail_val == opcode) {
else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
fail_t = &request->rq_reply_deadline;
fail2_t = &request->rq_bulk_deadline;
+ } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) {
+ time64_t now = ktime_get_real_seconds();
+ spin_lock(&ptlrpc_last_xid_lock);
+ ptlrpc_last_xid = ((__u64)now >> 4) << 24;
+ spin_unlock(&ptlrpc_last_xid_lock);
}
if (fail_t) {
msleep(4 * MSEC_PER_SEC);
}
}
+ ptlrpc_assign_next_xid(request);
RETURN(0);
}
}
-static __u64 ptlrpc_last_xid;
-static spinlock_t ptlrpc_last_xid_lock;
-
/**
* Initialize the XID for the node. This is common among all requests on
* this node, and only requires the property that it is monotonically
tti->tti_tea.ta_handle,
tti->tti_u.update.tti_update_reply,
tti->tti_u.update.tti_update_reply_index);
+
+ CDEBUG(D_INFO, "%s: "DFID" index insert %s: rc = %d\n",
+ tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
+ name, rc);
+
RETURN(rc);
}
struct distribute_txn_replay_req_sub *tmp;
LASSERT(list_empty(&dtrq->dtrq_list));
+ CDEBUG(D_HA, "destroy x%llu t%llu\n", dtrq->dtrq_xid,
+ dtrq->dtrq_master_transno);
spin_lock(&dtrq->dtrq_sub_list_lock);
list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
struct sub_thandle_cookie *stc;
struct distribute_txn_replay_req *
distribute_txn_lookup_finish_list(struct target_distribute_txn_data *tdtd,
- __u64 xid)
+ __u64 transno)
{
struct distribute_txn_replay_req *dtrq = NULL;
struct distribute_txn_replay_req *iter;
spin_lock(&tdtd->tdtd_replay_list_lock);
list_for_each_entry(iter, &tdtd->tdtd_replay_finish_list, dtrq_list) {
- if (iter->dtrq_xid == xid) {
+ if (iter->dtrq_master_transno == transno) {
dtrq = iter;
break;
}
if (tgt->lut_tdtd == NULL)
return false;
- dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd, req->rq_xid);
+ dtrq = distribute_txn_lookup_finish_list(tgt->lut_tdtd,
+ lustre_msg_get_transno(req->rq_reqmsg));
if (dtrq == NULL)
return false;
lrd->lrd_result = le32_to_cpu(lrd->lrd_result);
lrd->lrd_client_gen = le32_to_cpu(lrd->lrd_client_gen);
+ CDEBUG(D_HA, "xid=%llu transno=%llu\n", lrd->lrd_xid, lrd->lrd_transno);
if (lrd->lrd_transno != tgt_th_info(env)->tti_transno)
return;
}
run_test 28 "lock replay should be ordered: waiting after granted"
+test_29() {
+ local dir0=$DIR/$tdir/d0
+ local dir1=$DIR/$tdir/d1
+
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+ [ $CLIENTCOUNT -lt 2 ] && skip "needs >= 2 clients" && return 0
+ [ "$CLIENT1" == "$CLIENT2" ] &&
+ skip "clients must be on different nodes" && return 0
+
+ mkdir -p $DIR/$tdir
+ $LFS mkdir -i0 $dir0
+ $LFS mkdir -i1 $dir1
+ sync
+
+ replay_barrier mds2
+ # round the local client's XID, create a remote dir under $dir0, drop reply on mds2
+ #define OBD_FAIL_PTLRPC_ROUND_XID 0x530
+ $LCTL set_param fail_loc=0x530 fail_val=36
+ #define OBD_FAIL_MDS_REINT_MULTI_NET_REP 0x15a
+ do_facet mds2 $LCTL set_param fail_loc=0x8000015a
+ echo make remote dir d0 for $dir0
+ $LFS mkdir -i1 -c1 $dir0/d3 &
+ sleep 1
+
+ echo make local dir d1 for $dir1
+ do_node $CLIENT2 $LCTL set_param fail_loc=0x530 fail_val=36
+ do_node $CLIENT2 mkdir $dir1/d4
+
+ fail mds2
+}
+run_test 29 "replay vs update with the same xid"
+
complete $SECONDS
SLEEP=$((SECONDS - $NOW))
[ $SLEEP -lt $TIMEOUT ] && sleep $SLEEP