Whamcloud - gitweb
LU-6349 ptlrpc: remove LUSTRE_MSG_MAGIC_V1 support
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index 4731969..d747a18 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2010, 2013, Intel Corporation.
+ * Copyright (c) 2010, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -43,6 +43,7 @@
 
 #define DEBUG_SUBSYSTEM S_LDLM
 
+#include <linux/kthread.h>
 #include <libcfs/libcfs.h>
 #include <obd.h>
 #include <obd_class.h>
@@ -508,7 +509,7 @@ int client_connect_import(const struct lu_env *env,
 
         *exp = NULL;
        down_write(&cli->cl_sem);
-        if (cli->cl_conn_count > 0 )
+       if (cli->cl_conn_count > 0)
                 GOTO(out_sem, rc = -EALREADY);
 
         rc = class_connect(&conn, obd, cluuid);
@@ -580,17 +581,17 @@ int client_disconnect_export(struct obd_export *exp)
         imp = cli->cl_import;
 
        down_write(&cli->cl_sem);
-        CDEBUG(D_INFO, "disconnect %s - %d\n", obd->obd_name,
-               cli->cl_conn_count);
+       CDEBUG(D_INFO, "disconnect %s - %zu\n", obd->obd_name,
+               cli->cl_conn_count);
 
-        if (!cli->cl_conn_count) {
+       if (cli->cl_conn_count == 0) {
                 CERROR("disconnecting disconnected device (%s)\n",
                        obd->obd_name);
                 GOTO(out_disconnect, rc = -EINVAL);
         }
 
         cli->cl_conn_count--;
-        if (cli->cl_conn_count)
+       if (cli->cl_conn_count != 0)
                 GOTO(out_disconnect, rc = 0);
 
        /* Mark import deactivated now, so we don't try to reconnect if any
@@ -752,7 +753,6 @@ void target_client_add_cb(struct obd_device *obd, __u64 transno, void *cb_data,
        spin_unlock(&exp->exp_lock);
        class_export_cb_put(exp);
 }
-EXPORT_SYMBOL(target_client_add_cb);
 
 static void
 check_and_start_recovery_timer(struct obd_device *obd,
@@ -871,6 +871,20 @@ int target_handle_connect(struct ptlrpc_request *req)
         if (rc)
                 GOTO(out, rc);
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
+       /* Don't allow clients to connect that are using old 1.8 format
+        * protocol conventions (LUSTRE_MSG_MAGIC_v1, !MSGHDR_CKSUM_INCOMPAT18,
+        * ldlm_flock_policy_wire format, MDT_ATTR_xTIME_SET, etc).  The
+        * FULL20 flag should be set on all connections since 2.0, but no
+        * longer affects behaviour.
+        *
+        * Later this check will be disabled and the flag can be retired
+        * completely once interop with 3.0 is no longer needed.
+        */
+       if (!(data->ocd_connect_flags & OBD_CONNECT_FULL20))
+               GOTO(out, rc = -EPROTO);
+#endif
+
        if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT) {
                if (data->ocd_version < LUSTRE_VERSION_CODE -
                                               LUSTRE_VERSION_ALLOWED_OFFSET ||
@@ -900,6 +914,38 @@ int target_handle_connect(struct ptlrpc_request *req)
                mds_conn = (data->ocd_connect_flags & OBD_CONNECT_MDS) != 0;
                lw_client = (data->ocd_connect_flags &
                             OBD_CONNECT_LIGHTWEIGHT) != 0;
+
+               /* OBD_CONNECT_MNE_SWAB is defined as OBD_CONNECT_MDS_MDS
+                * for Imperative Recovery connection from MGC to MGS.
+                *
+                * Via check OBD_CONNECT_FID, we can distinguish whether
+                * the OBD_CONNECT_MDS_MDS/OBD_CONNECT_MNE_SWAB is from
+                * MGC or MDT. */
+               if (!lw_client &&
+                   (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+                   (data->ocd_connect_flags & OBD_CONNECT_FID) &&
+                   (data->ocd_connect_flags & OBD_CONNECT_VERSION)) {
+                       __u32 major = OBD_OCD_VERSION_MAJOR(data->ocd_version);
+                       __u32 minor = OBD_OCD_VERSION_MINOR(data->ocd_version);
+                       __u32 patch = OBD_OCD_VERSION_PATCH(data->ocd_version);
+
+                       /* We do not support the MDT-MDT interoperations with
+                        * different version MDT because of protocol changes. */
+                       if (unlikely(major != LUSTRE_MAJOR ||
+                                    minor != LUSTRE_MINOR ||
+                                    abs(patch - LUSTRE_PATCH) > 3)) {
+                               LCONSOLE_WARN("%s (%u.%u.%u.%u) refused the "
+                                       "connection from different version MDT "
+                                       "(%d.%d.%d.%d) %s %s\n",
+                                       target->obd_name, LUSTRE_MAJOR,
+                                       LUSTRE_MINOR, LUSTRE_PATCH, LUSTRE_FIX,
+                                       major, minor, patch,
+                                       OBD_OCD_VERSION_FIX(data->ocd_version),
+                                       libcfs_nid2str(req->rq_peer.nid), str);
+
+                               GOTO(out, rc = -EPROTO);
+                       }
+               }
        }
 
         /* lctl gets a backstage, all-access pass. */
@@ -1033,30 +1079,30 @@ no_export:
                        t = cfs_timer_deadline(&target->obd_recovery_timer);
                        t = cfs_time_sub(t, cfs_time_current());
                        t = cfs_duration_sec(t);
-                       LCONSOLE_WARN("%s: Denying connection for new client "
-                                     "%s (at %s), waiting for all %d known "
-                                     "clients (%d recovered, %d in progress, "
-                                     "and %d evicted) to recover in %d:%.02d\n",
+                       LCONSOLE_WARN("%s: Denying connection for new client %s"
+                                     "(at %s), waiting for %d known clients "
+                                     "(%d recovered, %d in progress, and %d "
+                                     "evicted) to recover in %d:%.02d\n",
                                      target->obd_name, cluuid.uuid,
                                      libcfs_nid2str(req->rq_peer.nid), k,
                                      c - i, i, s, (int)t / 60,
                                      (int)t % 60);
-                        rc = -EBUSY;
-                } else {
+                       rc = -EBUSY;
+               } else {
 dont_check_exports:
-                        rc = obd_connect(req->rq_svc_thread->t_env,
-                                         &export, target, &cluuid, data,
-                                         client_nid);
+                       rc = obd_connect(req->rq_svc_thread->t_env,
+                                        &export, target, &cluuid, data,
+                                        client_nid);
                        if (mds_conn && OBD_FAIL_CHECK(OBD_FAIL_TGT_RCVG_FLAG))
                                lustre_msg_add_op_flags(req->rq_repmsg,
-                                               MSG_CONNECT_RECOVERING);
-                        if (rc == 0)
-                                conn.cookie = export->exp_handle.h_cookie;
-                }
-        } else {
-                rc = obd_reconnect(req->rq_svc_thread->t_env,
-                                   export, target, &cluuid, data, client_nid);
-        }
+                                                       MSG_CONNECT_RECOVERING);
+                       if (rc == 0)
+                               conn.cookie = export->exp_handle.h_cookie;
+               }
+       } else {
+               rc = obd_reconnect(req->rq_svc_thread->t_env,
+                                  export, target, &cluuid, data, client_nid);
+       }
        if (rc)
                GOTO(out, rc);
 
