Whamcloud - gitweb
LU-13004 ptlrpc: Allow BULK_BUF_KIOV to accept a kvec
[fs/lustre-release.git] / lustre / ptlrpc / nrs_crr.c
index 0335bf9..6d0a0be 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2011 Intel Corporation
+ * Copyright (c) 2013, 2017, Intel Corporation.
  *
  * Copyright 2012 Xyratex Technology Limited
  */
@@ -41,9 +41,6 @@
 #ifdef HAVE_SERVER_SUPPORT
 
 #define DEBUG_SUBSYSTEM S_RPC
-#ifndef __KERNEL__
-#include <liblustre.h>
-#endif
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
@@ -75,7 +72,8 @@
  * \retval 0 e1 > e2
  * \retval 1 e1 <= e2
  */
-static int crrn_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
+static int
+crrn_req_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
 {
        struct ptlrpc_nrs_request *nrq1;
        struct ptlrpc_nrs_request *nrq2;
@@ -91,88 +89,53 @@ static int crrn_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
        return nrq1->nr_u.crr.cr_sequence < nrq2->nr_u.crr.cr_sequence;
 }
 
-static cfs_binheap_ops_t nrs_crrn_heap_ops = {
+static struct cfs_binheap_ops nrs_crrn_heap_ops = {
        .hop_enter      = NULL,
        .hop_exit       = NULL,
        .hop_compare    = crrn_req_compare,
 };
 
 /**
- * libcfs_hash operations for nrs_crrn_net::cn_cli_hash
+ * rhashtable operations for nrs_crrn_net::cn_cli_hash
  *
  * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
  * nrs_crrn_client objects.
  */
-#define NRS_NID_BKT_BITS       8
-#define NRS_NID_BITS           16
-
-static unsigned nrs_crrn_hop_hash(cfs_hash_t *hs, const void *key,
-                                 unsigned mask)
+static u32 nrs_crrn_hashfn(const void *data, u32 len, u32 seed)
 {
-       return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
-}
+       const lnet_nid_t *nid = data;
 
-static int nrs_crrn_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
-{
-       lnet_nid_t              *nid = (lnet_nid_t *)key;
-       struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
-                                                      struct nrs_crrn_client,
-                                                      cc_hnode);
-       return *nid == cli->cc_nid;
+       seed ^= cfs_hash_64((u64)nid, 32);
+       return seed;
 }
 
-static void *nrs_crrn_hop_key(cfs_hlist_node_t *hnode)
+static int nrs_crrn_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
 {
-       struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
-                                                      struct nrs_crrn_client,
-                                                      cc_hnode);
-       return &cli->cc_nid;
-}
+       const struct nrs_crrn_client *cli = obj;
+       const lnet_nid_t *nid = arg->key;
 
-static void *nrs_crrn_hop_object(cfs_hlist_node_t *hnode)
-{
-       return cfs_hlist_entry(hnode, struct nrs_crrn_client, cc_hnode);
+       return *nid != cli->cc_nid;
 }
 
-static void nrs_crrn_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
-{
-       struct nrs_crrn_client *cli = cfs_hlist_entry(hnode,
-                                                     struct nrs_crrn_client,
-                                                     cc_hnode);
-       cfs_atomic_inc(&cli->cc_ref);
-}
+static const struct rhashtable_params nrs_crrn_hash_params = {
+       .key_len        = sizeof(lnet_nid_t),
+       .key_offset     = offsetof(struct nrs_crrn_client, cc_nid),
+       .head_offset    = offsetof(struct nrs_crrn_client, cc_rhead),
+       .hashfn         = nrs_crrn_hashfn,
+       .obj_cmpfn      = nrs_crrn_cmpfn,
+};
 
-static void nrs_crrn_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+static void nrs_crrn_exit(void *vcli, void *data)
 {
-       struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
-                                                      struct nrs_crrn_client,
-                                                      cc_hnode);
-       cfs_atomic_dec(&cli->cc_ref);
-}
+       struct nrs_crrn_client *cli = vcli;
 
-static void nrs_crrn_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
-{
-       struct nrs_crrn_client  *cli = cfs_hlist_entry(hnode,
-                                                      struct nrs_crrn_client,
-                                                      cc_hnode);
-       LASSERTF(cfs_atomic_read(&cli->cc_ref) == 0,
+       LASSERTF(atomic_read(&cli->cc_ref) == 0,
                 "Busy CRR-N object from client with NID %s, with %d refs\n",
-                libcfs_nid2str(cli->cc_nid), cfs_atomic_read(&cli->cc_ref));
+                libcfs_nid2str(cli->cc_nid), atomic_read(&cli->cc_ref));
 
        OBD_FREE_PTR(cli);
 }
 
