Whamcloud - gitweb
LU-6175 ha: add health_check routine to the MDS, MGS and OSD
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 7567acf..e53b6ee 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -42,9 +42,9 @@
 #define DEBUG_SUBSYSTEM S_OST
 
 #include <linux/module.h>
-#include <obd_ost.h>
 #include <lustre_dlm.h>
 #include <lprocfs_status.h>
+#include <obd_class.h>
 #include "ost_internal.h"
 
 static int oss_num_threads;
@@ -67,485 +67,19 @@ static char *oss_io_cpts;
 CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
                "CPU partitions OSS IO threads should run on");
 
-/**
- * Validate oa from client.
- * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
- * req are valid.
- *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
- *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
- *       pack ost_id. Because non-zero oi_seq will make it diffcult to tell
- *       whether this is oi_fid or real ostid. So it will check
- *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
- *    c. Old FID-disable osc will send IDIF.
- *    d. new FID-enable osc/osp will send normal FID.
- *
- * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
- * be used for LAST_ID file, and only being accessed inside OST now.
- */
-static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
-                            struct obd_ioobj *ioobj)
-{
-       int rc = 0;
-
-       if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
-                    fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
-               /* Sigh 2.[123] client still sends echo req with oi_id = 0
-                * during create, and we will reset this to 1, since this
-                * oi_id is basically useless in the following create process,
-                * but oi_id == 0 will make it difficult to tell whether it is
-                * real FID or ost_id. */
-               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
-               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
-               oa->o_oi.oi_fid.f_ver = 0;
-       } else {
-               if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
-                       GOTO(out, rc = -EPROTO);
-
-               /* Note: this check might be forced in 2.5 or 2.6, i.e.
-                * all of the requests are required to setup FLGROUP */
-               if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
-                       ostid_set_seq_mdt0(&oa->o_oi);
-                       if (ioobj)
-                               ostid_set_seq_mdt0(&ioobj->ioo_oid);
-                       oa->o_valid |= OBD_MD_FLGROUP;
-               }
-
-               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
-                       GOTO(out, rc = -EPROTO);
-       }
-
-       if (ioobj != NULL) {
-               unsigned max_brw = ioobj_max_brw_get(ioobj);
-
-               if (unlikely((max_brw & (max_brw - 1)) != 0)) {
-                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
-                              ": rc = -EPROTO\n", exp->exp_obd->obd_name,
-                              obd_export_nid2str(exp), max_brw,
-                              POSTID(&oa->o_oi));
-                       GOTO(out, rc = -EPROTO);
-               }
-               ioobj->ioo_oid = oa->o_oi;
-       }
-
-out:
-       if (rc != 0)
-               CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
-                      exp->exp_obd->obd_name, obd_export_nid2str(exp),
-                      oa ? ostid_seq(&oa->o_oi) : -1,
-                      oa ? ostid_id(&oa->o_oi) : -1, rc);
-       return rc;
-}
-
-struct ost_prolong_data {
-        struct ptlrpc_request *opd_req;
-        struct obd_export     *opd_exp;
-        struct obdo           *opd_oa;
-        struct ldlm_res_id     opd_resid;
-        struct ldlm_extent     opd_extent;
-        ldlm_mode_t            opd_mode;
-        unsigned int           opd_locks;
-        int                    opd_timeout;
-};
-
-/* prolong locks for the current service time of the corresponding
- * portal (= OST_IO_PORTAL)
- */
-static inline int prolong_timeout(struct ptlrpc_request *req)
-{
-       struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-
-       if (AT_OFF)
-               return obd_timeout / 2;
-
-       return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
-                  ldlm_timeout);
-}
-
-static void ost_prolong_lock_one(struct ost_prolong_data *opd,
-                                 struct ldlm_lock *lock)
-{
-       LASSERT(lock->l_export == opd->opd_exp);
-
-       if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
-               return;
-
-        /* XXX: never try to grab resource lock here because we're inside
-         * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
-         * res lock and then exp_bl_list_lock. */
-
-        if (!(lock->l_flags & LDLM_FL_AST_SENT))
-                /* ignore locks not being cancelled */
-                return;
-
-        LDLM_DEBUG(lock,
-                   "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
-                   opd->opd_req->rq_xid, opd->opd_extent.start,
-                   opd->opd_extent.end, opd->opd_timeout);
-
-        /* OK. this is a possible lock the user holds doing I/O
-         * let's refresh eviction timer for it */
-        ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
-        ++opd->opd_locks;
-}
-
-static void ost_prolong_locks(struct ost_prolong_data *data)
-{
-        struct obd_export *exp = data->opd_exp;
-        struct obdo       *oa  = data->opd_oa;
-        struct ldlm_lock  *lock;
-        ENTRY;
-
-        if (oa->o_valid & OBD_MD_FLHANDLE) {
-                /* mostly a request should be covered by only one lock, try
-                 * fast path. */
-                lock = ldlm_handle2lock(&oa->o_handle);
-                if (lock != NULL) {
-                        /* Fast path to check if the lock covers the whole IO
-                         * region exclusively. */
-                        if (lock->l_granted_mode == LCK_PW &&
-                            ldlm_extent_contain(&lock->l_policy_data.l_extent,
-                                                &data->opd_extent)) {
-                                /* bingo */
-                                ost_prolong_lock_one(data, lock);
-                                LDLM_LOCK_PUT(lock);
-                                RETURN_EXIT;
-                        }
-                        LDLM_LOCK_PUT(lock);
-                }
-        }
-
-
-       spin_lock_bh(&exp->exp_bl_list_lock);
-        cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
-                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
-                LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
-
-                if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
-                        continue;
-
-                if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
-                                         &data->opd_extent))
-                        continue;
-
-                ost_prolong_lock_one(data, lock);
-        }
-       spin_unlock_bh(&exp->exp_bl_list_lock);
-
-       EXIT;
-}
-
-/**
- * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
- * not.
- */
-static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
-                                   struct ldlm_lock *lock)
-{
-        struct niobuf_remote *nb;
-        struct obd_ioobj *ioo;
-        int mode, opc;
-        struct ldlm_extent ext;
-        ENTRY;
-
-        opc = lustre_msg_get_opc(req->rq_reqmsg);
-        LASSERT(opc == OST_READ || opc == OST_WRITE);
-
-        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        LASSERT(ioo != NULL);
-
-        nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        LASSERT(nb != NULL);
-
-        ext.start = nb->offset;
-        nb += ioo->ioo_bufcnt - 1;
-        ext.end = nb->offset + nb->len - 1;
-
-       LASSERT(lock->l_resource != NULL);
-       if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
-               RETURN(0);
-
-        mode = LCK_PW;
-        if (opc == OST_READ)
-                mode |= LCK_PR;
-        if (!(lock->l_granted_mode & mode))
-                RETURN(0);
-
-        RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
-}
-
-/**
- * High-priority queue request check for whether the given PTLRPC request (\a
- * req) is blocking an LDLM lock cancel.
- *
- * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
- * cancel, 0 if it is not, and -EFAULT if the request is malformed.
- *
- * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
- * function looks only at OST_READs and OST_WRITEs.
- */
-static int ost_rw_hpreq_check(struct ptlrpc_request *req)
-{
-        struct obd_device *obd = req->rq_export->exp_obd;
-        struct ost_body *body;
-        struct obd_ioobj *ioo;
-        struct niobuf_remote *nb;
-        struct ost_prolong_data opd = { 0 };
-        int mode, opc;
-        ENTRY;
-
-        /*
-         * Use LASSERT to do sanity check because malformed RPCs should have
-         * been filtered out in ost_hpreq_handler().
-         */
-        opc = lustre_msg_get_opc(req->rq_reqmsg);
-        LASSERT(opc == OST_READ || opc == OST_WRITE);
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body != NULL);
-
-        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        LASSERT(ioo != NULL);
-
-        nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        LASSERT(nb != NULL);
-        LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
-
-       ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
-
-        opd.opd_req = req;
-        mode = LCK_PW;
-        if (opc == OST_READ)
-                mode |= LCK_PR;
-        opd.opd_mode = mode;
-        opd.opd_exp = req->rq_export;
-        opd.opd_oa  = &body->oa;
-        opd.opd_extent.start = nb->offset;
-        nb += ioo->ioo_bufcnt - 1;
-        opd.opd_extent.end = nb->offset + nb->len - 1;
-        opd.opd_timeout = prolong_timeout(req);
-
-       DEBUG_REQ(D_RPCTRACE, req,
-              "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-              obd->obd_name, current->comm,
-              opd.opd_resid.name[0], opd.opd_resid.name[1],
-              opd.opd_extent.start, opd.opd_extent.end);
-
-        ost_prolong_locks(&opd);
-
-        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-               obd->obd_name, opd.opd_locks, req);
-
-        RETURN(opd.opd_locks > 0);
-}
-
-static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
-{
-        (void)ost_rw_hpreq_check(req);
-}
-
-/**
- * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
-                                      struct ldlm_lock *lock)
-{
-        struct ost_body *body;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body != NULL);
-
-        if (body->oa.o_valid & OBD_MD_FLHANDLE &&
-            body->oa.o_handle.cookie == lock->l_handle.h_cookie)
-                RETURN(1);
-
-        RETURN(0);
-}
-
-/**
- * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_check(struct ptlrpc_request *req)
-{
-        struct obd_device *obd = req->rq_export->exp_obd;
-        struct ost_body *body;
-        struct obdo *oa;
-        struct ost_prolong_data opd = { 0 };
-        __u64 start, end;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body != NULL);
-
-        oa = &body->oa;
-        LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
-                !(oa->o_flags & OBD_FL_SRVLOCK));
-
-        start = oa->o_size;
-        end = start + oa->o_blocks;
-
-        opd.opd_req = req;
-        opd.opd_mode = LCK_PW;
-        opd.opd_exp = req->rq_export;
-        opd.opd_oa  = oa;
-        opd.opd_extent.start = start;
-        opd.opd_extent.end   = end;
-        if (oa->o_blocks == OBD_OBJECT_EOF)
-                opd.opd_extent.end = OBD_OBJECT_EOF;
-        opd.opd_timeout = prolong_timeout(req);
-
-       ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
-
-        CDEBUG(D_DLMTRACE,
-               "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-               obd->obd_name,
-               opd.opd_resid.name[0], opd.opd_resid.name[1],
-               opd.opd_extent.start, opd.opd_extent.end);
-
-        ost_prolong_locks(&opd);
-
-        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-               obd->obd_name, opd.opd_locks, req);
-
-        RETURN(opd.opd_locks > 0);
-}
-
-static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
-{
-        (void)ost_punch_hpreq_check(req);
-}
-
-struct ptlrpc_hpreq_ops ost_hpreq_rw = {
-        .hpreq_lock_match = ost_rw_hpreq_lock_match,
-        .hpreq_check      = ost_rw_hpreq_check,
-        .hpreq_fini       = ost_rw_hpreq_fini
-};
-
-struct ptlrpc_hpreq_ops ost_hpreq_punch = {
-        .hpreq_lock_match = ost_punch_hpreq_lock_match,
-        .hpreq_check      = ost_punch_hpreq_check,
-        .hpreq_fini       = ost_punch_hpreq_fini
-};
-
-/** Assign high priority operations to the request if needed. */
-static int ost_io_hpreq_handler(struct ptlrpc_request *req)
-{
-        ENTRY;
-        if (req->rq_export) {
-                int opc = lustre_msg_get_opc(req->rq_reqmsg);
-                struct ost_body *body;
-
-                if (opc == OST_READ || opc == OST_WRITE) {
-                        struct niobuf_remote *nb;
-                        struct obd_ioobj *ioo;
-                        int objcount, niocount;
-                        int rc;
-                        int i;
-
-                        /* RPCs on the H-P queue can be inspected before
-                         * ost_handler() initializes their pills, so we
-                         * initialize that here.  Capsule initialization is
-                         * idempotent, as is setting the pill's format (provided
-                         * it doesn't change).
-                         */
-                        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-                        if (opc == OST_READ)
-                                req_capsule_set(&req->rq_pill,
-                                                &RQF_OST_BRW_READ);
-                        else
-                                req_capsule_set(&req->rq_pill,
-                                                &RQF_OST_BRW_WRITE);
-
-                        body = req_capsule_client_get(&req->rq_pill,
-                                                      &RMF_OST_BODY);
-                        if (body == NULL) {
-                                CERROR("Missing/short ost_body\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        objcount = req_capsule_get_size(&req->rq_pill,
-                                                        &RMF_OBD_IOOBJ,
-                                                        RCL_CLIENT) /
-                                                        sizeof(*ioo);
-                        if (objcount == 0) {
-                                CERROR("Missing/short ioobj\n");
-                                RETURN(-EFAULT);
-                        }
-                        if (objcount > 1) {
-                                CERROR("too many ioobjs (%d)\n", objcount);
-                                RETURN(-EFAULT);
-                        }
-
-                        ioo = req_capsule_client_get(&req->rq_pill,
-                                                     &RMF_OBD_IOOBJ);
-                        if (ioo == NULL) {
-                                CERROR("Missing/short ioobj\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
-                        if (rc) {
-                                CERROR("invalid object ids\n");
-                                RETURN(rc);
-                        }
-
-                        for (niocount = i = 0; i < objcount; i++) {
-                                if (ioo[i].ioo_bufcnt == 0) {
-                                        CERROR("ioo[%d] has zero bufcnt\n", i);
-                                        RETURN(-EFAULT);
-                                }
-                                niocount += ioo[i].ioo_bufcnt;
-                        }
-                        if (niocount > PTLRPC_MAX_BRW_PAGES) {
-                                DEBUG_REQ(D_RPCTRACE, req,
-                                          "bulk has too many pages (%d)",
-                                          niocount);
-                                RETURN(-EFAULT);
-                        }
-
-                        nb = req_capsule_client_get(&req->rq_pill,
-                                                    &RMF_NIOBUF_REMOTE);
-                        if (nb == NULL) {
-                                CERROR("Missing/short niobuf\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
-                                req->rq_ops = &ost_hpreq_rw;
-                } else if (opc == OST_PUNCH) {
-                        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-                        req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
-
-                        body = req_capsule_client_get(&req->rq_pill,
-                                                      &RMF_OST_BODY);
-                        if (body == NULL) {
-                                CERROR("Missing/short ost_body\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
-                            !(body->oa.o_flags & OBD_FL_SRVLOCK))
-                                req->rq_ops = &ost_hpreq_punch;
-                }
-        }
-        RETURN(0);
-}
-
 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 
 static struct cfs_cpt_table    *ost_io_cptable;
 
-#ifdef LPROCFS
+#ifdef CONFIG_PROC_FS
 LPROC_SEQ_FOPS_RO_TYPE(ost, uuid);
 
-static struct lprocfs_seq_vars lprocfs_ost_obd_vars[] = {
-       { "uuid",       &ost_uuid_fops  },
-       { 0 }
+static struct lprocfs_vars lprocfs_ost_obd_vars[] = {
+       { .name =       "uuid",
+         .fops =       &ost_uuid_fops  },
+       { NULL }
 };
-#endif /* LPROCFS */
+#endif /* CONFIG_PROC_FS */
 
 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
@@ -556,13 +90,9 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
        int rc;
        ENTRY;
 
-        rc = cfs_cleanup_group_info();
-        if (rc)
-                RETURN(rc);
-
-#ifdef LPROCFS
+#ifdef CONFIG_PROC_FS
        obd->obd_vars = lprocfs_ost_obd_vars;
-       lprocfs_seq_obd_setup(obd);
+       lprocfs_obd_setup(obd);
 #endif
        mutex_init(&ost->ost_health_mutex);
 
@@ -642,7 +172,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                GOTO(out_service, rc);
         }
 
-       mask = cfs_cpt_table->ctb_nodemask;
+       mask = cfs_cpt_nodemask(cfs_cpt_table, CFS_CPT_ANY);
        /* event CPT feature is disabled in libcfs level by set partition
         * number to 1, we still want to set node affinity for io service */
        if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
@@ -698,7 +228,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                        .so_thr_init            = tgt_io_thread_init,
                        .so_thr_done            = tgt_io_thread_done,
                        .so_req_handler         = tgt_request_handle,
-                       .so_hpreq_handler       = ost_io_hpreq_handler,
+                       .so_hpreq_handler       = tgt_hpreq_handler,
                        .so_req_printer         = target_print_req,
                },
        };
@@ -801,6 +331,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
        ping_evictor_start();
 
        RETURN(0);
+
 out_seq:
        ptlrpc_unregister_service(ost->ost_seq_service);
        ost->ost_seq_service = NULL;
@@ -856,23 +387,17 @@ static int ost_cleanup(struct obd_device *obd)
 
 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
 {
-        struct ost_obd *ost = &obd->u.ost;
-        int rc = 0;
+       struct ost_obd *ost = &obd->u.ost;
+       int rc = 0;
 
        mutex_lock(&ost->ost_health_mutex);
-        rc |= ptlrpc_service_health_check(ost->ost_service);
-        rc |= ptlrpc_service_health_check(ost->ost_create_service);
-        rc |= ptlrpc_service_health_check(ost->ost_io_service);
+       rc |= ptlrpc_service_health_check(ost->ost_service);
+       rc |= ptlrpc_service_health_check(ost->ost_create_service);
+       rc |= ptlrpc_service_health_check(ost->ost_io_service);
+       rc |= ptlrpc_service_health_check(ost->ost_seq_service);
        mutex_unlock(&ost->ost_health_mutex);
 
-        /*
-         * health_check to return 0 on healthy
-         * and 1 on unhealthy.
-         */
-        if( rc != 0)
-                rc = 1;
-
-        return rc;
+       return rc != 0 ? 1 : 0;
 }
 
 /* use obd ops to offer management infrastructure */
@@ -890,11 +415,8 @@ static int __init ost_init(void)
 
        ENTRY;
 
-       rc = class_register_type(&ost_obd_ops, NULL, NULL,
-#ifndef HAVE_ONLY_PROCFS_SEQ
-                               NULL,
-#endif
-                               LUSTRE_OSS_NAME, NULL);
+       rc = class_register_type(&ost_obd_ops, NULL, true, NULL,
+                                LUSTRE_OSS_NAME, NULL);
 
         if (ost_num_threads != 0 && oss_num_threads == 0) {
                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "