Whamcloud - gitweb
b=22731 server should not fall into LBUG if client send invalid parameter
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 28f3deb..ef830bb 100644 (file)
@@ -89,8 +89,12 @@ void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
         if (oti == NULL)
                 return;
 
-        if (req->rq_repmsg)
+        if (req->rq_repmsg) {
+                __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
+                versions[0] = oti->oti_pre_version;
+                lustre_msg_set_versions(req->rq_repmsg, versions);
+        }
         req->rq_transno = oti->oti_transno;
 
         /* XXX 4 == entries in oti_ack_locks??? */
@@ -170,8 +174,11 @@ static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
         ENTRY;
 
         LASSERT(!lustre_handle_is_used(lh));
-        LASSERT((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) ==
-                (OBD_MD_FLID | OBD_MD_FLGROUP));
+        /* o_id and o_gr are used for localizing resource, if client miss to set
+         * them, do not trigger ASSERTION. */
+        if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
+                     (OBD_MD_FLID | OBD_MD_FLGROUP)))
+                RETURN(-EPROTO);
 
         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
             !(oa->o_flags & OBD_FL_SRVLOCK))
@@ -620,6 +627,43 @@ static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *ob
         RETURN(opd.opd_lock_match);
 }
 
+/* Allocate thread local buffers if needed */
+static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
+{
+        struct ost_thread_local_cache *tls =
+                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+        /* In normal mode of operation an I/O request is serviced only
+         * by ll_ost_io threads each of them has own tls buffers allocated by
+         * ost_thread_init().
+         * During recovery, an I/O request may be queued until any of the ost
+         * service threads process it. Not necessary it should be one of
+         * ll_ost_io threads. In that case we dynamically allocating tls
+         * buffers for the request service time. */
+        if (unlikely(tls == NULL)) {
+                LASSERT(r->rq_export->exp_in_recovery);
+                OBD_ALLOC_PTR(tls);
+                if (tls != NULL) {
+                        tls->temporary = 1;
+                        r->rq_svc_thread->t_data = tls;
+                }
+        }
+        return  tls;
+}
+
+/* Free thread local buffers if they were allocated only for servicing
+ * this one request */
+static void ost_tls_put(struct ptlrpc_request *r)
+{
+        struct ost_thread_local_cache *tls =
+                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+        if (unlikely(tls->temporary)) {
+                OBD_FREE_PTR(tls);
+                r->rq_svc_thread->t_data = NULL;
+        }
+}
+
 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
         struct ptlrpc_bulk_desc *desc = NULL;
@@ -633,6 +677,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         struct lustre_handle lockh = { 0 };
         int niocount, npages, nob = 0, rc, i;
         int no_reply = 0;
+        struct ost_thread_local_cache *tls;
         ENTRY;
 
         req->rq_bulk_read = 1;
@@ -685,15 +730,14 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (rc)
                 GOTO(out, rc);
 
-        /*
-         * Per-thread array of struct niobuf_{local,remote}'s was allocated by
-         * ost_thread_init().
-         */
-        local_nb = ost_tls(req)->local;
+        tls = ost_tls_get(req);
+        if (tls == NULL)
+                GOTO(out_bulk, rc = -ENOMEM);
+        local_nb = tls->local;
 
         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
-                GOTO(out_bulk, rc);
+                GOTO(out_tls, rc);
 
         /*
          * If getting the lock took more time than
@@ -854,6 +898,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
 out_lock:
         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
+out_tls:
+        ost_tls_put(req);
 out_bulk:
         if (desc)
                 ptlrpc_free_bulk(desc);
@@ -901,6 +947,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
         int                      no_reply = 0;
         __u32                    o_uid = 0, o_gid = 0;
+        struct ost_thread_local_cache *tls;
         ENTRY;
 
         req->rq_bulk_write = 1;
@@ -968,15 +1015,14 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
 
-        /*
-         * Per-thread array of struct niobuf_{local,remote}'s was allocated by
-         * ost_thread_init().
-         */
-        local_nb = ost_tls(req)->local;
+        tls = ost_tls_get(req);
+        if (tls == NULL)
+                GOTO(out_bulk, rc = -ENOMEM);
+        local_nb = tls->local;
 
         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
-                GOTO(out_bulk, rc);
+                GOTO(out_tls, rc);
 
         /*
          * If getting the lock took more time than
@@ -1201,10 +1247,20 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
 out_lock:
         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
+out_tls:
+        ost_tls_put(req);
 out_bulk:
         if (desc)
                 ptlrpc_free_bulk(desc);
 out:
+       /* XXX: don't send reply if obd rdonly mode, this can cause data loss
+        * on client, see bug 22190. Remove this when async bulk will be done.
+        * Meanwhile, if this is umount then don't reply anything. */
+        if (req->rq_export->exp_obd->obd_no_transno) {
+                no_reply = req->rq_export->exp_obd->obd_stopping;
+                rc = -EIO;
+        }
+
         if (rc == 0) {
                 oti_to_request(oti, req);
                 target_committed_to_req(req);
@@ -2001,8 +2057,6 @@ int ost_handle(struct ptlrpc_request *req)
         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
 
         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
-                int recovering;
-
                 if (!class_connected_export(req->rq_export)) {
                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
                                lustre_msg_get_opc(req->rq_reqmsg),
@@ -2014,10 +2068,7 @@ int ost_handle(struct ptlrpc_request *req)
                 obd = req->rq_export->exp_obd;
 
                 /* Check for aborted recovery. */
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
-                recovering = obd->obd_recovering;
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-                if (recovering) {
+                if (obd->obd_recovering) {
                         rc = ost_filter_recovery_request(req, obd,
                                                          &should_process);
                         if (rc || !should_process)