@@ -1205,17 +1251,12 @@ dont_check_exports:
         * ptlrpc_handle_server_req_in->lustre_unpack_msg(). */
         revimp->imp_msg_magic = req->rq_reqmsg->lm_magic;
 
-       if ((data->ocd_connect_flags & OBD_CONNECT_AT) &&
-           (revimp->imp_msg_magic != LUSTRE_MSG_MAGIC_V1))
+       if (data->ocd_connect_flags & OBD_CONNECT_AT)
                revimp->imp_msghdr_flags |= MSGHDR_AT_SUPPORT;
        else
                revimp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
 
-       if ((data->ocd_connect_flags & OBD_CONNECT_FULL20) &&
-            (revimp->imp_msg_magic != LUSTRE_MSG_MAGIC_V1))
-                revimp->imp_msghdr_flags |= MSGHDR_CKSUM_INCOMPAT18;
-        else
-                revimp->imp_msghdr_flags &= ~MSGHDR_CKSUM_INCOMPAT18;
+       revimp->imp_msghdr_flags |= MSGHDR_CKSUM_INCOMPAT18;
 
        rc = sptlrpc_import_sec_adapt(revimp, req->rq_svc_ctx, &req->rq_flvr);
        if (rc) {
@@ -1249,7 +1290,6 @@ out:
                req->rq_status = rc;
        RETURN(rc);
 }
