Whamcloud - gitweb
b=22731 server should not fall into LBUG if client send invalid parameter
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index c8473da..ef830bb 100644 (file)
@@ -89,8 +89,12 @@ void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
         if (oti == NULL)
                 return;
 
-        if (req->rq_repmsg)
+        if (req->rq_repmsg) {
+                __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
+                versions[0] = oti->oti_pre_version;
+                lustre_msg_set_versions(req->rq_repmsg, versions);
+        }
         req->rq_transno = oti->oti_transno;
 
         /* XXX 4 == entries in oti_ack_locks??? */
@@ -170,8 +174,11 @@ static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
         ENTRY;
 
         LASSERT(!lustre_handle_is_used(lh));
-        LASSERT((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) ==
-                (OBD_MD_FLID | OBD_MD_FLGROUP));
+        /* o_id and o_gr are used for localizing resource, if client miss to set
+         * them, do not trigger ASSERTION. */
+        if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
+                     (OBD_MD_FLID | OBD_MD_FLGROUP)))
+                RETURN(-EPROTO);
 
         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
             !(oa->o_flags & OBD_FL_SRVLOCK))
@@ -620,6 +627,43 @@ static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *ob
         RETURN(opd.opd_lock_match);
 }
 
+/* Allocate thread local buffers if needed */
+static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
+{
+        struct ost_thread_local_cache *tls =
+                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+        /* In normal mode of operation an I/O request is serviced only
+         * by ll_ost_io threads each of them has own tls buffers allocated by
+         * ost_thread_init().
+         * During recovery, an I/O request may be queued until any of the ost
+         * service threads process it. Not necessary it should be one of
+         * ll_ost_io threads. In that case we dynamically allocating tls
+         * buffers for the request service time. */
+        if (unlikely(tls == NULL)) {
+                LASSERT(r->rq_export->exp_in_recovery);
+                OBD_ALLOC_PTR(tls);
+                if (tls != NULL) {
+                        tls->temporary = 1;
+                        r->rq_svc_thread->t_data = tls;
+                }
+        }
+        return  tls;
+}
+
+/* Free thread local buffers if they were allocated only for servicing
+ * this one request */
+static void ost_tls_put(struct ptlrpc_request *r)
+{
+        struct ost_thread_local_cache *tls =
+                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
+
+        if (unlikely(tls->temporary)) {
+                OBD_FREE_PTR(tls);
+                r->rq_svc_thread->t_data = NULL;
+        }
+}
+
 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
         struct ptlrpc_bulk_desc *desc = NULL;
@@ -633,6 +677,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         struct lustre_handle lockh = { 0 };
         int niocount, npages, nob = 0, rc, i;
         int no_reply = 0;
+        struct ost_thread_local_cache *tls;
         ENTRY;
 
         req->rq_bulk_read = 1;
@@ -681,20 +726,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 }
         }
 
-        req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER, 0);
         rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 GOTO(out, rc);
 
-        /*
-         * Per-thread array of struct niobuf_{local,remote}'s was allocated by
-         * ost_thread_init().
-         */
-        local_nb = ost_tls(req)->local;
+        tls = ost_tls_get(req);
+        if (tls == NULL)
+                GOTO(out_bulk, rc = -ENOMEM);
+        local_nb = tls->local;
 
         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
-                GOTO(out_bulk, rc);
+                GOTO(out_tls, rc);
 
         /*
          * If getting the lock took more time than
@@ -855,6 +898,8 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
 out_lock:
         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
+out_tls:
+        ost_tls_put(req);
 out_bulk:
         if (desc)
                 ptlrpc_free_bulk(desc);
@@ -902,6 +947,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
         int                      no_reply = 0;
         __u32                    o_uid = 0, o_gid = 0;
+        struct ost_thread_local_cache *tls;
         ENTRY;
 
         req->rq_bulk_write = 1;
@@ -969,15 +1015,14 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
 
-        /*
-         * Per-thread array of struct niobuf_{local,remote}'s was allocated by
-         * ost_thread_init().
-         */
-        local_nb = ost_tls(req)->local;
+        tls = ost_tls_get(req);
+        if (tls == NULL)
+                GOTO(out_bulk, rc = -ENOMEM);
+        local_nb = tls->local;
 
         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
         if (rc != 0)
-                GOTO(out_bulk, rc);
+                GOTO(out_tls, rc);
 
         /*
          * If getting the lock took more time than
@@ -1130,6 +1175,13 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 repbody->oa.o_gid = o_gid;
         }
 
+        /*
+         * Disable sending mtime back to the client. If the client locked the
+         * whole object, then it has already updated the mtime on its side,
+         * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
+         */
+        repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
+
         if (unlikely(client_cksum != server_cksum && rc == 0)) {
                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
                 char *msg;
@@ -1195,10 +1247,20 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
 out_lock:
         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
+out_tls:
+        ost_tls_put(req);
 out_bulk:
         if (desc)
                 ptlrpc_free_bulk(desc);
 out:
+       /* XXX: don't send reply if obd rdonly mode, this can cause data loss
+        * on client, see bug 22190. Remove this when async bulk will be done.
+        * Meanwhile, if this is umount then don't reply anything. */
+        if (req->rq_export->exp_obd->obd_no_transno) {
+                no_reply = req->rq_export->exp_obd->obd_stopping;
+                rc = -EIO;
+        }
+
         if (rc == 0) {
                 oti_to_request(oti, req);
                 target_committed_to_req(req);
@@ -1898,7 +1960,12 @@ static int ost_hpreq_handler(struct ptlrpc_request *req)
                          * it doesn't change).
                          */
                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-                        req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
+                        if (opc == OST_READ)
+                                req_capsule_set(&req->rq_pill,
+                                                &RQF_OST_BRW_READ);
+                        else
+                                req_capsule_set(&req->rq_pill,
+                                                &RQF_OST_BRW_WRITE);
 
                         body = req_capsule_client_get(&req->rq_pill,
                                                       &RMF_OST_BODY);
@@ -1990,8 +2057,6 @@ int ost_handle(struct ptlrpc_request *req)
         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
 
         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
-                int recovering;
-
                 if (!class_connected_export(req->rq_export)) {
                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
                                lustre_msg_get_opc(req->rq_reqmsg),
@@ -2003,10 +2068,7 @@ int ost_handle(struct ptlrpc_request *req)
                 obd = req->rq_export->exp_obd;
 
                 /* Check for aborted recovery. */
-                cfs_spin_lock_bh(&obd->obd_processing_task_lock);
-                recovering = obd->obd_recovering;
-                cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
-                if (recovering) {
+                if (obd->obd_recovering) {
                         rc = ost_filter_recovery_request(req, obd,
                                                          &should_process);
                         if (rc || !should_process)
@@ -2081,7 +2143,7 @@ int ost_handle(struct ptlrpc_request *req)
                 rc = ost_setattr(req->rq_export, req, oti);
                 break;
         case OST_WRITE:
-                req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
+                req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
                 CDEBUG(D_INODE, "write\n");
                 /* req->rq_request_portal would be nice, if it was set */
                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){
@@ -2102,7 +2164,7 @@ int ost_handle(struct ptlrpc_request *req)
                 /* ost_brw_write sends its own replies */
                 RETURN(rc);
         case OST_READ:
-                req_capsule_set(&req->rq_pill, &RQF_OST_BRW);
+                req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
                 CDEBUG(D_INODE, "read\n");
                 /* req->rq_request_portal would be nice, if it was set */
                 if (req->rq_rqbd->rqbd_service->srv_req_portal !=OST_IO_PORTAL){