if (oti == NULL)
return;
- if (req->rq_repmsg)
+ if (req->rq_repmsg) {
+ __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
+ versions[0] = oti->oti_pre_version;
+ lustre_msg_set_versions(req->rq_repmsg, versions);
+ }
req->rq_transno = oti->oti_transno;
/* XXX 4 == entries in oti_ack_locks??? */
ENTRY;
LASSERT(!lustre_handle_is_used(lh));
- LASSERT((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) ==
- (OBD_MD_FLID | OBD_MD_FLGROUP));
+ /* o_id and o_gr are used for localizing the resource; if the client
+ * fails to set them, return an error instead of triggering an ASSERTION. */
+ if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
+ (OBD_MD_FLID | OBD_MD_FLGROUP)))
+ RETURN(-EPROTO);
if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
!(oa->o_flags & OBD_FL_SRVLOCK))
RETURN(opd.opd_lock_match);
}
+/* Return the servicing thread's thread-local I/O buffer cache,
+ * allocating a temporary one if the thread has none.
+ * Returns NULL if a temporary cache is needed but allocation fails;
+ * callers must check for that and fail the request (-ENOMEM). */
+static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
+{
+ struct ost_thread_local_cache *tls =
+ (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+ /* In normal operation an I/O request is serviced only by ll_ost_io
+ * threads, each of which has its own tls buffers allocated by
+ * ost_thread_init().
+ * During recovery, however, an I/O request may be queued until any of
+ * the ost service threads picks it up — not necessarily an ll_ost_io
+ * thread. In that case we allocate tls buffers dynamically, for the
+ * duration of this one request; ->temporary below makes ost_tls_put()
+ * free them again. */
+ if (unlikely(tls == NULL)) {
+ LASSERT(r->rq_export->exp_in_recovery);
+ OBD_ALLOC_PTR(tls);
+ if (tls != NULL) {
+ /* mark for release by ost_tls_put() */
+ tls->temporary = 1;
+ r->rq_svc_thread->t_data = tls;
+ }
+ }
+ return tls;
+}
+
+/* Release the thread-local buffer cache if it was allocated by
+ * ost_tls_get() only for servicing this one request (->temporary set);
+ * permanent per-thread caches from ost_thread_init() are left alone.
+ * NOTE(review): tls is dereferenced without a NULL check, so this must
+ * only be called on paths where ost_tls_get() has succeeded. */
+static void ost_tls_put(struct ptlrpc_request *r)
+{
+ struct ost_thread_local_cache *tls =
+ (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+ if (unlikely(tls->temporary)) {
+ OBD_FREE_PTR(tls);
+ /* clear the stale pointer so a later ost_tls_get() reallocates */
+ r->rq_svc_thread->t_data = NULL;
+ }
+}
+
static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct ptlrpc_bulk_desc *desc = NULL;
struct lustre_handle lockh = { 0 };
int niocount, npages, nob = 0, rc, i;
int no_reply = 0;
+ struct ost_thread_local_cache *tls;
ENTRY;
req->rq_bulk_read = 1;
if (rc)
GOTO(out, rc);
- /*
- * Per-thread array of struct niobuf_{local,remote}'s was allocated by
- * ost_thread_init().
- */
- local_nb = ost_tls(req)->local;
+ tls = ost_tls_get(req);
+ if (tls == NULL)
+ GOTO(out_bulk, rc = -ENOMEM);
+ local_nb = tls->local;
rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
if (rc != 0)
- GOTO(out_bulk, rc);
+ GOTO(out_tls, rc);
/*
* If getting the lock took more time than
out_lock:
ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
+out_tls:
+ ost_tls_put(req);
out_bulk:
if (desc)
ptlrpc_free_bulk(desc);
cksum_type_t cksum_type = OBD_CKSUM_CRC32;
int no_reply = 0;
__u32 o_uid = 0, o_gid = 0;
+ struct ost_thread_local_cache *tls;
ENTRY;
req->rq_bulk_write = 1;
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
- /*
- * Per-thread array of struct niobuf_{local,remote}'s was allocated by
- * ost_thread_init().
- */
- local_nb = ost_tls(req)->local;
+ tls = ost_tls_get(req);
+ if (tls == NULL)
+ GOTO(out_bulk, rc = -ENOMEM);
+ local_nb = tls->local;
rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
if (rc != 0)
- GOTO(out_bulk, rc);
+ GOTO(out_tls, rc);
/*
* If getting the lock took more time than
out_lock:
ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
+out_tls:
+ ost_tls_put(req);
out_bulk:
if (desc)
ptlrpc_free_bulk(desc);
out:
+ /* XXX: don't send a reply if the obd is in read-only mode, as this can
+ * cause data loss on the client; see bug 22190. Remove this once async
+ * bulk is implemented. Meanwhile, if this is an umount, don't reply at all. */
+ if (req->rq_export->exp_obd->obd_no_transno) {
+ no_reply = req->rq_export->exp_obd->obd_stopping;
+ rc = -EIO;
+ }
+
if (rc == 0) {
oti_to_request(oti, req);
target_committed_to_req(req);