-EXPORT_SYMBOL(target_handle_connect);
 
 int target_handle_disconnect(struct ptlrpc_request *req)
 {
@@ -1265,7 +1305,6 @@ int target_handle_disconnect(struct ptlrpc_request *req)
 
         RETURN(0);
 }
-EXPORT_SYMBOL(target_handle_disconnect);
 
 void target_destroy_export(struct obd_export *exp)
 {
@@ -1474,7 +1513,7 @@ void target_cleanup_recovery(struct obd_device *obd)
        spin_unlock(&obd->obd_recovery_task_lock);
 
        list_for_each_entry_safe(req, n, &clean_list, rq_list) {
-               LASSERT(req->rq_reply_state == 0);
+               LASSERT(req->rq_reply_state == NULL);
                target_exp_dequeue_req_replay(req);
                target_request_copy_put(req);
        }
@@ -1485,7 +1524,7 @@ void target_cleanup_recovery(struct obd_device *obd)
        spin_unlock(&obd->obd_recovery_task_lock);
 
        list_for_each_entry_safe(req, n, &clean_list, rq_list) {
-                LASSERT(req->rq_reply_state == 0);
+               LASSERT(req->rq_reply_state == NULL);
                 target_request_copy_put(req);
         }
 
@@ -1499,7 +1538,6 @@ void target_cancel_recovery_timer(struct obd_device *obd)
         CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
         cfs_timer_disarm(&obd->obd_recovery_timer);
 }
-EXPORT_SYMBOL(target_cancel_recovery_timer);
 
 static void target_start_recovery_timer(struct obd_device *obd)
 {
@@ -1768,7 +1806,11 @@ repeat:
                obd->obd_abort_recovery = 1;
        }
 
-       wait_event(obd->obd_next_transno_waitq, check_routine(obd));
+       while (wait_event_timeout(obd->obd_next_transno_waitq,
+                                 check_routine(obd),
+                                 msecs_to_jiffies(60 * MSEC_PER_SEC)) == 0)
+               /* wait indefinitely for event, but don't trigger watchdog */;
+
        if (obd->obd_abort_recovery) {
                CWARN("recovery is aborted, evict exports in recovery\n");
                /** evict exports which didn't finish recovery yet */
@@ -2321,14 +2363,12 @@ added:
        wake_up(&obd->obd_next_transno_waitq);
        RETURN(0);
 }
-EXPORT_SYMBOL(target_queue_recovery_request);
 
 int target_handle_ping(struct ptlrpc_request *req)
 {
         obd_ping(req->rq_svc_thread->t_env, req->rq_export);
         return req_capsule_server_pack(&req->rq_pill);
 }
-EXPORT_SYMBOL(target_handle_ping);
 
 void target_committed_to_req(struct ptlrpc_request *req)
 {
@@ -2345,7 +2385,6 @@ void target_committed_to_req(struct ptlrpc_request *req)
         CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n",
                exp->exp_last_committed, req->rq_transno, req->rq_xid);
 }
-EXPORT_SYMBOL(target_committed_to_req);
 
 #endif /* HAVE_SERVER_SUPPORT */
 
@@ -2376,9 +2415,9 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
 
         RETURN(0);
 }
-EXPORT_SYMBOL(target_pack_pool_reply);
 
-int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
+static int target_send_reply_msg(struct ptlrpc_request *req,
+                                int rc, int fail_id)
 {
         if (OBD_FAIL_CHECK_ORSET(fail_id & ~OBD_FAIL_ONCE, OBD_FAIL_ONCE)) {
                 DEBUG_REQ(D_ERROR, req, "dropping reply");
@@ -2487,7 +2526,6 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
        spin_unlock(&svcpt->scp_rep_lock);
        EXIT;
 }
-EXPORT_SYMBOL(target_send_reply);
 
 ldlm_mode_t lck_compat_array[] = {
        [LCK_EX]    = LCK_COMPAT_EX,
@@ -2577,7 +2615,6 @@ ldlm_error_t ldlm_errno2error(int err_no)
         }
         return error;
 }