-static cfs_hash_ops_t nrs_crrn_hash_ops = {
-       .hs_hash        = nrs_crrn_hop_hash,
-       .hs_keycmp      = nrs_crrn_hop_keycmp,
-       .hs_key         = nrs_crrn_hop_key,
-       .hs_object      = nrs_crrn_hop_object,
-       .hs_get         = nrs_crrn_hop_get,
-       .hs_put         = nrs_crrn_hop_put,
-       .hs_put_locked  = nrs_crrn_hop_put,
-       .hs_exit        = nrs_crrn_hop_exit,
-};
-
 /**
  * Called when a CRR-N policy instance is started.
  *
@@ -181,7 +144,7 @@ static cfs_hash_ops_t nrs_crrn_hash_ops = {
  * \retval -ENOMEM OOM error
  * \retval 0      success
  */
-static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy)
+static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy, char *arg)
 {
        struct nrs_crrn_net    *net;
        int                     rc = 0;
@@ -196,17 +159,11 @@ static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy)
                                             nrs_pol2cptab(policy),
                                             nrs_pol2cptid(policy));
        if (net->cn_binheap == NULL)
-               GOTO(failed, rc = -ENOMEM);
-
-       net->cn_cli_hash = cfs_hash_create("nrs_crrn_nid_hash",
-                                          NRS_NID_BITS, NRS_NID_BITS,
-                                          NRS_NID_BKT_BITS, 0,
-                                          CFS_HASH_MIN_THETA,
-                                          CFS_HASH_MAX_THETA,
-                                          &nrs_crrn_hash_ops,
-                                          CFS_HASH_RW_BKTLOCK);
-       if (net->cn_cli_hash == NULL)
-               GOTO(failed, rc = -ENOMEM);
+               GOTO(out_net, rc = -ENOMEM);
+
+       rc = rhashtable_init(&net->cn_cli_hash, &nrs_crrn_hash_params);
+       if (rc)
+               GOTO(out_binheap, rc);
 
        /**
         * Set default quantum value to max_rpcs_in_flight for non-MDS OSCs;
@@ -214,7 +171,7 @@ static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy)
         * with the default max_rpcs_in_flight value, as we are scheduling over
         * NIDs, and there may be more than one mount point per client.
         */
-       net->cn_quantum = OSC_MAX_RIF_DEFAULT;
+       net->cn_quantum = OBD_MAX_RIF_DEFAULT;
        /**
         * Set to 1 so that the test inside nrs_crrn_req_add() can evaluate to
         * true.
@@ -225,10 +182,9 @@ static int nrs_crrn_start(struct ptlrpc_nrs_policy *policy)
 
        RETURN(rc);
 
-failed:
-       if (net->cn_binheap != NULL)
-               cfs_binheap_destroy(net->cn_binheap);
-
+out_binheap:
+       cfs_binheap_destroy(net->cn_binheap);
+out_net:
        OBD_FREE_PTR(net);
 
        RETURN(rc);
@@ -250,11 +206,10 @@ static void nrs_crrn_stop(struct ptlrpc_nrs_policy *policy)
 
        LASSERT(net != NULL);
        LASSERT(net->cn_binheap != NULL);
-       LASSERT(net->cn_cli_hash != NULL);
        LASSERT(cfs_binheap_is_empty(net->cn_binheap));
 
+       rhashtable_free_and_destroy(&net->cn_cli_hash, nrs_crrn_exit, NULL);
        cfs_binheap_destroy(net->cn_binheap);
-       cfs_hash_putref(net->cn_cli_hash);
 
        OBD_FREE_PTR(net);
 }
@@ -267,18 +222,19 @@ static void nrs_crrn_stop(struct ptlrpc_nrs_policy *policy)
  * \param[in]    opc    the opcode
  * \param[in,out] arg   used for passing parameters and information
  *
- * \pre spin_is_locked(&policy->pol_nrs->->nrs_lock)
- * \post spin_is_locked(&policy->pol_nrs->->nrs_lock)
+ * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
+ * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
  *
  * \retval 0   operation carried out successfully
  * \retval -ve error
  */
