Whamcloud - gitweb
Branch b_release_1_8_1
authorzam <zam>
Wed, 13 May 2009 06:51:17 +0000 (06:51 +0000)
committerzam <zam>
Wed, 13 May 2009 06:51:17 +0000 (06:51 +0000)
b=19195
i=johann
i=tappro
i=oleg.drokin

Temporarily attach ost_thread_cache_local object to non ost_io thread if an
i/o request comes to that thread during ost recovery.

lustre/ost/ost_handler.c
lustre/ost/ost_internal.h

index 50364e6..1b21669 100644 (file)
@@ -579,6 +579,43 @@ static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *ob
         RETURN(opd.opd_lock_match);
 }
 
+/* Allocate thread local buffers if needed */
+static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
+{
+        struct ost_thread_local_cache *tls =
+                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+        /* In normal mode of operation an I/O request is serviced only
+         * by ll_ost_io threads each of them has own tls buffers allocated by
+         * ost_thread_init().
+         * During recovery, an I/O request may be queued until any of the ost
+         * service threads process it. Not necessary it should be one of
+         * ll_ost_io threads. In that case we dynamically allocating tls
+         * buffers for the request service time. */
+        if (unlikely(tls == NULL)) {
+                LASSERT(r->rq_export->exp_in_recovery);
+                OBD_ALLOC_PTR(tls);
+                if (tls != NULL) {
+                        tls->temporary = 1;
+                        r->rq_svc_thread->t_data = tls;
+                }
+        }
+        return  tls;
+}
+
+/* Free thread local buffers if they were allocated only for servicing
+ * this one request */
+static void ost_tls_put(struct ptlrpc_request *r)
+{
+        struct ost_thread_local_cache *tls =
+                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+        if (unlikely(tls->temporary)) {
+                OBD_FREE_PTR(tls);
+                r->rq_svc_thread->t_data = NULL;
+        }
+}
+
 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
         struct ptlrpc_bulk_desc *desc = NULL;
@@ -592,6 +629,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         __u32  size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         int niocount, npages, nob = 0, rc, i;
         int no_reply = 0;
+        struct ost_thread_local_cache *tls;
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
@@ -627,15 +665,14 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (rc)
                 GOTO(out, rc);
 
-        /*
-         * Per-thread array of struct niobuf_{local,remote}'s was allocated by
-         * ost_thread_init().
-         */
-        local_nb = ost_tls(req)->local;
+        tls = ost_tls_get(req);
+        if (tls == NULL)
+                GOTO(out_bulk, rc = -ENOMEM);
+        local_nb = tls->local;
 
         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
-                GOTO(out_bulk, rc);
+                GOTO(out_tls, rc);
 
         /*
          * If getting the lock took more time than
@@ -780,6 +817,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
  out_lock:
         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
+ out_tls:
+        ost_tls_put(req);
  out_bulk:
         if (desc)
                 ptlrpc_free_bulk(desc);
@@ -826,6 +865,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         obd_count                client_cksum = 0, server_cksum = 0;
         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
         int                      no_reply = 0;
+        struct ost_thread_local_cache *tls;
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
@@ -873,15 +913,14 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
                              niocount * sizeof(*rcs));
 
-        /*
-         * Per-thread array of struct niobuf_{local,remote}'s was allocated by
-         * ost_thread_init().
-         */
-        local_nb = ost_tls(req)->local;
+        tls = ost_tls_get(req);
+        if (tls == NULL)
+                GOTO(out_bulk, rc = -ENOMEM);
+        local_nb = tls->local;
 
         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
-                GOTO(out_bulk, rc);
+                GOTO(out_tls, rc);
 
         /*
          * If getting the lock took more time than
@@ -1085,6 +1124,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
  out_lock:
         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
+ out_tls:
+        ost_tls_put(req);
  out_bulk:
         if (desc)
                 ptlrpc_free_bulk(desc);
@@ -2104,11 +2145,6 @@ static int ost_health_check(struct obd_device *obd)
         return rc;
 }
 
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
-{
-        return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-}
-
 /* use obd ops to offer management infrastructure */
 static struct obd_ops ost_obd_ops = {
         .o_owner        = THIS_MODULE,
index 3cbb4fa..750fb34 100644 (file)
@@ -61,10 +61,9 @@ struct ost_thread_local_cache {
         struct page          *page  [OST_THREAD_POOL_SIZE];
         struct niobuf_local   local [OST_THREAD_POOL_SIZE];
         struct niobuf_remote  remote[OST_THREAD_POOL_SIZE];
+        unsigned int          temporary:1;
 };
 
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r);
-
 #define OSS_DEF_CREATE_THREADS  2UL
 #define OSS_MAX_CREATE_THREADS 16UL