RETURN(opd.opd_lock_match);
}
+/* Allocate thread local buffers if needed */
+static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
+{
+ struct ost_thread_local_cache *tls =
+ (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+ /* In normal mode of operation an I/O request is serviced only
+ * by ll_ost_io threads each of them has own tls buffers allocated by
+ * ost_thread_init().
+ * During recovery, an I/O request may be queued until any of the ost
+ * service threads process it. Not necessary it should be one of
+ * ll_ost_io threads. In that case we dynamically allocating tls
+ * buffers for the request service time. */
+ if (unlikely(tls == NULL)) {
+ LASSERT(r->rq_export->exp_in_recovery);
+ OBD_ALLOC_PTR(tls);
+ if (tls != NULL) {
+ tls->temporary = 1;
+ r->rq_svc_thread->t_data = tls;
+ }
+ }
+ return tls;
+}
+
+/* Free thread local buffers if they were allocated only for servicing
+ * this one request */
+static void ost_tls_put(struct ptlrpc_request *r)
+{
+ struct ost_thread_local_cache *tls =
+ (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+ if (unlikely(tls->temporary)) {
+ OBD_FREE_PTR(tls);
+ r->rq_svc_thread->t_data = NULL;
+ }
+}
+
static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
{
struct ptlrpc_bulk_desc *desc = NULL;
__u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int niocount, npages, nob = 0, rc, i;
int no_reply = 0;
+ struct ost_thread_local_cache *tls;
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
if (rc)
GOTO(out, rc);
- /*
- * Per-thread array of struct niobuf_{local,remote}'s was allocated by
- * ost_thread_init().
- */
- local_nb = ost_tls(req)->local;
+ tls = ost_tls_get(req);
+ if (tls == NULL)
+ GOTO(out_bulk, rc = -ENOMEM);
+ local_nb = tls->local;
rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
if (rc != 0)
- GOTO(out_bulk, rc);
+ GOTO(out_tls, rc);
/*
* If getting the lock took more time than
out_lock:
ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
+ out_tls:
+ ost_tls_put(req);
out_bulk:
if (desc)
ptlrpc_free_bulk(desc);
obd_count client_cksum = 0, server_cksum = 0;
cksum_type_t cksum_type = OBD_CKSUM_CRC32;
int no_reply = 0;
+ struct ost_thread_local_cache *tls;
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
niocount * sizeof(*rcs));
- /*
- * Per-thread array of struct niobuf_{local,remote}'s was allocated by
- * ost_thread_init().
- */
- local_nb = ost_tls(req)->local;
+ tls = ost_tls_get(req);
+ if (tls == NULL)
+ GOTO(out_bulk, rc = -ENOMEM);
+ local_nb = tls->local;
rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
if (rc != 0)
- GOTO(out_bulk, rc);
+ GOTO(out_tls, rc);
/*
* If getting the lock took more time than
out_lock:
ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
+ out_tls:
+ ost_tls_put(req);
out_bulk:
if (desc)
ptlrpc_free_bulk(desc);
return rc;
}
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
-{
- return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-}
-
/* use obd ops to offer management infrastructure */
static struct obd_ops ost_obd_ops = {
.o_owner = THIS_MODULE,