-int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
-                void *arg)
+static int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy,
+                       enum ptlrpc_nrs_ctl opc,
+                       void *arg)
 {
-       LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
+       assert_spin_locked(&policy->pol_nrs->nrs_lock);
 
-       switch(opc) {
+       switch((enum nrs_ctl_crr)opc) {
        default:
                RETURN(-EINVAL);
 
@@ -329,10 +285,10 @@ int nrs_crrn_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
  *
  * \see nrs_resource_get_safe()
  */
-int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
-                    struct ptlrpc_nrs_request *nrq,
-                    const struct ptlrpc_nrs_resource *parent,
-                    struct ptlrpc_nrs_resource **resp, bool moving_req)
+static int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
+                           struct ptlrpc_nrs_request *nrq,
+                           const struct ptlrpc_nrs_resource *parent,
+                           struct ptlrpc_nrs_resource **resp, bool moving_req)
 {
        struct nrs_crrn_net     *net;
        struct nrs_crrn_client  *cli;
@@ -347,26 +303,32 @@ int nrs_crrn_res_get(struct ptlrpc_nrs_policy *policy,
        net = container_of(parent, struct nrs_crrn_net, cn_res);
        req = container_of(nrq, struct ptlrpc_request, rq_nrq);
 
-       cli = cfs_hash_lookup(net->cn_cli_hash, &req->rq_peer.nid);
-       if (cli != NULL)
+       cli = rhashtable_lookup_fast(&net->cn_cli_hash, &req->rq_peer.nid,
+                                    nrs_crrn_hash_params);
+       if (cli)
                goto out;
 
        OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
-                         sizeof(*cli), moving_req ? CFS_ALLOC_ATOMIC :
-                         CFS_ALLOC_IO);
+                         sizeof(*cli), moving_req ? GFP_ATOMIC : GFP_NOFS);
        if (cli == NULL)
                return -ENOMEM;
 
        cli->cc_nid = req->rq_peer.nid;
 
-       cfs_atomic_set(&cli->cc_ref, 1);
-       tmp = cfs_hash_findadd_unique(net->cn_cli_hash, &cli->cc_nid,
-                                     &cli->cc_hnode);
-       if (tmp != cli) {
+       atomic_set(&cli->cc_ref, 0);
+
+       tmp = rhashtable_lookup_get_insert_fast(&net->cn_cli_hash,
+                                               &cli->cc_rhead,
+                                               nrs_crrn_hash_params);
+       if (tmp) {
+               /* insertion failed */
                OBD_FREE_PTR(cli);
+               if (IS_ERR(tmp))
+                       return PTR_ERR(tmp);
                cli = tmp;
        }
 out:
+       atomic_inc(&cli->cc_ref);
        *resp = &cli->cc_res;
 
        return 1;
@@ -382,8 +344,7 @@ out:
 static void nrs_crrn_res_put(struct ptlrpc_nrs_policy *policy,
                             const struct ptlrpc_nrs_resource *res)
 {
-       struct nrs_crrn_net     *net;
-       struct nrs_crrn_client  *cli;
+       struct nrs_crrn_client *cli;
 
        /**
         * Do nothing for freeing parent, nrs_crrn_net resources
@@ -392,9 +353,8 @@ static void nrs_crrn_res_put(struct ptlrpc_nrs_policy *policy,
                return;
 
        cli = container_of(res, struct nrs_crrn_client, cc_res);
-       net = container_of(res->res_parent, struct nrs_crrn_net, cn_res);
 
-       cfs_hash_put(net->cn_cli_hash, &cli->cc_hnode);
+       atomic_dec(&cli->cc_ref);
 }
 
 /**
@@ -417,7 +377,7 @@ struct ptlrpc_nrs_request *nrs_crrn_req_get(struct ptlrpc_nrs_policy *policy,
                                            bool peek, bool force)
 {
        struct nrs_crrn_net       *net = policy->pol_private;
-       cfs_binheap_node_t        *node = cfs_binheap_root(net->cn_binheap);
+       struct cfs_binheap_node   *node = cfs_binheap_root(net->cn_binheap);
        struct ptlrpc_nrs_request *nrq;
 
        nrq = unlikely(node == NULL) ? NULL :
@@ -439,7 +399,7 @@ struct ptlrpc_nrs_request *nrs_crrn_req_get(struct ptlrpc_nrs_policy *policy,
 
                CDEBUG(D_RPCTRACE,
                       "NRS: starting to handle %s request from %s, with round "
-                      LPU64"\n", NRS_POL_NAME_CRRN,
+                      "%llu\n", NRS_POL_NAME_CRRN,
                       libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
 
                /** Peek at the next request to be served */
@@ -579,7 +539,7 @@ static void nrs_crrn_req_del(struct ptlrpc_nrs_policy *policy,
         */
        if (unlikely(is_root)) {
                /** Peek at the next request to be served */
-               cfs_binheap_node_t *node = cfs_binheap_root(net->cn_binheap);
+               struct cfs_binheap_node *node = cfs_binheap_root(net->cn_binheap);
 
                /** No more requests */
                if (unlikely(node == NULL)) {
@@ -608,15 +568,13 @@ static void nrs_crrn_req_stop(struct ptlrpc_nrs_policy *policy,
                                                  rq_nrq);
 
        CDEBUG(D_RPCTRACE,
-              "NRS: finished handling %s request from %s, with round "LPU64
+              "NRS: finished handling %s request from %s, with round %llu"
               "\n", NRS_POL_NAME_CRRN,
               libcfs_id2str(req->rq_peer), nrq->nr_u.crr.cr_round);
 }
 
-#ifdef LPROCFS
-
 /**
- * lprocfs interface
+ * debugfs interface
  */
 
 /**
@@ -633,14 +591,12 @@ static void nrs_crrn_req_stop(struct ptlrpc_nrs_policy *policy,
  *     reg_quantum:8
  *     hp_quantum:4
  */
-static int ptlrpc_lprocfs_rd_nrs_crrn_quantum(char *page, char **start,
-                                             off_t off, int count, int *eof,
-                                             void *data)
+static int
+ptlrpc_lprocfs_nrs_crrn_quantum_seq_show(struct seq_file *m, void *data)
 {
-       struct ptlrpc_service       *svc = data;
-       __u16                        quantum;
-       int                          rc;
-       int                          rc2 = 0;
+       struct ptlrpc_service   *svc = m->private;
+       __u16                   quantum;
+       int                     rc;
 
        /**
         * Perform two separate calls to this as only one of the NRS heads'
@@ -652,9 +608,8 @@ static int ptlrpc_lprocfs_rd_nrs_crrn_quantum(char *page, char **start,
                                       NRS_CTL_CRRN_RD_QUANTUM,
                                       true, &quantum);
        if (rc == 0) {
-               *eof = 1;
-               rc2 = snprintf(page, count, NRS_LPROCFS_QUANTUM_NAME_REG
-                              "%-5d\n", quantum);
+               seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_REG
+                          "%-5d\n", quantum);
                /**
                 * Ignore -ENODEV as the regular NRS head's policy may be in the
                 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
@@ -671,9 +626,7 @@ static int ptlrpc_lprocfs_rd_nrs_crrn_quantum(char *page, char **start,
                                       NRS_CTL_CRRN_RD_QUANTUM,
                                       true, &quantum);
        if (rc == 0) {
-               *eof = 1;
-               rc2 += snprintf(page + rc2, count - rc2,
-                               NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
+               seq_printf(m, NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
                /**
                 * Ignore -ENODEV as the high priority NRS head's policy may be
                 * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
@@ -683,8 +636,7 @@ static int ptlrpc_lprocfs_rd_nrs_crrn_quantum(char *page, char **start,
        }
 
 no_hp:
-
-       return rc2 ? : rc;
+       return rc;
 }
 
 /**
@@ -707,25 +659,27 @@ no_hp:
  * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
  * are skipped later by nrs_crrn_ctl().
  */
-static int ptlrpc_lprocfs_wr_nrs_crrn_quantum(struct file *file,
-                                             const char *buffer,
-                                             unsigned long count, void *data)
+static ssize_t
+ptlrpc_lprocfs_nrs_crrn_quantum_seq_write(struct file *file,
+                                         const char __user *buffer,
+                                         size_t count,
+                                         loff_t *off)
 {
-       struct ptlrpc_service       *svc = data;
+       struct ptlrpc_service       *svc = ((struct seq_file *)file->private_data)->private;
        enum ptlrpc_nrs_queue_type   queue = 0;
        char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
        char                        *val;
        long                         quantum_reg;
        long                         quantum_hp;
        /** lprocfs_find_named_value() modifies its argument, so keep a copy */
-       unsigned long                count_copy;
+       size_t                       count_copy;
        int                          rc = 0;
        int                          rc2 = 0;
 
         if (count > (sizeof(kernbuf) - 1))
                 return -EINVAL;
 
-       if (cfs_copy_from_user(kernbuf, buffer, count))
+       if (copy_from_user(kernbuf, buffer, count))
                return -EFAULT;
 
         kernbuf[count] = '\0';
@@ -738,7 +692,9 @@ static int ptlrpc_lprocfs_wr_nrs_crrn_quantum(struct file *file,
        val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
                                       &count_copy);
        if (val != kernbuf) {
-               quantum_reg = simple_strtol(val, NULL, 10);
+               rc = kstrtol(val, 10, &quantum_reg);
+               if (rc)
+                       return rc;
 
                queue |= PTLRPC_NRS_QUEUE_REG;
        }
@@ -754,7 +710,9 @@ static int ptlrpc_lprocfs_wr_nrs_crrn_quantum(struct file *file,
                if (!nrs_svc_has_hp(svc))
                        return -ENODEV;
 
-               quantum_hp = simple_strtol(val, NULL, 10);
+               rc = kstrtol(val, 10, &quantum_hp);
+               if (rc)
+                       return rc;
 
                queue |= PTLRPC_NRS_QUEUE_HP;
        }
@@ -764,10 +722,9 @@ static int ptlrpc_lprocfs_wr_nrs_crrn_quantum(struct file *file,
         * value
         */
        if (queue == 0) {
-               if (!isdigit(kernbuf[0]))
-                       return -EINVAL;
-
-               quantum_reg = simple_strtol(kernbuf, NULL, 10);
+               rc = kstrtol(kernbuf, 10, &quantum_reg);
+               if (rc)
+                       return rc;
 
                queue = PTLRPC_NRS_QUEUE_REG;
 
@@ -816,6 +773,8 @@ static int ptlrpc_lprocfs_wr_nrs_crrn_quantum(struct file *file,
        return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
 }
 
+LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_crrn_quantum);
+
 /**
  * Initializes a CRR-N policy's lprocfs interface for service \a svc
  *
@@ -824,41 +783,21 @@ static int ptlrpc_lprocfs_wr_nrs_crrn_quantum(struct file *file,
  * \retval 0   success
  * \retval != 0        error
  */
-int nrs_crrn_lprocfs_init(struct ptlrpc_service *svc)
+static int nrs_crrn_lprocfs_init(struct ptlrpc_service *svc)
 {
-       int     rc;
-
        struct lprocfs_vars nrs_crrn_lprocfs_vars[] = {
                { .name         = "nrs_crrn_quantum",
-                 .read_fptr    = ptlrpc_lprocfs_rd_nrs_crrn_quantum,
-                 .write_fptr   = ptlrpc_lprocfs_wr_nrs_crrn_quantum,
+                 .fops         = &ptlrpc_lprocfs_nrs_crrn_quantum_fops,
                  .data = svc },
                { NULL }
        };
 
-       if (svc->srv_procroot == NULL)
+       if (!svc->srv_debugfs_entry)
                return 0;
 
-       rc = lprocfs_add_vars(svc->srv_procroot, nrs_crrn_lprocfs_vars, NULL);
-
-       return rc;
-}
-
-/**
- * Cleans up a CRR-N policy's lprocfs interface for service \a svc
- *
- * \param[in] svc the service
- */
-void nrs_crrn_lprocfs_fini(struct ptlrpc_service *svc)
-{
-       if (svc->srv_procroot == NULL)
-               return;
-
-       lprocfs_remove_proc_entry("nrs_crrn_quantum", svc->srv_procroot);
+       return ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_crrn_lprocfs_vars, NULL);
 }
 
-#endif /* LPROCFS */
-
 /**
  * CRR-N policy operations
  */
@@ -872,10 +811,7 @@ static const struct ptlrpc_nrs_pol_ops nrs_crrn_ops = {
        .op_req_enqueue         = nrs_crrn_req_add,
        .op_req_dequeue         = nrs_crrn_req_del,
        .op_req_stop            = nrs_crrn_req_stop,
-#ifdef LPROCFS
        .op_lprocfs_init        = nrs_crrn_lprocfs_init,
-       .op_lprocfs_fini        = nrs_crrn_lprocfs_fini,
-#endif
 };
 
 /**