+/**
+ * Serializes in-flight MDT-modifying RPC requests to preserve idempotency.
+ *
+ * This mutex is used to implement execute-once semantics on the MDT.
+ * The MDT stores the last transaction ID and result for every client in
+ * its last_rcvd file. If the client doesn't get a reply, it can safely
+ * resend the request and the MDT will reconstruct the reply being aware
+ * that the request has already been executed. Without this lock,
+ * execution status of concurrent in-flight requests would be
+ * overwritten.
+ *
+ * This imlpementation limits the extent to which we can keep a full pipeline
+ * of in-flight requests from a single client. This limitation can be
+ * overcome by allowing multiple slots per client in the last_rcvd file,
+ * see LU-6864.
+ */
+#define OSP_FAKE_RPCL_IT ((void *)0x2c0012bfUL)
+
+static inline void osp_init_rpc_lock(struct osp_device *osp)
+{
+ struct osp_rpc_lock *lck = &osp->opd_rpc_lock;
+
+ mutex_init(&lck->rpcl_mutex);
+ lck->rpcl_fakes = 0;
+}
+
+static inline void osp_get_rpc_lock(struct osp_device *osp)
+{
+ struct osp_rpc_lock *lck = &osp->opd_rpc_lock;
+
+ /* This would normally block until the existing request finishes.
+ * If fail_loc is set it will block until the regular request is
+ * done, then increment rpcl_fakes. Once that is non-zero it
+ * will only be cleared when all fake requests are finished.
+ * Only when all fake requests are finished can normal requests
+ * be sent, to ensure they are recoverable again.
+ */
+ again:
+ mutex_lock(&lck->rpcl_mutex);
+
+ if (CFS_FAIL_CHECK_QUIET(OBD_FAIL_MDC_RPCS_SEM) ||
+ CFS_FAIL_CHECK_QUIET(OBD_FAIL_OSP_RPCS_SEM)) {
+ lck->rpcl_fakes++;
+ mutex_unlock(&lck->rpcl_mutex);
+
+ return;
+ }
+
+ /* This will only happen when the CFS_FAIL_CHECK() was just turned
+ * off but there are still requests in progress. Wait until they
+ * finish. It doesn't need to be efficient in this extremely rare
+ * case, just have low overhead in the common case when it isn't true.
+ */
+ if (unlikely(lck->rpcl_fakes)) {
+ mutex_unlock(&lck->rpcl_mutex);
+ schedule_timeout(cfs_time_seconds(1) / 4);
+
+ goto again;
+ }
+}
+
+static inline void osp_put_rpc_lock(struct osp_device *osp)
+{
+ struct osp_rpc_lock *lck = &osp->opd_rpc_lock;
+
+ if (lck->rpcl_fakes) { /* OBD_FAIL_OSP_RPCS_SEM */
+ mutex_lock(&lck->rpcl_mutex);
+
+ if (lck->rpcl_fakes) /* check again under lock */
+ lck->rpcl_fakes--;
+ }
+
+ mutex_unlock(&lck->rpcl_mutex);
+}
+
+static inline int osp_fid_diff(const struct lu_fid *fid1,
+ const struct lu_fid *fid2)
+{
+ /* In 2.6+ ost_idx is packed into IDIF FID, while in 2.4 and 2.5 IDIF
+ * is always FID_SEQ_IDIF(0x100000000ULL), which does not include OST
+ * index in the seq. So we can not compare IDIF FID seq here */
+ if (fid_is_idif(fid1) && fid_is_idif(fid2)) {
+ __u32 ost_idx1 = fid_idif_ost_idx(fid1);
+ __u32 ost_idx2 = fid_idif_ost_idx(fid2);
+
+ LASSERTF(ost_idx1 == 0 || ost_idx2 == 0 || ost_idx1 == ost_idx2,
+ "fid1: "DFID", fid2: "DFID"\n", PFID(fid1),
+ PFID(fid2));
+
+ return fid_idif_id(fid1->f_seq, fid1->f_oid, 0) -
+ fid_idif_id(fid2->f_seq, fid2->f_oid, 0);
+ }
+
+ LASSERTF(fid_seq(fid1) == fid_seq(fid2), "fid1:"DFID", fid2:"DFID"\n",
+ PFID(fid1), PFID(fid2));
+
+ return fid_oid(fid1) - fid_oid(fid2);
+}
+
+static inline void osp_fid_to_obdid(struct lu_fid *last_fid, u64 *osi_id)
+{
+ if (fid_is_idif((last_fid)))
+ *osi_id = fid_idif_id(fid_seq(last_fid), fid_oid(last_fid),
+ fid_ver(last_fid));
+ else
+ *osi_id = fid_oid(last_fid);
+}