-EXPORT_SYMBOL(ldlm_errno2error);
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
 void ldlm_dump_export_locks(struct obd_export *exp)
@@ -2606,9 +2643,13 @@ static int target_bulk_timeout(void *data)
         RETURN(1);
 }
 
-static inline char *bulk2type(struct ptlrpc_bulk_desc *desc)
+static inline const char *bulk2type(struct ptlrpc_request *req)
 {
-        return desc->bd_type == BULK_GET_SINK ? "GET" : "PUT";
+       if (req->rq_bulk_read)
+               return "READ";
+       if (req->rq_bulk_write)
+               return "WRITE";
+       return "UNKNOWN";
 }
 
 int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
@@ -2635,7 +2676,7 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
            exp->exp_conn_cnt > lustre_msg_get_conn_cnt(req->rq_reqmsg)) {
                rc = -ENOTCONN;
        } else {
-               if (desc->bd_type == BULK_PUT_SINK)
+               if (req->rq_bulk_read)
                        rc = sptlrpc_svc_wrap_bulk(req, desc);
                if (rc == 0)
                        rc = ptlrpc_start_bulk_transfer(desc);
@@ -2643,7 +2684,7 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
 
        if (rc < 0) {
                DEBUG_REQ(D_ERROR, req, "bulk %s failed: rc %d",
-                         bulk2type(desc), rc);
+                         bulk2type(req), rc);
                RETURN(rc);
        }
 
@@ -2682,31 +2723,36 @@ int target_bulk_io(struct obd_export *exp, struct ptlrpc_bulk_desc *desc,
 
        if (rc == -ETIMEDOUT) {
                DEBUG_REQ(D_ERROR, req, "timeout on bulk %s after %ld%+lds",
-                         bulk2type(desc), deadline - start,
+                         bulk2type(req), deadline - start,
                          cfs_time_current_sec() - deadline);
                ptlrpc_abort_bulk(desc);
        } else if (exp->exp_failed) {
                DEBUG_REQ(D_ERROR, req, "Eviction on bulk %s",
-                         bulk2type(desc));
+                         bulk2type(req));
                rc = -ENOTCONN;
                ptlrpc_abort_bulk(desc);
        } else if (exp->exp_conn_cnt >
                   lustre_msg_get_conn_cnt(req->rq_reqmsg)) {
                DEBUG_REQ(D_ERROR, req, "Reconnect on bulk %s",
-                         bulk2type(desc));
+                         bulk2type(req));
                /* We don't reply anyway. */
                rc = -ETIMEDOUT;
                ptlrpc_abort_bulk(desc);
-       } else if (desc->bd_failure ||
-                  desc->bd_nob_transferred != desc->bd_nob) {
-               DEBUG_REQ(D_ERROR, req, "%s bulk %s %d(%d)",
-                         desc->bd_failure ? "network error on" : "truncated",
-                         bulk2type(desc), desc->bd_nob_transferred,
-                         desc->bd_nob);
-               /* XXX Should this be a different errno? */
+       } else if (desc->bd_failure) {
+               DEBUG_REQ(D_ERROR, req, "network error on bulk %s",
+                         bulk2type(req));
+               /* XXX should this be a different errno? */
                rc = -ETIMEDOUT;
-       } else if (desc->bd_type == BULK_GET_SINK) {
-               rc = sptlrpc_svc_unwrap_bulk(req, desc);
+       } else {
+               if (req->rq_bulk_write)
+                       rc = sptlrpc_svc_unwrap_bulk(req, desc);
+               if (rc == 0 && desc->bd_nob_transferred != desc->bd_nob) {
+                       DEBUG_REQ(D_ERROR, req, "truncated bulk %s %d(%d)",
+                                 bulk2type(req), desc->bd_nob_transferred,
+                                 desc->bd_nob);
+                       /* XXX should this be a different errno? */
+                       rc = -ETIMEDOUT;
+               }
        }
 
        RETURN(rc);