Whamcloud - gitweb
LU-398 ptlrpc: Add the NRS ORR and TRR policies
authorNikitas Angelinas <nikitas_angelinas@xyratex.com>
Wed, 9 Jan 2013 02:40:21 +0000 (02:40 +0000)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 24 Apr 2013 21:19:19 +0000 (17:19 -0400)
The ORR (Object-based Round Robin) policy schedules brw RPCs in
per-backend-filesystem-object groupings; RPCs in each group are
sorted according to their logical file or physical disk offsets.

The TRR (Target-based Round Robin) policy performs the same
function as ORR, but instead schedules brw RPCs in per-OST
groupings.

Both these policies aim to provide increased read throughput
in certain use cases, primarily by minimizing costly disk seek
operations (by ordering OST_READ, and perhaps also OST_WRITE,
RPCs); they may also allow for improved performance through
better resource utilization and by taking advantage of the
locality-of-reference characteristics of the I/O load.

Signed-off-by: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
Co-authored-by: Liang Zhen <liang@whamcloud.com>
Change-Id: I1f5a367f2f4a1cf296a3b38f3e395ab28a10668e
Oracle-bug-id: b=13634
Xyratex-bug-id: MRP-73
Reviewed-on: http://review.whamcloud.com/4938
Tested-by: Hudson
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
lustre/include/lustre_net.h
lustre/include/obd_support.h
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/autoMakefile.am
lustre/ptlrpc/nrs.c
lustre/ptlrpc/nrs_orr.c [new file with mode: 0644]
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/service.c

index c1bfedb..3ec5a24 100644 (file)
@@ -787,6 +787,18 @@ enum ptlrpc_nrs_ctl {
 };
 
 /**
 };
 
 /**
+ * ORR policy operations
+ */
+enum nrs_ctl_orr {
+       /* First policy-specific opcode; starts at PTLRPC_NRS_CTL_1ST_POL_SPEC
+        * so these values do not clash with generic NRS control opcodes. */
+       NRS_CTL_ORR_RD_QUANTUM = PTLRPC_NRS_CTL_1ST_POL_SPEC,
+       NRS_CTL_ORR_WR_QUANTUM,
+       /* NOTE(review): RD/WR appear to mean read/write of each tunable
+        * (quantum, offset type, supported RPC types) - confirm against the
+        * policy's control handler. */
+       NRS_CTL_ORR_RD_OFF_TYPE,
+       NRS_CTL_ORR_WR_OFF_TYPE,
+       NRS_CTL_ORR_RD_SUPP_REQ,
+       NRS_CTL_ORR_WR_SUPP_REQ,
+};
+
+/**
  * NRS policy operations.
  *
  * These determine the behaviour of a policy, and are called in response to
  * NRS policy operations.
  *
  * These determine the behaviour of a policy, and are called in response to
@@ -1505,6 +1517,184 @@ enum nrs_ctl_crr {
 /** @} CRR-N */
 
 /**
 /** @} CRR-N */
 
 /**
+ * \name ORR/TRR
+ *
+ * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
+ * @{
+ */
+
+/**
+ * Lower and upper byte offsets of a brw RPC
+ */
+struct nrs_orr_req_range {
+       __u64           or_start;
+       __u64           or_end;
+};
+
+/**
+ * RPC types supported by the ORR/TRR policies
+ */
+enum nrs_orr_supp {
+       NOS_OST_READ  = (1 << 0),
+       NOS_OST_WRITE = (1 << 1),
+       NOS_OST_RW    = (NOS_OST_READ | NOS_OST_WRITE),
+       /**
+        * Default value for policies.
+        */
+       NOS_DFLT      = NOS_OST_READ
+};
+
+/**
+ * As unique keys for grouping RPCs together, we use the object's OST FID for
+ * the ORR policy, and the OST index for the TRR policy.
+ *
+ * XXX: We waste some space for TRR policy instances by using a union, but it
+ *     allows to consolidate some of the code between ORR and TRR, and these
+ *     policies will probably eventually merge into one anyway.
+ */
+struct nrs_orr_key {
+       union {
+               /** object FID for ORR */
+               struct lu_fid   ok_fid;
+               /** OST index for TRR */
+               __u32           ok_idx;
+       };
+};
+
+/**
+ * The largest base string for unique hash/slab object names is
+ * "nrs_orr_reg_", so 13 characters. We add 3 to this to be used for the CPT
+ * id number, so this _should_ be more than enough for the maximum number of
+ * CPTs on any system. If it does happen that this statement is incorrect,
+ * nrs_orr_genobjname() will inevitably yield a non-unique name and cause
+ * cfs_mem_cache_create() to complain (on Linux), so the erroneous situation
+ * will hopefully not go unnoticed.
+ */
+#define NRS_ORR_OBJ_NAME_MAX   (sizeof("nrs_orr_reg_") + 3)
+
+/**
+ * Private data structure for the ORR and TRR NRS policies; one instance per
+ * policy instance (see nrs_orr_genobjname() for per-instance object naming).
+ */
+struct nrs_orr_data {
+       /** resource object embedded in the policy resource hierarchy */
+       struct ptlrpc_nrs_resource      od_res;
+       /** binary heap ordering queued requests (see orr_req_compare()) */
+       cfs_binheap_t                  *od_binheap;
+       /** hash of nrs_orr_object instances, keyed by struct nrs_orr_key */
+       cfs_hash_t                     *od_obj_hash;
+       /** slab cache used for nrs_orr_object allocations */
+       cfs_mem_cache_t                *od_cache;
+       /**
+        * Used when a new scheduling round commences, in order to synchronize
+        * all object or OST batches with the new round number.
+        */
+       __u64                           od_round;
+       /**
+        * Determines the relevant ordering amongst request batches within a
+        * scheduling round.
+        */
+       __u64                           od_sequence;
+       /**
+        * RPC types that are currently supported.
+        */
+       enum nrs_orr_supp               od_supp;
+       /**
+        * Round Robin quantum; the maximum number of RPCs that each request
+        * batch for each object or OST can have in a scheduling round.
+        */
+       __u16                           od_quantum;
+       /**
+        * Whether to use physical disk offsets or logical file offsets.
+        */
+       bool                            od_physical;
+       /**
+        * XXX: We need to provide a persistently allocated string to hold
+        * unique object names for this policy, since in currently supported
+        * versions of Linux by Lustre, kmem_cache_create() just sets a pointer
+        * to the name string provided. kstrdup() is used in the version of
+        * kmem_cache_create() in current Linux mainline, so we may be able to
+        * remove this in the future.
+        */
+       char                            od_objname[NRS_ORR_OBJ_NAME_MAX];
+};
+
+/**
+ * Represents a backend-fs object or OST in the ORR and TRR policies
+ * respectively
+ */
+struct nrs_orr_object {
+       struct ptlrpc_nrs_resource      oo_res;
+       cfs_hlist_node_t                oo_hnode;
+       /**
+        * The round number against which requests are being scheduled for this
+        * object or OST
+        */
+       __u64                           oo_round;
+       /**
+        * The sequence number used for requests scheduled for this object or
+        * OST during the current round number.
+        */
+       __u64                           oo_sequence;
+       /**
+        * The key of the object or OST for which this structure instance is
+        * scheduling RPCs
+        */
+       struct nrs_orr_key              oo_key;
+       cfs_atomic_t                    oo_ref;
+       /**
+        * Round Robin quantum; the maximum number of RPCs that are allowed to
+        * be scheduled for the object or OST in a single batch of each round.
+        */
+       __u16                           oo_quantum;
+       /**
+        * # of pending requests for this object or OST, on all existing rounds
+        */
+       __u16                           oo_active;
+};
+
+/**
+ * ORR/TRR NRS request definition
+ */
+struct nrs_orr_req {
+       /**
+        * The offset range this request covers
+        */
+       struct nrs_orr_req_range        or_range;
+       /**
+        * Round number for this request; shared with all other requests in the
+        * same batch.
+        */
+       __u64                           or_round;
+       /**
+        * Sequence number for this request; shared with all other requests in
+        * the same batch.
+        */
+       __u64                           or_sequence;
+       /**
+        * For debugging purposes.
+        */
+       struct nrs_orr_key              or_key;
+       /**
+        * An ORR policy instance has filled in request information while
+        * enqueueing the request on the service partition's regular NRS head.
+        */
+       unsigned int                    or_orr_set:1;
+       /**
+        * A TRR policy instance has filled in request information while
+        * enqueueing the request on the service partition's regular NRS head.
+        */
+       unsigned int                    or_trr_set:1;
+       /**
+        * Request offset ranges have been filled in with logical offset
+        * values.
+        */
+       unsigned int                    or_logical_set:1;
+       /**
+        * Request offset ranges have been filled in with physical offset
+        * values.
+        */
+       unsigned int                    or_physical_set:1;
+};
+
+/** @} ORR/TRR */
+
+/**
  * NRS request
  *
  * Instances of this object exist embedded within ptlrpc_request; the main
  * NRS request
  *
  * Instances of this object exist embedded within ptlrpc_request; the main
@@ -1543,6 +1733,8 @@ struct ptlrpc_nrs_request {
                 * CRR-N request defintion
                 */
                struct nrs_crrn_req     crr;
                 * CRR-N request defintion
                 */
                struct nrs_crrn_req     crr;
+               /** ORR and TRR share the same request definition */
+               struct nrs_orr_req      orr;
        } nr_u;
        /**
         * Externally-registering policies may want to use this to allocate
        } nr_u;
        /**
         * Externally-registering policies may want to use this to allocate
index 06396a0..9c9ae28 100644 (file)
@@ -824,7 +824,7 @@ do {                                                                          \
 #define OBD_SLAB_ALLOC_PTR_GFP(ptr, slab, flags)                             \
        OBD_SLAB_ALLOC_GFP(ptr, slab, sizeof *(ptr), flags)
 
 #define OBD_SLAB_ALLOC_PTR_GFP(ptr, slab, flags)                             \
        OBD_SLAB_ALLOC_GFP(ptr, slab, sizeof *(ptr), flags)
 
-#define OBD_SLAB_CPT_ALLOC_PTR_GFP(ptr, slab, ctab, cpt, flags)                      \
+#define OBD_SLAB_CPT_ALLOC_PTR_GFP(ptr, slab, cptab, cpt, flags)                     \
        OBD_SLAB_CPT_ALLOC_GFP(ptr, slab, cptab, cpt, sizeof *(ptr), flags)
 
 #define OBD_SLAB_FREE_PTR(ptr, slab)                                         \
        OBD_SLAB_CPT_ALLOC_GFP(ptr, slab, cptab, cpt, sizeof *(ptr), flags)
 
 #define OBD_SLAB_FREE_PTR(ptr, slab)                                         \
index 9cf66f2..ef70c05 100644 (file)
@@ -14,7 +14,7 @@ ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o
 ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
 ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
 ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
 ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
 ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
 ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
-ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_crr.o
+ptlrpc_objs += sec_null.o sec_plain.o nrs.o nrs_fifo.o nrs_crr.o nrs_orr.o
 
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 
 
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 
index d1f2a83..d27f72b 100644 (file)
@@ -95,6 +95,7 @@ ptlrpc_SOURCES =      \
        nrs.c           \
        nrs_fifo.c      \
        nrs_crr.c       \
        nrs.c           \
        nrs_fifo.c      \
        nrs_crr.c       \
+       nrs_orr.c       \
        wiretest.c      \
        sec.c           \
        sec_bulk.c      \
        wiretest.c      \
        sec.c           \
        sec_bulk.c      \
index f4700f8..5343ab3 100644 (file)
@@ -1744,6 +1744,9 @@ extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
 #if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
 /* ptlrpc/nrs_crr.c */
 extern struct ptlrpc_nrs_pol_conf nrs_conf_crrn;
 #if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
 /* ptlrpc/nrs_crr.c */
 extern struct ptlrpc_nrs_pol_conf nrs_conf_crrn;
+/* ptlrpc/nrs_orr.c */
+extern struct ptlrpc_nrs_pol_conf nrs_conf_orr;
+extern struct ptlrpc_nrs_pol_conf nrs_conf_trr;
 #endif
 
 /**
 #endif
 
 /**
@@ -1769,6 +1772,14 @@ int ptlrpc_nrs_init(void)
        rc = ptlrpc_nrs_policy_register(&nrs_conf_crrn);
        if (rc != 0)
                GOTO(fail, rc);
        rc = ptlrpc_nrs_policy_register(&nrs_conf_crrn);
        if (rc != 0)
                GOTO(fail, rc);
+
+       rc = ptlrpc_nrs_policy_register(&nrs_conf_orr);
+       if (rc != 0)
+               GOTO(fail, rc);
+
+       rc = ptlrpc_nrs_policy_register(&nrs_conf_trr);
+       if (rc != 0)
+               GOTO(fail, rc);
 #endif
 
        RETURN(rc);
 #endif
 
        RETURN(rc);
diff --git a/lustre/ptlrpc/nrs_orr.c b/lustre/ptlrpc/nrs_orr.c
new file mode 100644 (file)
index 0000000..c660916
--- /dev/null
@@ -0,0 +1,2034 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License version 2 for more details.  A copy is
+ * included in the COPYING file that accompanied this code.
+
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011 Intel Corporation
+ *
+ * Copyright 2012 Xyratex Technology Limited
+ */
+/*
+ * lustre/ptlrpc/nrs_orr.c
+ *
+ * Network Request Scheduler (NRS) ORR and TRR policies
+ *
+ * Request scheduling in a Round-Robin manner over backend-fs objects and OSTs
+ * respectively
+ *
+ * Author: Liang Zhen <liang@whamcloud.com>
+ * Author: Nikitas Angelinas <nikitas_angelinas@xyratex.com>
+ */
+#ifdef HAVE_SERVER_SUPPORT
+
+/**
+ * \addtogroup nrs
+ * @{
+ */
+#define DEBUG_SUBSYSTEM S_RPC
+#include <obd_support.h>
+#include <obd_class.h>
+#include <lustre_net.h>
+#include <lustre/lustre_idl.h>
+#include <lustre_req_layout.h>
+#include "ptlrpc_internal.h"
+
+/**
+ * \name ORR/TRR policy
+ *
+ * ORR/TRR (Object-based Round Robin/Target-based Round Robin) NRS policies
+ *
+ * ORR performs batched Round Robin scheduling of brw RPCs, based on the FID of
+ * the backend-fs object that the brw RPC pertains to; the TRR policy performs
+ * batched Round Robin scheduling of brw RPCs, based on the OST index that the
+ * RPC pertains to. Both policies also order RPCs in each batch in ascending
+ * offset order, which is lprocfs-tunable between logical file offsets, and
+ * physical disk offsets, as reported by fiemap.
+ *
+ * The TRR policy reuses much of the functionality of ORR. These two scheduling
+ * algorithms could alternatively be implemented under a single NRS policy, that
+ * uses an lprocfs tunable in order to switch between the two types of
+ * scheduling behaviour. The two algorithms have been implemented as separate
+ * policies for reasons of clarity to the user, and to avoid issues that would
+ * otherwise arise at the point of switching between behaviours in the case of
+ * having a single policy, such as resource cleanup for nrs_orr_object
+ * instances. It is possible that this may need to be re-examined in the future,
+ * along with potentially coalescing other policies that perform batched request
+ * scheduling in a Round-Robin manner, all into one policy.
+ *
+ * @{
+ */
+
+#define NRS_POL_NAME_ORR       "orr"
+#define NRS_POL_NAME_TRR       "trr"
+
+/**
+ * Checks whether the RPC type of \a nrq is one that this ORR/TRR policy
+ * instance is currently configured to handle.
+ *
+ * \param[in]  orrd   the ORR/TRR policy scheduler instance
+ * \param[in]  nrq    the request
+ * \param[out] opcode on success the RPC opcode is stored here, so that the
+ *                   caller need not call lustre_msg_get_opc() again later
+ *
+ * \retval true  request type is supported by the policy instance
+ * \retval false request type is not supported by the policy instance
+ */
+static bool nrs_orr_req_supported(struct nrs_orr_data *orrd,
+                                 struct ptlrpc_nrs_request *nrq, __u32 *opcode)
+{
+       struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
+                                                  rq_nrq);
+       __u32                   opc = lustre_msg_get_opc(req->rq_reqmsg);
+       bool                    supported;
+
+       /**
+        * XXX: nrs_orr_data::od_supp accessed unlocked.
+        */
+       if (opc == OST_READ)
+               supported = orrd->od_supp & NOS_OST_READ;
+       else if (opc == OST_WRITE)
+               supported = orrd->od_supp & NOS_OST_WRITE;
+       else
+               supported = false;
+
+       if (supported)
+               *opcode = opc;
+
+       return supported;
+}
+
+/**
+ * Returns the ORR/TRR key fields for the request \a nrq in \a key.
+ *
+ * \param[in]  orrd the ORR/TRR policy scheduler instance
+ * \param[in]  nrq  the request
+ * \param[in]  opc  the request's opcode
+ * \param[in]  name the policy name
+ * \param[out] key  fields of the key are returned here.
+ *
+ * \retval 0   key filled successfully
+ * \retval < 0 error
+ */
+static int nrs_orr_key_fill(struct nrs_orr_data *orrd,
+                           struct ptlrpc_nrs_request *nrq, __u32 opc,
+                           char *name, struct nrs_orr_key *key)
+{
+       struct ptlrpc_request  *req = container_of(nrq, struct ptlrpc_request,
+                                                  rq_nrq);
+       struct ost_body        *body;
+       __u32                   ost_idx;
+       bool                    is_orr = strncmp(name, NRS_POL_NAME_ORR,
+                                                NRS_POL_NAME_MAX) == 0;
+
+       LASSERT(req != NULL);
+
+       /**
+        * This is an attempt to fill in the request key fields while
+        * moving a request from the regular to the high-priority NRS
+        * head (via ldlm_lock_reorder_req()), but the request key has
+        * been adequately filled when nrs_orr_res_get() was called through
+        * ptlrpc_nrs_req_initialize() for the regular NRS head's ORR/TRR
+        * policy, so there is nothing to do.
+        */
+       if ((is_orr && nrq->nr_u.orr.or_orr_set) ||
+           (!is_orr && nrq->nr_u.orr.or_trr_set)) {
+               *key = nrq->nr_u.orr.or_key;
+               return 0;
+       }
+
+       /**
+        * The key has only been filled by a policy instance of the other
+        * type (ORR vs TRR), so clear it before filling it for this one.
+        */
+       if (nrq->nr_u.orr.or_orr_set || nrq->nr_u.orr.or_trr_set)
+               memset(&nrq->nr_u.orr.or_key, 0, sizeof(nrq->nr_u.orr.or_key));
+
+       ost_idx = class_server_data(req->rq_export->exp_obd)->lsd_osd_index;
+
+       if (is_orr) {
+               int     rc;
+               /**
+                * The request pill for OST_READ and OST_WRITE requests is
+                * initialized in the ost_io service's
+                * ptlrpc_service_ops::so_hpreq_handler, ost_io_hpreq_handler(),
+                * so no need to redo it here.
+                */
+               body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+               if (body == NULL)
+                       /* plain return for consistency; this function uses no
+                        * ENTRY, so the RETURN() exit-debug macro would be
+                        * unpaired */
+                       return -EFAULT;
+
+               rc = ostid_to_fid(&key->ok_fid, &body->oa.o_oi, ost_idx);
+               if (rc < 0)
+                       return rc;
+
+               nrq->nr_u.orr.or_orr_set = 1;
+       } else {
+               key->ok_idx = ost_idx;
+               nrq->nr_u.orr.or_trr_set = 1;
+       }
+
+       return 0;
+}
+
+/**
+ * Populates the range values in \a range with logical offsets obtained via
+ * \a nb.
+ *
+ * \param[in]  nb      niobuf_remote struct array for this request
+ * \param[in]  niocount        count of niobuf_remote structs for this request
+ * \param[out] range   the offset range is returned here
+ */
+static void nrs_orr_range_fill_logical(struct niobuf_remote *nb, int niocount,
+                                      struct nrs_orr_req_range *range)
+{
+       /* Round the start of the first buffer down, and the end of the last
+        * buffer up, to page boundaries; CFS_PAGE_MASK masks off the
+        * intra-page bits of an offset. (Should we do this at page
+        * boundaries?) */
+       range->or_start = nb[0].offset & CFS_PAGE_MASK;
+       range->or_end = (nb[niocount - 1].offset +
+                        nb[niocount - 1].len - 1) | ~CFS_PAGE_MASK;
+}
+
+/**
+ * We obtain information just for a single extent, as the request can only be in
+ * a single place in the binary heap anyway.
+ */
+#define ORR_NUM_EXTENTS 1
+
+/**
+ * Converts the logical file offset range in \a range, to a physical disk offset
+ * range in \a range, for a request. Uses obd_get_info() in order to carry out a
+ * fiemap call and obtain backend-fs extent information. The returned range is
+ * in physical block numbers.
+ *
+ * \param[in]    nrq   the request
+ * \param[in]    oa    obdo struct for this request
+ * \param[in,out] range        the offset range in bytes; logical range in, physical
+ *                     range out
+ *
+ * \retval 0   physical offsets obtained successfully
+ * \retval < 0 error
+ */
+static int nrs_orr_range_fill_physical(struct ptlrpc_nrs_request *nrq,
+                                      struct obdo *oa,
+                                      struct nrs_orr_req_range *range)
+{
+       struct ptlrpc_request     *req = container_of(nrq,
+                                                     struct ptlrpc_request,
+                                                     rq_nrq);
+       /* buffer sized for a fiemap carrying exactly ORR_NUM_EXTENTS extents */
+       char                       fiemap_buf[offsetof(struct ll_user_fiemap,
+                                                 fm_extents[ORR_NUM_EXTENTS])];
+       struct ll_user_fiemap     *fiemap = (struct ll_user_fiemap *)fiemap_buf;
+       struct ll_fiemap_info_key  key;
+       loff_t                     start;
+       loff_t                     end;
+       int                        rc;
+
+       key = (typeof(key)) {
+               .name = KEY_FIEMAP,
+               .oa = *oa,
+               .fiemap = {
+                       .fm_start = range->or_start,
+                       .fm_length = range->or_end - range->or_start,
+                       .fm_extent_count = ORR_NUM_EXTENTS
+               }
+       };
+
+       rc = obd_get_info(req->rq_svc_thread->t_env, req->rq_export,
+                         sizeof(key), &key, NULL, fiemap, NULL);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       /* no extents, or more than we asked for, means the mapping is not
+        * usable for a single-extent translation */
+       if (fiemap->fm_mapped_extents == 0 ||
+           fiemap->fm_mapped_extents > ORR_NUM_EXTENTS)
+               GOTO(out, rc = -EFAULT);
+
+       /**
+        * Calculate the physical offset ranges for the request from the extent
+        * information and the logical request offsets.
+        */
+       start = fiemap->fm_extents[0].fe_physical + range->or_start -
+               fiemap->fm_extents[0].fe_logical;
+       end = start + range->or_end - range->or_start;
+
+       range->or_start = start;
+       range->or_end = end;
+
+       nrq->nr_u.orr.or_physical_set = 1;
+out:
+       return rc;
+}
+
+/**
+ * Sets the offset range the request covers; either in logical file
+ * offsets or in physical disk offsets.
+ *
+ * \param[in] nrq       the request
+ * \param[in] orrd      the ORR/TRR policy scheduler instance
+ * \param[in] opc       the request's opcode
+ * \param[in] moving_req is the request in the process of moving onto the
+ *                      high-priority NRS head?
+ *
+ * \retval 0   range filled successfully
+ * \retval != 0 error
+ */
+static int nrs_orr_range_fill(struct ptlrpc_nrs_request *nrq,
+                             struct nrs_orr_data *orrd, __u32 opc,
+                             bool moving_req)
+{
+       struct ptlrpc_request       *req = container_of(nrq,
+                                                       struct ptlrpc_request,
+                                                       rq_nrq);
+       struct obd_ioobj            *ioo;
+       struct niobuf_remote        *nb;
+       struct ost_body             *body;
+       struct nrs_orr_req_range     range;
+       int                          niocount;
+       int                          rc = 0;
+
+       /**
+        * If we are scheduling using physical disk offsets, but we have filled
+        * the offset information in the request previously
+        * (i.e. ldlm_lock_reorder_req() is moving the request to the
+        * high-priority NRS head), there is no need to do anything, and we can
+        * exit. Besides being unnecessary, we would also be unable to perform
+        * the obd_get_info() call required in nrs_orr_range_fill_physical(),
+        * because ldlm_lock_reorder_req() calls into here while holding a
+        * spinlock, and retrieving fiemap information via obd_get_info() is a
+        * potentially sleeping operation.
+        */
+       if (orrd->od_physical && nrq->nr_u.orr.or_physical_set)
+               return 0;
+
+       ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+       if (ioo == NULL)
+               GOTO(out, rc = -EFAULT);
+
+       niocount = ioo->ioo_bufcnt;
+
+       nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+       if (nb == NULL)
+               GOTO(out, rc = -EFAULT);
+
+       /**
+        * Use logical information from niobuf_remote structures.
+        */
+       nrs_orr_range_fill_logical(nb, niocount, &range);
+
+       /**
+        * Obtain physical offsets if selected, and this is an OST_READ RPC.
+        * We do not enter this block if moving_req is set, which indicates
+        * that the request is being moved to the high-priority NRS head by
+        * ldlm_lock_reorder_req(); that function calls in here while holding
+        * a spinlock, and nrs_orr_range_fill_physical() can sleep, so we just
+        * use logical file offsets for the range values for such requests.
+        */
+       if (orrd->od_physical && opc == OST_READ && !moving_req) {
+               body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+               if (body == NULL)
+                       GOTO(out, rc = -EFAULT);
+
+               /**
+                * Translate to physical block offsets from backend filesystem
+                * extents.
+                * Ignore return values; if obtaining the physical offsets
+                * fails, use the logical offsets.
+                */
+               nrs_orr_range_fill_physical(nrq, &body->oa, &range);
+       }
+
+       nrq->nr_u.orr.or_range = range;
+out:
+       return rc;
+}
+
+/**
+ * Generates the unique character string used to register libcfs_hash and
+ * slab objects for an ORR/TRR policy instance. The name combines the policy
+ * name, a {reg|hp} token for the NRS head type, and the CPT id, and is
+ * therefore unique per policy instance, as there is one policy instance per
+ * NRS head on each CPT, and the policy is only compatible with the ost_io
+ * service.
+ *
+ * \param[in] policy the policy instance
+ * \param[out] name  the character array that will hold the generated name
+ */
+static void nrs_orr_genobjname(struct ptlrpc_nrs_policy *policy, char *name)
+{
+       const char *queue_tok =
+               policy->pol_nrs->nrs_queue_type == PTLRPC_NRS_QUEUE_REG ?
+               "_reg_" : "_hp_";
+
+       snprintf(name, NRS_ORR_OBJ_NAME_MAX, "nrs_%s%s%d",
+                policy->pol_desc->pd_name, queue_tok, nrs_pol2cptid(policy));
+}
+
+/**
+ * ORR/TRR hash operations
+ */
+#define NRS_ORR_BITS           24
+#define NRS_ORR_BKT_BITS       12
+#define NRS_ORR_HASH_FLAGS     (CFS_HASH_RW_BKTLOCK | CFS_HASH_ASSERT_EMPTY)
+
+#define NRS_TRR_BITS           4
+#define NRS_TRR_BKT_BITS       2
+#define NRS_TRR_HASH_FLAGS     CFS_HASH_RW_BKTLOCK
+
+/** Hashes an nrs_orr_key; both policies hash the full key size, so this op
+ * is shared by the ORR and TRR hash tables. */
+static unsigned nrs_orr_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+{
+       return cfs_hash_djb2_hash(key, sizeof(struct nrs_orr_key), mask);
+}
+
+/** Returns the key of the nrs_orr_object that \a hnode is embedded in. */
+static void *nrs_orr_hop_key(cfs_hlist_node_t *hnode)
+{
+       struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
+                                                     struct nrs_orr_object,
+                                                     oo_hnode);
+       return &orro->oo_key;
+}
+
+/** Key comparison for the ORR hash: keys are backend-fs object FIDs. */
+static int nrs_orr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+       struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
+                                                     struct nrs_orr_object,
+                                                     oo_hnode);
+
+       return lu_fid_eq(&orro->oo_key.ok_fid,
+                        &((struct nrs_orr_key *)key)->ok_fid);
+}
+
+/** Returns the nrs_orr_object that \a hnode is embedded in. */
+static void *nrs_orr_hop_object(cfs_hlist_node_t *hnode)
+{
+       return cfs_hlist_entry(hnode, struct nrs_orr_object, oo_hnode);
+}
+
+/** Takes a reference on the nrs_orr_object that \a hnode is embedded in. */
+static void nrs_orr_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
+                                                     struct nrs_orr_object,
+                                                     oo_hnode);
+       cfs_atomic_inc(&orro->oo_ref);
+}
+
+/**
+ * Removes an nrs_orr_object from the hash and frees its memory, if the object
+ * has no active users.
+ */
+static void nrs_orr_hop_put_free(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
+                                                     struct nrs_orr_object,
+                                                     oo_hnode);
+       struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
+                                                  struct nrs_orr_data, od_res);
+       cfs_hash_bd_t          bds[2];
+
+       /* other users still hold references; just drop ours */
+       if (cfs_atomic_dec_return(&orro->oo_ref) > 1)
+               return;
+
+       cfs_hash_lock(hs, 0);
+       cfs_hash_dual_bd_get_and_lock(hs, &orro->oo_key, bds, 1);
+
+       /**
+        * Another thread may have won the race and taken a reference on the
+        * nrs_orr_object.
+        */
+       if (cfs_atomic_read(&orro->oo_ref) > 1)
+               goto lost_race;
+
+       /* delete from the single bucket, or - during a table resize - find
+        * and delete from whichever of the two buckets holds the node */
+       if (bds[1].bd_bucket == NULL)
+               cfs_hash_bd_del_locked(hs, &bds[0], hnode);
+       else
+               hnode = cfs_hash_dual_bd_finddel_locked(hs, bds, &orro->oo_key,
+                                                       hnode);
+       LASSERT(hnode != NULL);
+
+       OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
+
+lost_race:
+
+       cfs_hash_dual_bd_unlock(hs, bds, 1);
+       cfs_hash_unlock(hs, 0);
+}
+
+/** Drops a reference on the nrs_orr_object that \a hnode is embedded in,
+ * without freeing it. */
+static void nrs_orr_hop_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
+                                                     struct nrs_orr_object,
+                                                     oo_hnode);
+       cfs_atomic_dec(&orro->oo_ref);
+}
+
+/** Key comparison for the TRR hash: keys are OST indices. */
+static int nrs_trr_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+       struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
+                                                     struct nrs_orr_object,
+                                                     oo_hnode);
+
+       return orro->oo_key.ok_idx == ((struct nrs_orr_key *)key)->ok_idx;
+}
+
+/** Frees a TRR object; presumably invoked when the hash is being emptied or
+ * torn down (hs_exit callback) - the object must have no remaining users. */
+static void nrs_trr_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+       struct nrs_orr_object *orro = cfs_hlist_entry(hnode,
+                                                     struct nrs_orr_object,
+                                                     oo_hnode);
+       struct nrs_orr_data   *orrd = container_of(orro->oo_res.res_parent,
+                                                  struct nrs_orr_data, od_res);
+
+       LASSERTF(cfs_atomic_read(&orro->oo_ref) == 0,
+                "Busy NRS TRR policy object for OST with index %u, with %d "
+                "refs\n", orro->oo_key.ok_idx, cfs_atomic_read(&orro->oo_ref));
+
+       OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
+}
+
+/** ORR hash table operations: objects are freed on last put
+ * (hs_put is nrs_orr_hop_put_free()). */
+static cfs_hash_ops_t nrs_orr_hash_ops = {
+       .hs_hash        = nrs_orr_hop_hash,
+       .hs_key         = nrs_orr_hop_key,
+       .hs_keycmp      = nrs_orr_hop_keycmp,
+       .hs_object      = nrs_orr_hop_object,
+       .hs_get         = nrs_orr_hop_get,
+       .hs_put         = nrs_orr_hop_put_free,
+       .hs_put_locked  = nrs_orr_hop_put,
+};
+
+/** TRR hash table operations: puts only drop the reference; objects are
+ * freed in the hs_exit callback (nrs_trr_hop_exit()) instead. */
+static cfs_hash_ops_t nrs_trr_hash_ops = {
+       .hs_hash        = nrs_orr_hop_hash,
+       .hs_key         = nrs_orr_hop_key,
+       .hs_keycmp      = nrs_trr_hop_keycmp,
+       .hs_object      = nrs_orr_hop_object,
+       .hs_get         = nrs_orr_hop_get,
+       .hs_put         = nrs_orr_hop_put,
+       .hs_put_locked  = nrs_orr_hop_put,
+       .hs_exit        = nrs_trr_hop_exit,
+};
+
+/** Default quantum: maximum # of RPCs per object/OST batch in each round */
+#define NRS_ORR_QUANTUM_DFLT   256
+
+/**
+ * Binary heap predicate.
+ *
+ * Uses
+ * ptlrpc_nrs_request::nr_u::orr::or_round,
+ * ptlrpc_nrs_request::nr_u::orr::or_sequence, and
+ * ptlrpc_nrs_request::nr_u::orr::or_range to compare two binheap nodes and
+ * produce a binary predicate that indicates their relative priority, so that
+ * the binary heap can perform the necessary sorting operations.
+ *
+ * \param[in] e1 the first binheap node to compare
+ * \param[in] e2 the second binheap node to compare
+ *
+ * \retval 0 e1 > e2
+ * \retval 1 e1 < e2
+ */
+static int orr_req_compare(cfs_binheap_node_t *e1, cfs_binheap_node_t *e2)
+{
+       struct ptlrpc_nrs_request *nrq1;
+       struct ptlrpc_nrs_request *nrq2;
+
+       nrq1 = container_of(e1, struct ptlrpc_nrs_request, nr_node);
+       nrq2 = container_of(e2, struct ptlrpc_nrs_request, nr_node);
+
+       /**
+        * Requests have been scheduled against a different scheduling round.
+        */
+       if (nrq1->nr_u.orr.or_round < nrq2->nr_u.orr.or_round)
+               return 1;
+       else if (nrq1->nr_u.orr.or_round > nrq2->nr_u.orr.or_round)
+               return 0;
+
+       /**
+        * Requests have been scheduled against the same scheduling round, but
+        * belong to a different batch, i.e. they pertain to a different
+        * backend-fs object (for ORR policy instances) or OST (for TRR policy
+        * instances).
+        *
+        * NB: read nrq2's sequence number through the orr member of the nr_u
+        * union; the previous code read it through nr_u.crr.cr_sequence,
+        * which only yielded the right value by union-layout coincidence.
+        */
+       if (nrq1->nr_u.orr.or_sequence < nrq2->nr_u.orr.or_sequence)
+               return 1;
+       else if (nrq1->nr_u.orr.or_sequence > nrq2->nr_u.orr.or_sequence)
+               return 0;
+
+       /**
+        * If round numbers and sequence numbers are equal, the two requests
+        * have been scheduled on the same round, and belong to the same batch,
+        * which means they pertain to the same backend-fs object (if this is an
+        * ORR policy instance), or to the same OST (if this is a TRR policy
+        * instance), so these requests should be sorted by ascending offset
+        * order.
+        */
+       if (nrq1->nr_u.orr.or_range.or_start <
+           nrq2->nr_u.orr.or_range.or_start) {
+               return 1;
+       } else if (nrq1->nr_u.orr.or_range.or_start >
+                nrq2->nr_u.orr.or_range.or_start) {
+               return 0;
+       } else {
+               /**
+                * Requests start from the same offset; Dispatch the shorter one
+                * first; perhaps slightly more chances of hitting caches like
+                * this.
+                */
+               return nrq1->nr_u.orr.or_range.or_end <
+                      nrq2->nr_u.orr.or_range.or_end;
+       }
+}
+
+/**
+ * ORR binary heap operations; only a comparison callback is provided, as no
+ * extra bookkeeping is done on heap insertion or removal.
+ */
+static cfs_binheap_ops_t nrs_orr_heap_ops = {
+       .hop_enter      = NULL,
+       .hop_exit       = NULL,
+       .hop_compare    = orr_req_compare,
+};
+
+/**
+ * Prints a warning message if an ORR/TRR policy is started on a service with
+ * more than one CPT, since these policies may perform better on services
+ * with a single partition.
+ *
+ * \param[in] policy the policy instance
+ *
+ * \retval 0 success
+ */
+static int nrs_orr_init(struct ptlrpc_nrs_policy *policy)
+{
+       const char *name = policy->pol_desc->pd_name;
+       bool        is_orr;
+
+       if (policy->pol_nrs->nrs_svcpt->scp_service->srv_ncpts <= 1)
+               return 0;
+
+       is_orr = strncmp(name, NRS_POL_NAME_ORR, NRS_POL_NAME_MAX) == 0;
+
+       CWARN("A%s %s NRS policy has been registered on a PTLRPC "
+             "service which has more than one service partition. "
+             "Please be advised that this policy may perform better "
+             "on services with only one partition.\n",
+             is_orr ? "n" : "", name);
+
+       return 0;
+}
+
+/**
+ * Called when an ORR policy instance is started.
+ *
+ * Allocates and initializes the policy's private data: the binary heap of
+ * queued requests, the slab cache for nrs_orr_object instances, and the
+ * object hash (keyed by backend-fs object FID for ORR, OST index for TRR).
+ *
+ * \param[in] policy the policy
+ *
+ * \retval -ENOMEM OOM error
+ * \retval 0      success
+ */
+static int nrs_orr_start(struct ptlrpc_nrs_policy *policy)
+{
+       struct nrs_orr_data    *orrd;
+       cfs_hash_ops_t         *ops;
+       unsigned                cur_bits;
+       unsigned                max_bits;
+       unsigned                bkt_bits;
+       unsigned                flags;
+       int                     rc = 0;
+       ENTRY;
+
+       OBD_CPT_ALLOC_PTR(orrd, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
+       if (orrd == NULL)
+               RETURN(-ENOMEM);
+
+       /*
+        * Binary heap instance for sorted incoming requests.
+        */
+       orrd->od_binheap = cfs_binheap_create(&nrs_orr_heap_ops,
+                                             CBH_FLAG_ATOMIC_GROW, 4096, NULL,
+                                             nrs_pol2cptab(policy),
+                                             nrs_pol2cptid(policy));
+       if (orrd->od_binheap == NULL)
+               GOTO(failed, rc = -ENOMEM);
+
+       nrs_orr_genobjname(policy, orrd->od_objname);
+
+       /**
+        * Slab cache for NRS ORR/TRR objects.
+        */
+       orrd->od_cache = cfs_mem_cache_create(orrd->od_objname,
+                                             sizeof(struct nrs_orr_object),
+                                             0, 0);
+       if (orrd->od_cache == NULL)
+               GOTO(failed, rc = -ENOMEM);
+
+       if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
+                   NRS_POL_NAME_MAX) == 0) {
+               ops = &nrs_orr_hash_ops;
+               cur_bits = NRS_ORR_BITS;
+               max_bits = NRS_ORR_BITS;
+               bkt_bits = NRS_ORR_BKT_BITS;
+               flags = NRS_ORR_HASH_FLAGS;
+       } else {
+               ops = &nrs_trr_hash_ops;
+               cur_bits = NRS_TRR_BITS;
+               max_bits = NRS_TRR_BITS;
+               bkt_bits = NRS_TRR_BKT_BITS;
+               flags = NRS_TRR_HASH_FLAGS;
+       }
+
+       /**
+        * Hash for finding objects by struct nrs_orr_key.
+        * XXX: For TRR, it might be better to avoid using libcfs_hash?
+        * All that needs to be resolved are OST indices, and they
+        * will stay relatively stable during an OSS node's lifetime.
+        */
+       orrd->od_obj_hash = cfs_hash_create(orrd->od_objname, cur_bits,
+                                           max_bits, bkt_bits, 0,
+                                           CFS_HASH_MIN_THETA,
+                                           CFS_HASH_MAX_THETA, ops, flags);
+       if (orrd->od_obj_hash == NULL)
+               GOTO(failed, rc = -ENOMEM);
+
+       /* XXX: Fields accessed unlocked */
+       orrd->od_quantum = NRS_ORR_QUANTUM_DFLT;
+       orrd->od_supp = NOS_DFLT;
+       orrd->od_physical = true;
+       /**
+        * Set to 1 so that the test inside nrs_orr_req_add() can evaluate to
+        * true.
+        */
+       orrd->od_sequence = 1;
+
+       policy->pol_private = orrd;
+
+       RETURN(rc);
+
+failed:
+       if (orrd->od_cache != NULL) {
+               rc = cfs_mem_cache_destroy(orrd->od_cache);
+               LASSERTF(rc == 0, "Could not destroy od_cache slab\n");
+       }
+       if (orrd->od_binheap != NULL)
+               cfs_binheap_destroy(orrd->od_binheap);
+
+       OBD_FREE_PTR(orrd);
+
+       /**
+        * Return -ENOMEM explicitly; rc may have been clobbered with 0 by the
+        * cfs_mem_cache_destroy() call above, and all jumps to this label are
+        * the result of an allocation failure.
+        */
+       RETURN(-ENOMEM);
+}
+
+/**
+ * Called when an ORR/TRR policy instance is stopped.
+ *
+ * Called when the policy has been instructed to transition to the
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state and has no more
+ * pending requests to serve.
+ *
+ * Tears down, in order, the request binheap (which must be empty by now),
+ * the object hash reference, and the object slab cache, then frees the
+ * private data itself.
+ *
+ * \param[in] policy the policy
+ */
+static void nrs_orr_stop(struct ptlrpc_nrs_policy *policy)
+{
+       struct nrs_orr_data *orrd = policy->pol_private;
+       ENTRY;
+
+       /** All private state must have been set up by nrs_orr_start(). */
+       LASSERT(orrd != NULL);
+       LASSERT(orrd->od_binheap != NULL);
+       LASSERT(orrd->od_obj_hash != NULL);
+       LASSERT(orrd->od_cache != NULL);
+       LASSERT(cfs_binheap_is_empty(orrd->od_binheap));
+
+       cfs_binheap_destroy(orrd->od_binheap);
+       cfs_hash_putref(orrd->od_obj_hash);
+       /** NOTE(review): return value ignored here, unlike in nrs_orr_start */
+       cfs_mem_cache_destroy(orrd->od_cache);
+
+       OBD_FREE_PTR(orrd);
+}
+
+/**
+ * Performs a policy-specific ctl function on ORR/TRR policy instances; similar
+ * to ioctl.
+ *
+ * Supported opcodes read or write the quantum, the offset type (logical vs.
+ * physical), and the supported request types of the instance.
+ *
+ * \param[in]    policy the policy instance
+ * \param[in]    opc    the opcode
+ * \param[in,out] arg   used for passing parameters and information
+ *
+ * \pre spin_is_locked(&policy->pol_nrs->nrs_lock)
+ * \post spin_is_locked(&policy->pol_nrs->nrs_lock)
+ *
+ * \retval 0   operation carried successfully
+ * \retval -ve error
+ */
+int nrs_orr_ctl(struct ptlrpc_nrs_policy *policy, enum ptlrpc_nrs_ctl opc,
+               void *arg)
+{
+       struct nrs_orr_data *orrd = policy->pol_private;
+
+       LASSERT(spin_is_locked(&policy->pol_nrs->nrs_lock));
+
+       switch (opc) {
+       case NRS_CTL_ORR_RD_QUANTUM:
+               *(__u16 *)arg = orrd->od_quantum;
+               break;
+
+       case NRS_CTL_ORR_WR_QUANTUM:
+               orrd->od_quantum = *(__u16 *)arg;
+               LASSERT(orrd->od_quantum != 0);
+               break;
+
+       case NRS_CTL_ORR_RD_OFF_TYPE:
+               *(bool *)arg = orrd->od_physical;
+               break;
+
+       case NRS_CTL_ORR_WR_OFF_TYPE:
+               orrd->od_physical = *(bool *)arg;
+               break;
+
+       case NRS_CTL_ORR_RD_SUPP_REQ:
+               *(enum nrs_orr_supp *)arg = orrd->od_supp;
+               break;
+
+       case NRS_CTL_ORR_WR_SUPP_REQ:
+               orrd->od_supp = *(enum nrs_orr_supp *)arg;
+               LASSERT((orrd->od_supp & NOS_OST_RW) != 0);
+               break;
+
+       default:
+               RETURN(-EINVAL);
+       }
+
+       RETURN(0);
+}
+
+/**
+ * Obtains resources for ORR/TRR policy instances. The top-level resource lives
+ * inside \e nrs_orr_data and the second-level resource inside
+ * \e nrs_orr_object instances.
+ *
+ * \param[in]  policy    the policy for which resources are being taken for
+ *                       request \a nrq
+ * \param[in]  nrq       the request for which resources are being taken
+ * \param[in]  parent    parent resource, embedded in nrs_orr_data for the
+ *                       ORR/TRR policies
+ * \param[out] resp      used to return resource references
+ * \param[in]  moving_req signifies limited caller context; used to perform
+ *                       memory allocations in an atomic context in this
+ *                       policy
+ *
+ * \retval 0   we are returning a top-level, parent resource, one that is
+ *            embedded in an nrs_orr_data object
+ * \retval 1   we are returning a bottom-level resource, one that is embedded
+ *            in an nrs_orr_object object
+ * \retval -ve error; the request will be handled by the fallback NRS policy
+ *
+ * \see nrs_resource_get_safe()
+ */
+int nrs_orr_res_get(struct ptlrpc_nrs_policy *policy,
+                   struct ptlrpc_nrs_request *nrq,
+                   const struct ptlrpc_nrs_resource *parent,
+                   struct ptlrpc_nrs_resource **resp, bool moving_req)
+{
+       struct nrs_orr_data            *orrd;
+       struct nrs_orr_object          *orro;
+       struct nrs_orr_object          *tmp;
+       struct nrs_orr_key              key = { { { 0 } } };
+       __u32                           opc;
+       int                             rc = 0;
+
+       /**
+        * struct nrs_orr_data is requested.
+        */
+       if (parent == NULL) {
+               *resp = &((struct nrs_orr_data *)policy->pol_private)->od_res;
+               return 0;
+       }
+
+       orrd = container_of(parent, struct nrs_orr_data, od_res);
+
+       /**
+        * If the request type is not supported, fail the enqueuing; the RPC
+        * will be handled by the fallback NRS policy.
+        */
+       if (!nrs_orr_req_supported(orrd, nrq, &opc))
+               return -1;
+
+       /**
+        * Fill in the key for the request; OST FID for ORR policy instances,
+        * and OST index for TRR policy instances.
+        *
+        * NB: use plain returns throughout; this function has no ENTRY, so
+        * the RETURN() macros that were used on some paths would unbalance
+        * the libcfs debug call trace.
+        */
+       rc = nrs_orr_key_fill(orrd, nrq, opc, policy->pol_desc->pd_name, &key);
+       if (rc < 0)
+               return rc;
+
+       /**
+        * Set the offset range the request covers
+        */
+       rc = nrs_orr_range_fill(nrq, orrd, opc, moving_req);
+       if (rc < 0)
+               return rc;
+
+       orro = cfs_hash_lookup(orrd->od_obj_hash, &key);
+       if (orro != NULL)
+               goto out;
+
+       /** Allocate atomically when the caller context is limited. */
+       OBD_SLAB_CPT_ALLOC_PTR_GFP(orro, orrd->od_cache,
+                                  nrs_pol2cptab(policy), nrs_pol2cptid(policy),
+                                  moving_req ? CFS_ALLOC_ATOMIC :
+                                  CFS_ALLOC_IO);
+       if (orro == NULL)
+               return -ENOMEM;
+
+       orro->oo_key = key;
+       cfs_atomic_set(&orro->oo_ref, 1);
+
+       /**
+        * Another thread may have raced us and inserted an object for the
+        * same key; in that case use the existing object and free ours.
+        */
+       tmp = cfs_hash_findadd_unique(orrd->od_obj_hash, &orro->oo_key,
+                                     &orro->oo_hnode);
+       if (tmp != orro) {
+               OBD_SLAB_FREE_PTR(orro, orrd->od_cache);
+               orro = tmp;
+       }
+out:
+       /**
+        * For debugging purposes
+        */
+       nrq->nr_u.orr.or_key = orro->oo_key;
+
+       *resp = &orro->oo_res;
+
+       return 1;
+}
+
+/**
+ * Called when releasing references to the resource hierarchy obtained for a
+ * request for scheduling using ORR/TRR policy instances.
+ *
+ * \param[in] policy   the policy the resource belongs to
+ * \param[in] res      the resource to be released
+ */
+static void nrs_orr_res_put(struct ptlrpc_nrs_policy *policy,
+                           const struct ptlrpc_nrs_resource *res)
+{
+       struct nrs_orr_object   *orro;
+       struct nrs_orr_data     *orrd;
+
+       /**
+        * Top-level nrs_orr_data resources require no cleanup.
+        */
+       if (res->res_parent == NULL)
+               return;
+
+       orro = container_of(res, struct nrs_orr_object, oo_res);
+       orrd = container_of(res->res_parent, struct nrs_orr_data, od_res);
+
+       /** Drop the hash reference taken in nrs_orr_res_get(). */
+       cfs_hash_put(orrd->od_obj_hash, &orro->oo_hnode);
+}
+
+/**
+ * Called when polling an ORR/TRR policy instance for a request so that it can
+ * be served. Returns the request that is at the root of the binary heap, as
+ * that is the lowest priority one (i.e. libcfs_heap is an implementation of a
+ * min-heap)
+ *
+ * \param[in] policy the policy instance being polled
+ * \param[in] peek   when set, signifies that we just want to examine the
+ *                  request, and not handle it, so the request is not removed
+ *                  from the policy.
+ * \param[in] force  force the policy to return a request; unused in this policy
+ *
+ * \retval the request to be handled
+ * \retval NULL no request available
+ *
+ * \see ptlrpc_nrs_req_get_nolock()
+ * \see nrs_request_get()
+ */
+static
+struct ptlrpc_nrs_request *nrs_orr_req_get(struct ptlrpc_nrs_policy *policy,
+                                          bool peek, bool force)
+{
+       struct nrs_orr_data       *orrd = policy->pol_private;
+       cfs_binheap_node_t        *node = cfs_binheap_root(orrd->od_binheap);
+       struct ptlrpc_nrs_request *nrq;
+
+       /** The heap root is the highest-priority queued request, if any. */
+       nrq = unlikely(node == NULL) ? NULL :
+             container_of(node, struct ptlrpc_nrs_request, nr_node);
+
+       if (likely(!peek && nrq != NULL)) {
+               struct nrs_orr_object *orro;
+
+               orro = container_of(nrs_request_resource(nrq),
+                                   struct nrs_orr_object, oo_res);
+
+               LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
+
+               /** Dequeue the request and account for it on its object/OST. */
+               cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
+               orro->oo_active--;
+
+               if (strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
+                                NRS_POL_NAME_MAX) == 0)
+                       CDEBUG(D_RPCTRACE,
+                              "NRS: starting to handle %s request for object "
+                              "with FID "DFID", from OST with index %u, with "
+                              "round "LPU64"\n", NRS_POL_NAME_ORR,
+                              PFID(&orro->oo_key.ok_fid),
+                              nrq->nr_u.orr.or_key.ok_idx,
+                              nrq->nr_u.orr.or_round);
+               else
+                       CDEBUG(D_RPCTRACE,
+                              "NRS: starting to handle %s request from OST "
+                              "with index %u, with round "LPU64"\n",
+                              NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
+                              nrq->nr_u.orr.or_round);
+
+               /** Peek at the next request to be served */
+               node = cfs_binheap_root(orrd->od_binheap);
+
+               /** No more requests */
+               if (unlikely(node == NULL)) {
+                       orrd->od_round++;
+               } else {
+                       struct ptlrpc_nrs_request *next;
+
+                       next = container_of(node, struct ptlrpc_nrs_request,
+                                           nr_node);
+
+                       /**
+                        * Advance the policy's round to that of the new heap
+                        * root, so future enqueues are tagged correctly.
+                        */
+                       if (orrd->od_round < next->nr_u.orr.or_round)
+                               orrd->od_round = next->nr_u.orr.or_round;
+               }
+       }
+
+       return nrq;
+}
+
+/**
+ * Sort-adds request \a nrq to an ORR/TRR \a policy instance's set of queued
+ * requests in the policy's binary heap.
+ *
+ * A scheduling round is a stream of requests that have been sorted in batches
+ * according to the backend-fs object (for ORR policy instances) or OST (for TRR
+ * policy instances) that they pertain to (as identified by its IDIF FID or OST
+ * index respectively); there can be only one batch for each object or OST in
+ * each round. The batches are of maximum size nrs_orr_data:od_quantum. When a
+ * new request arrives for scheduling for an object or OST that has exhausted
+ * its quantum in its current round, the request will be scheduled on the next
+ * scheduling round. Requests are allowed to be scheduled against a round until
+ * all requests for the round are serviced, so an object or OST might miss a
+ * round if requests are not scheduled for it for a long enough period of time.
+ * Objects or OSTs that miss a round will continue with having their next
+ * request scheduled, starting at the round that requests are being dispatched
+ * for, at the time of arrival of this request.
+ *
+ * Requests are tagged with the round number and a sequence number; the sequence
+ * number indicates the relative ordering amongst the batches of requests in a
+ * round, and is identical for all requests in a batch, as is the round number.
+ * The round and sequence numbers are used by orr_req_compare() in order to use
+ * nrs_orr_data::od_binheap in order to maintain an ordered set of rounds, with
+ * each round consisting of an ordered set of batches of requests, and each
+ * batch consisting of an ordered set of requests according to their logical
+ * file or physical disk offsets.
+ *
+ * \param[in] policy the policy
+ * \param[in] nrq    the request to add
+ *
+ * \retval 0   request successfully added
+ * \retval != 0 error
+ */
+static int nrs_orr_req_add(struct ptlrpc_nrs_policy *policy,
+                          struct ptlrpc_nrs_request *nrq)
+{
+       struct nrs_orr_data     *orrd;
+       struct nrs_orr_object   *orro;
+       int                      rc;
+
+       orro = container_of(nrs_request_resource(nrq),
+                           struct nrs_orr_object, oo_res);
+       orrd = container_of(nrs_request_resource(nrq)->res_parent,
+                           struct nrs_orr_data, od_res);
+
+       if (orro->oo_quantum == 0 || orro->oo_round < orrd->od_round ||
+           (orro->oo_active == 0 && orro->oo_quantum > 0)) {
+
+               /**
+                * If there are no pending requests for the object/OST, but some
+                * of its quantum still remains unused, which implies we did not
+                * get a chance to schedule up to its maximum allowed batch size
+                * of requests in the previous round this object/OST
+                * participated in, schedule this next request on a new round;
+                * this avoids fragmentation of request batches caused by
+                * intermittent inactivity on the object/OST, at the expense of
+                * potentially slightly increased service time for the request
+                * batch this request will be a part of.
+                */
+               if (orro->oo_active == 0 && orro->oo_quantum > 0)
+                       orro->oo_round++;
+
+               /** A new scheduling round has commenced */
+               if (orro->oo_round < orrd->od_round)
+                       orro->oo_round = orrd->od_round;
+
+               /** I was not the last object/OST that scheduled a request */
+               if (orro->oo_sequence < orrd->od_sequence)
+                       orro->oo_sequence = ++orrd->od_sequence;
+               /**
+                * Reset the quantum if we have reached the maximum quantum
+                * size for this batch, or even if we have not managed to
+                * complete a batch size up to its maximum allowed size.
+                * XXX: Accessed unlocked
+                */
+               orro->oo_quantum = orrd->od_quantum;
+       }
+
+       /**
+        * Tag the request through the orr member of the nr_u union; the
+        * previous code wrote these via nr_u.crr.cr_round/cr_sequence, while
+        * orr_req_compare(), nrs_orr_req_get() and nrs_orr_req_del() all read
+        * them via nr_u.orr, which only matched by union-layout coincidence.
+        */
+       nrq->nr_u.orr.or_round = orro->oo_round;
+       nrq->nr_u.orr.or_sequence = orro->oo_sequence;
+
+       rc = cfs_binheap_insert(orrd->od_binheap, &nrq->nr_node);
+       if (rc == 0) {
+               orro->oo_active++;
+               /** Batch exhausted; further requests go on the next round. */
+               if (--orro->oo_quantum == 0)
+                       orro->oo_round++;
+       }
+       return rc;
+}
+
+/**
+ * Removes request \a nrq from an ORR/TRR \a policy instance's set of queued
+ * requests.
+ *
+ * \param[in] policy the policy
+ * \param[in] nrq    the request to remove
+ */
+static void nrs_orr_req_del(struct ptlrpc_nrs_policy *policy,
+                           struct ptlrpc_nrs_request *nrq)
+{
+       struct nrs_orr_data     *orrd;
+       struct nrs_orr_object   *orro;
+       bool                     is_root;
+
+       orro = container_of(nrs_request_resource(nrq),
+                           struct nrs_orr_object, oo_res);
+       orrd = container_of(nrs_request_resource(nrq)->res_parent,
+                           struct nrs_orr_data, od_res);
+
+       LASSERT(nrq->nr_u.orr.or_round <= orro->oo_round);
+
+       /** Record before removal whether \a nrq was the heap root. */
+       is_root = &nrq->nr_node == cfs_binheap_root(orrd->od_binheap);
+
+       cfs_binheap_remove(orrd->od_binheap, &nrq->nr_node);
+       orro->oo_active--;
+
+       /**
+        * If we just deleted the node at the root of the binheap, we may have
+        * to adjust round numbers.
+        */
+       if (unlikely(is_root)) {
+               /** Peek at the next request to be served */
+               cfs_binheap_node_t *node = cfs_binheap_root(orrd->od_binheap);
+
+               /** No more requests */
+               if (unlikely(node == NULL)) {
+                       orrd->od_round++;
+               } else {
+                       /** NB: \a nrq is reused to point at the new root. */
+                       nrq = container_of(node, struct ptlrpc_nrs_request,
+                                          nr_node);
+
+                       if (orrd->od_round < nrq->nr_u.orr.or_round)
+                               orrd->od_round = nrq->nr_u.orr.or_round;
+               }
+       }
+}
+
+/**
+ * Called right after the request \a nrq finishes being handled by ORR policy
+ * instance \a policy; only emits an RPC trace debug message.
+ *
+ * \param[in] policy the policy that handled the request
+ * \param[in] nrq    the request that was handled
+ */
+static void nrs_orr_req_stop(struct ptlrpc_nrs_policy *policy,
+                            struct ptlrpc_nrs_request *nrq)
+{
+       bool is_orr = strncmp(policy->pol_desc->pd_name, NRS_POL_NAME_ORR,
+                             NRS_POL_NAME_MAX) == 0;
+
+       /** NB: resource control, credits etc can be added here */
+       if (is_orr)
+               CDEBUG(D_RPCTRACE,
+                      "NRS: finished handling %s request for object with FID "
+                      DFID", from OST with index %u, with round "LPU64"\n",
+                      NRS_POL_NAME_ORR, PFID(&nrq->nr_u.orr.or_key.ok_fid),
+                      nrq->nr_u.orr.or_key.ok_idx, nrq->nr_u.orr.or_round);
+       else
+               CDEBUG(D_RPCTRACE,
+                      "NRS: finished handling %s request from OST with index %u,"
+                      " with round "LPU64"\n",
+                      NRS_POL_NAME_TRR, nrq->nr_u.orr.or_key.ok_idx,
+                      nrq->nr_u.orr.or_round);
+}
+
+/**
+ * lprocfs interface
+ */
+
+#ifdef LPROCFS
+
+/**
+ * This allows to bundle the policy name into the lprocfs_vars::data pointer
+ * so that lprocfs read/write functions can be used by both the ORR and TRR
+ * policies.
+ *
+ * Two file-scope instances are defined, one per policy; \a svc is
+ * presumably filled in elsewhere when the lprocfs entries are registered —
+ * TODO confirm against the registration code.
+ */
+struct nrs_lprocfs_orr_data {
+       struct ptlrpc_service   *svc;
+       char                    *name;
+} lprocfs_orr_data = {
+       .name = NRS_POL_NAME_ORR
+}, lprocfs_trr_data = {
+       .name = NRS_POL_NAME_TRR
+};
+
+/**
+ * Retrieves the value of the Round Robin quantum (i.e. the maximum batch size)
+ * for ORR/TRR policy instances on both the regular and high-priority NRS head
+ * of a service, as long as a policy instance is not in the
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
+ * state are skipped later by nrs_orr_ctl().
+ *
+ * Quantum values are in # of RPCs, and the output is in YAML format.
+ *
+ * For example:
+ *
+ *     reg_quantum:256
+ *     hp_quantum:8
+ *
+ * XXX: the CRR-N version of this, ptlrpc_lprocfs_rd_nrs_crrn_quantum() is
+ * almost identical; it can be reworked and then reused for ORR/TRR.
+ */
+static int ptlrpc_lprocfs_rd_nrs_orr_quantum(char *page, char **start,
+                                            off_t off, int count, int *eof,
+                                            void *data)
+{
+       struct nrs_lprocfs_orr_data *orr_data = data;
+       struct ptlrpc_service       *svc = orr_data->svc;
+       __u16                        quantum;
+       int                          rc;
+       int                          rc2 = 0;
+
+       /**
+        * Perform two separate calls to this as only one of the NRS heads'
+        * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
+        * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
+        */
+       rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+                                      orr_data->name,
+                                      NRS_CTL_ORR_RD_QUANTUM,
+                                      true, &quantum);
+       if (rc == 0) {
+               *eof = 1;
+               rc2 = snprintf(page, count, NRS_LPROCFS_QUANTUM_NAME_REG
+                              "%-5d\n", quantum);
+               /**
+                * Ignore -ENODEV as the regular NRS head's policy may be in the
+                * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                */
+       } else if (rc != -ENODEV) {
+               return rc;
+       }
+
+       /**
+        * We know the ost_io service which is the only one ORR/TRR policies are
+        * compatible with, do have an HP NRS head, but it may be best to guard
+        * against a possible change of this in the future.
+        */
+       if (!nrs_svc_has_hp(svc))
+               goto no_hp;
+
+       rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+                                      orr_data->name, NRS_CTL_ORR_RD_QUANTUM,
+                                      true, &quantum);
+       if (rc == 0) {
+               *eof = 1;
+               rc2 += snprintf(page + rc2, count - rc2,
+                               NRS_LPROCFS_QUANTUM_NAME_HP"%-5d\n", quantum);
+               /**
+                * Ignore -ENODEV as the high priority NRS head's policy may be
+                * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                */
+       } else if (rc != -ENODEV) {
+               return rc;
+       }
+
+no_hp:
+
+       /**
+        * GCC "?:" extension: return the number of bytes written when any
+        * output was produced, otherwise propagate rc (possibly -ENODEV when
+        * neither head had a running instance).
+        */
+       return rc2 ? : rc;
+}
+
+/**
+ * Sets the value of the Round Robin quantum (i.e. the maximum batch size)
+ * for ORR/TRR policy instances of a service. The user can set the quantum size
+ * for the regular and high priority NRS head separately by specifying each
+ * value, or both together in a single invocation.
+ *
+ * For example:
+ *
+ * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=req_quantum:64, to set the
+ * request quantum size of the ORR policy instance on the regular NRS head of
+ * the ost_io service to 64
+ *
+ * lctl set_param ost.OSS.ost_io.nrs_trr_quantum=hp_quantum:8 to set the request
+ * quantum size of the TRR policy instance on the high priority NRS head of the
+ * ost_io service to 8
+ *
+ * lctl set_param ost.OSS.ost_io.nrs_orr_quantum=32, to set both the request
+ * quantum size of the ORR policy instance on both the regular and the high
+ * priority NRS head of the ost_io service to 32
+ *
+ * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
+ * are skipped later by nrs_orr_ctl().
+ *
+ * XXX: the CRR-N version of this, ptlrpc_lprocfs_wr_nrs_crrn_quantum() is
+ * almost identical; it can be reworked and then reused for ORR/TRR.
+ */
+static int ptlrpc_lprocfs_wr_nrs_orr_quantum(struct file *file,
+                                            const char *buffer,
+                                            unsigned long count, void *data)
+{
+       struct nrs_lprocfs_orr_data *orr_data = data;
+       struct ptlrpc_service       *svc = orr_data->svc;
+       enum ptlrpc_nrs_queue_type   queue = 0;
+       char                         kernbuf[LPROCFS_NRS_WR_QUANTUM_MAX_CMD];
+       char                        *val;
+       long                         quantum_reg;
+       long                         quantum_hp;
+       /** lprocfs_find_named_value() modifies its argument, so keep a copy */
+       unsigned long                count_copy;
+       int                          rc = 0;
+       int                          rc2 = 0;
+
+        if (count > (sizeof(kernbuf) - 1))
+                return -EINVAL;
+
+       if (cfs_copy_from_user(kernbuf, buffer, count))
+               return -EFAULT;
+
+        kernbuf[count] = '\0';
+
+       count_copy = count;
+
+       /**
+        * Check if the regular quantum value has been specified
+        */
+       val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_REG,
+                                      &count_copy);
+       if (val != kernbuf) {
+               quantum_reg = simple_strtol(val, NULL, 10);
+
+               queue |= PTLRPC_NRS_QUEUE_REG;
+       }
+
+       count_copy = count;
+
+       /**
+        * Check if the high priority quantum value has been specified
+        */
+       val = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_QUANTUM_NAME_HP,
+                                      &count_copy);
+       if (val != kernbuf) {
+               if (!nrs_svc_has_hp(svc))
+                       return -ENODEV;
+
+               quantum_hp = simple_strtol(val, NULL, 10);
+
+               queue |= PTLRPC_NRS_QUEUE_HP;
+       }
+
+       /**
+        * If none of the queues has been specified, look for a valid numerical
+        * value
+        */
+       if (queue == 0) {
+               if (!isdigit(kernbuf[0]))
+                       return -EINVAL;
+
+               quantum_reg = simple_strtol(kernbuf, NULL, 10);
+
+               queue = PTLRPC_NRS_QUEUE_REG;
+
+               if (nrs_svc_has_hp(svc)) {
+                       queue |= PTLRPC_NRS_QUEUE_HP;
+                       quantum_hp = quantum_reg;
+               }
+       }
+
+       if ((((queue & PTLRPC_NRS_QUEUE_REG) != 0) &&
+           ((quantum_reg > LPROCFS_NRS_QUANTUM_MAX || quantum_reg <= 0))) ||
+           (((queue & PTLRPC_NRS_QUEUE_HP) != 0) &&
+           ((quantum_hp > LPROCFS_NRS_QUANTUM_MAX || quantum_hp <= 0))))
+               return -EINVAL;
+
+       /**
+        * We change the values on regular and HP NRS heads separately, so that
+        * we do not exit early from ptlrpc_nrs_policy_control() with an error
+        * returned by nrs_policy_ctl_locked(), in cases where the user has not
+        * started the policy on either the regular or HP NRS head; i.e. we are
+        * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
+        * only if the operation fails with -ENODEV on all heads that have been
+        * specified by the command; if at least one operation succeeds,
+        * success is returned.
+        */
+       if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
+               rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+                                              orr_data->name,
+                                              NRS_CTL_ORR_WR_QUANTUM, false,
+                                              &quantum_reg);
+               if ((rc < 0 && rc != -ENODEV) ||
+                   (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
+                       return rc;
+       }
+
+       if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
+               rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+                                               orr_data->name,
+                                               NRS_CTL_ORR_WR_QUANTUM, false,
+                                               &quantum_hp);
+               if ((rc2 < 0 && rc2 != -ENODEV) ||
+                   (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
+                       return rc2;
+       }
+
+       return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
+}
+
+#define LPROCFS_NRS_OFF_NAME_REG               "reg_offset_type:"
+#define LPROCFS_NRS_OFF_NAME_HP                        "hp_offset_type:"
+
+#define LPROCFS_NRS_OFF_NAME_PHYSICAL          "physical"
+#define LPROCFS_NRS_OFF_NAME_LOGICAL           "logical"
+
+/**
+ * Retrieves the offset type used by ORR/TRR policy instances on both the
+ * regular and high-priority NRS head of a service, as long as a policy
+ * instance is not in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state;
+ * policy instances in this state are skipped later by nrs_orr_ctl().
+ *
+ * Offset type information is a (physical|logical) string, and output is
+ * in YAML format.
+ *
+ * For example:
+ *
+ *     reg_offset_type:physical
+ *     hp_offset_type:logical
+ */
+static int ptlrpc_lprocfs_rd_nrs_orr_offset_type(char *page, char **start,
+                                                off_t off, int count, int *eof,
+                                                void *data)
+{
+       struct nrs_lprocfs_orr_data *orr_data = data;
+       struct ptlrpc_service       *svc = orr_data->svc;
+       bool                         physical;
+       int                          rc;
+       int                          rc2 = 0;
+
+       /**
+        * Perform two separate calls to this as only one of the NRS heads'
+        * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
+        * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
+        */
+       rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+                                      orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
+                                      true, &physical);
+       if (rc == 0) {
+               *eof = 1;
+               rc2 = snprintf(page, count,
+                              LPROCFS_NRS_OFF_NAME_REG"%s\n",
+                              physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
+                              LPROCFS_NRS_OFF_NAME_LOGICAL);
+               /**
+                * Ignore -ENODEV as the regular NRS head's policy may be in the
+                * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                */
+       } else if (rc != -ENODEV) {
+               return rc;
+       }
+
+       /**
+        * We know the ost_io service which is the only one ORR/TRR policies are
+        * compatible with, do have an HP NRS head, but it may be best to guard
+        * against a possible change of this in the future.
+        */
+       if (!nrs_svc_has_hp(svc))
+               goto no_hp;
+
+       rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+                                      orr_data->name, NRS_CTL_ORR_RD_OFF_TYPE,
+                                      true, &physical);
+       if (rc == 0) {
+               *eof = 1;
+               rc2 += snprintf(page + rc2, count - rc2,
+                               LPROCFS_NRS_OFF_NAME_HP"%s\n",
+                               physical ? LPROCFS_NRS_OFF_NAME_PHYSICAL :
+                               LPROCFS_NRS_OFF_NAME_LOGICAL);
+               /**
+                * Ignore -ENODEV as the high priority NRS head's policy may be
+                * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                */
+       } else if (rc != -ENODEV) {
+               return rc;
+       }
+
+no_hp:
+
+       return rc2 ? : rc;
+}
+
+/**
+ * Max valid command string is the size of the labels, plus "physical" twice,
+ * plus a separating ' '
+ */
+#define LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD                                               \
+       sizeof(LPROCFS_NRS_OFF_NAME_REG LPROCFS_NRS_OFF_NAME_PHYSICAL " "      \
+              LPROCFS_NRS_OFF_NAME_HP LPROCFS_NRS_OFF_NAME_PHYSICAL)
+
+/**
+ * Sets the type of offsets used to order RPCs in ORR/TRR policy instances. The
+ * user can set offset type for the regular or high priority NRS head
+ * separately by specifying each value, or both together in a single invocation.
+ *
+ * For example:
+ *
+ * lctl set_param ost.OSS.ost_io.nrs_orr_offset_type=
+ * reg_offset_type:physical, to enable the ORR policy instance on the regular
+ * NRS head of the ost_io service to use physical disk offset ordering.
+ *
+ * lctl set_param ost.OSS.ost_io.nrs_trr_offset_type=logical, to enable the TRR
+ * policy instances on both the regular and high priority NRS heads of the
+ * ost_io service to use logical file offset ordering.
+ *
+ * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
+ * are skipped later by nrs_orr_ctl().
+ */
+static int ptlrpc_lprocfs_wr_nrs_orr_offset_type(struct file *file,
+                                                const char *buffer,
+                                                unsigned long count,
+                                                void *data)
+{
+       struct nrs_lprocfs_orr_data *orr_data = data;
+       struct ptlrpc_service       *svc = orr_data->svc;
+       enum ptlrpc_nrs_queue_type   queue = 0;
+       char                         kernbuf[LPROCFS_NRS_WR_OFF_TYPE_MAX_CMD];
+       char                        *val_reg;
+       char                        *val_hp;
+       bool                         physical_reg;
+       bool                         physical_hp;
+       unsigned long                count_copy;
+       int                          rc = 0;
+       int                          rc2 = 0;
+
+        if (count > (sizeof(kernbuf) - 1))
+                return -EINVAL;
+
+       if (cfs_copy_from_user(kernbuf, buffer, count))
+               return -EFAULT;
+
+        kernbuf[count] = '\0';
+
+       count_copy = count;
+
+       /**
+        * Check if the regular offset type has been specified
+        */
+       val_reg = lprocfs_find_named_value(kernbuf,
+                                          LPROCFS_NRS_OFF_NAME_REG,
+                                          &count_copy);
+       if (val_reg != kernbuf)
+               queue |= PTLRPC_NRS_QUEUE_REG;
+
+       count_copy = count;
+
+       /**
+        * Check if the high priority offset type has been specified
+        */
+       val_hp = lprocfs_find_named_value(kernbuf, LPROCFS_NRS_OFF_NAME_HP,
+                                         &count_copy);
+       if (val_hp != kernbuf) {
+               if (!nrs_svc_has_hp(svc))
+                       return -ENODEV;
+
+               queue |= PTLRPC_NRS_QUEUE_HP;
+       }
+
+       /**
+        * If none of the queues has been specified, there may be a valid
+        * command string at the start of the buffer.
+        */
+       if (queue == 0) {
+               queue = PTLRPC_NRS_QUEUE_REG;
+
+               if (nrs_svc_has_hp(svc))
+                       queue |= PTLRPC_NRS_QUEUE_HP;
+       }
+
+       if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
+               if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_PHYSICAL,
+                           sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
+                       physical_reg = true;
+               else if (strncmp(val_reg, LPROCFS_NRS_OFF_NAME_LOGICAL,
+                        sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
+                       physical_reg = false;
+               else
+                       return -EINVAL;
+       }
+
+       if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
+               if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_PHYSICAL,
+                           sizeof(LPROCFS_NRS_OFF_NAME_PHYSICAL) - 1) == 0)
+                       physical_hp = true;
+               else if (strncmp(val_hp, LPROCFS_NRS_OFF_NAME_LOGICAL,
+                                sizeof(LPROCFS_NRS_OFF_NAME_LOGICAL) - 1) == 0)
+                       physical_hp = false;
+               else
+                       return -EINVAL;
+       }
+
+       /**
+        * We change the values on regular and HP NRS heads separately, so that
+        * we do not exit early from ptlrpc_nrs_policy_control() with an error
+        * returned by nrs_policy_ctl_locked(), in cases where the user has not
+        * started the policy on either the regular or HP NRS head; i.e. we are
+        * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
+        * only if the operation fails with -ENODEV on all heads that have been
+        * specified by the command; if at least one operation succeeds,
+        * success is returned.
+        */
+       if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
+               rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+                                              orr_data->name,
+                                              NRS_CTL_ORR_WR_OFF_TYPE, false,
+                                              &physical_reg);
+               if ((rc < 0 && rc != -ENODEV) ||
+                   (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
+                       return rc;
+       }
+
+       if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
+               rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+                                               orr_data->name,
+                                               NRS_CTL_ORR_WR_OFF_TYPE, false,
+                                               &physical_hp);
+               if ((rc2 < 0 && rc2 != -ENODEV) ||
+                   (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
+                       return rc2;
+       }
+
+       return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
+}
+
+#define NRS_LPROCFS_REQ_SUPP_NAME_REG          "reg_supported:"
+#define NRS_LPROCFS_REQ_SUPP_NAME_HP           "hp_supported:"
+
+#define LPROCFS_NRS_SUPP_NAME_READS            "reads"
+#define LPROCFS_NRS_SUPP_NAME_WRITES           "writes"
+#define LPROCFS_NRS_SUPP_NAME_READWRITES       "reads_and_writes"
+
+/**
+ * Translates enum nrs_orr_supp values to a corresponding string.
+ */
+static const char *nrs_orr_supp2str(enum nrs_orr_supp supp)
+{
+       switch(supp) {
+       default:
+               LBUG();
+       case NOS_OST_READ:
+               return LPROCFS_NRS_SUPP_NAME_READS;
+       case NOS_OST_WRITE:
+               return LPROCFS_NRS_SUPP_NAME_WRITES;
+       case NOS_OST_RW:
+               return LPROCFS_NRS_SUPP_NAME_READWRITES;
+       }
+}
+
+/**
+ * Translates strings to the corresponding enum nrs_orr_supp value
+ */
+static enum nrs_orr_supp nrs_orr_str2supp(const char *val)
+{
+       if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READWRITES,
+                   sizeof(LPROCFS_NRS_SUPP_NAME_READWRITES) - 1) == 0)
+               return NOS_OST_RW;
+       else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_READS,
+                        sizeof(LPROCFS_NRS_SUPP_NAME_READS) - 1) == 0)
+               return NOS_OST_READ;
+       else if (strncmp(val, LPROCFS_NRS_SUPP_NAME_WRITES,
+                        sizeof(LPROCFS_NRS_SUPP_NAME_WRITES) - 1) == 0)
+               return NOS_OST_WRITE;
+       else
+               return -EINVAL;
+}
+
+/**
+ * Retrieves the type of RPCs handled at the point of invocation by ORR/TRR
+ * policy instances on both the regular and high-priority NRS head of a service,
+ * as long as a policy instance is not in the
+ * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state; policy instances in this
+ * state are skipped later by nrs_orr_ctl().
+ *
+ * Supported RPC type information is a (reads|writes|reads_and_writes) string,
+ * and output is in YAML format.
+ *
+ * For example:
+ *
+ *     reg_supported:reads
+ *     hp_supported:reads_and_writes
+ */
+static int ptlrpc_lprocfs_rd_nrs_orr_supported(char *page, char **start,
+                                              off_t off, int count, int *eof,
+                                              void *data)
+{
+       struct nrs_lprocfs_orr_data *orr_data = data;
+       struct ptlrpc_service       *svc = orr_data->svc;
+       enum nrs_orr_supp            supported;
+       int                          rc;
+       int                          rc2 = 0;
+
+       /**
+        * Perform two separate calls to this as only one of the NRS heads'
+        * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED
+        * or ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
+        */
+       rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+                                      orr_data->name,
+                                      NRS_CTL_ORR_RD_SUPP_REQ, true,
+                                      &supported);
+
+       if (rc == 0) {
+               *eof = 1;
+               rc2 = snprintf(page, count,
+                              NRS_LPROCFS_REQ_SUPP_NAME_REG"%s\n",
+                              nrs_orr_supp2str(supported));
+               /**
+                * Ignore -ENODEV as the regular NRS head's policy may be in the
+                * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                */
+       } else if (rc != -ENODEV) {
+               return rc;
+       }
+
+       /**
+        * We know the ost_io service which is the only one ORR/TRR policies are
+        * compatible with, do have an HP NRS head, but it may be best to guard
+        * against a possible change of this in the future.
+        */
+       if (!nrs_svc_has_hp(svc))
+               goto no_hp;
+
+       rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+                                      orr_data->name,
+                                      NRS_CTL_ORR_RD_SUPP_REQ, true,
+                                      &supported);
+       if (rc == 0) {
+               *eof = 1;
+               rc2 += snprintf(page + rc2, count - rc2,
+                              NRS_LPROCFS_REQ_SUPP_NAME_HP"%s\n",
+                              nrs_orr_supp2str(supported));
+               /**
+                * Ignore -ENODEV as the high priority NRS head's policy may be
+                * in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
+                */
+       } else if (rc != -ENODEV) {
+               return rc;
+       }
+
+no_hp:
+
+       return rc2 ? : rc;
+}
+
+/**
+ * Max valid command string is the size of the labels, plus "reads_and_writes"
+ * twice, plus a separating ' '
+ */
+#define LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD                                               \
+       sizeof(NRS_LPROCFS_REQ_SUPP_NAME_REG LPROCFS_NRS_SUPP_NAME_READWRITES  \
+              NRS_LPROCFS_REQ_SUPP_NAME_HP LPROCFS_NRS_SUPP_NAME_READWRITES   \
+              " ")
+
+/**
+ * Sets the type of RPCs handled by ORR/TRR policy instances. The user can
+ * modify this setting for the regular or high priority NRS heads separately, or
+ * both together in a single invocation.
+ *
+ * For example:
+ *
+ * lctl set_param ost.OSS.ost_io.nrs_orr_supported=
+ * "reg_supported:reads", to enable the ORR policy instance on the regular NRS
+ * head of the ost_io service to handle OST_READ RPCs.
+ *
+ * lctl set_param ost.OSS.ost_io.nrs_trr_supported=reads_and_writes, to enable
+ * the TRR policy instances on both the regular and high priority NRS heads of
+ * the ost_io service to handle OST_READ and OST_WRITE RPCs.
+ *
+ * policy instances in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state
+ * are skipped later by nrs_orr_ctl().
+ */
+static int ptlrpc_lprocfs_wr_nrs_orr_supported(struct file *file,
+                                              const char *buffer,
+                                              unsigned long count, void *data)
+{
+       struct nrs_lprocfs_orr_data *orr_data = data;
+       struct ptlrpc_service       *svc = orr_data->svc;
+       enum ptlrpc_nrs_queue_type   queue = 0;
+       char                         kernbuf[LPROCFS_NRS_WR_REQ_SUPP_MAX_CMD];
+       char                        *val_reg;
+       char                        *val_hp;
+       enum nrs_orr_supp            supp_reg;
+       enum nrs_orr_supp            supp_hp;
+       unsigned long                count_copy;
+       int                          rc = 0;
+       int                          rc2 = 0;
+
+        if (count > (sizeof(kernbuf) - 1))
+                return -EINVAL;
+
+       if (cfs_copy_from_user(kernbuf, buffer, count))
+               return -EFAULT;
+
+        kernbuf[count] = '\0';
+
+       count_copy = count;
+
+       /**
+        * Check if the regular supported requests setting has been specified
+        */
+       val_reg = lprocfs_find_named_value(kernbuf,
+                                          NRS_LPROCFS_REQ_SUPP_NAME_REG,
+                                          &count_copy);
+       if (val_reg != kernbuf)
+               queue |= PTLRPC_NRS_QUEUE_REG;
+
+       count_copy = count;
+
+       /**
+        * Check if the high priority supported requests setting has been
+        * specified
+        */
+       val_hp = lprocfs_find_named_value(kernbuf, NRS_LPROCFS_REQ_SUPP_NAME_HP,
+                                         &count_copy);
+       if (val_hp != kernbuf) {
+               if (!nrs_svc_has_hp(svc))
+                       return -ENODEV;
+
+               queue |= PTLRPC_NRS_QUEUE_HP;
+       }
+
+       /**
+        * If none of the queues has been specified, there may be a valid
+        * command string at the start of the buffer.
+        */
+       if (queue == 0) {
+               queue = PTLRPC_NRS_QUEUE_REG;
+
+               if (nrs_svc_has_hp(svc))
+                       queue |= PTLRPC_NRS_QUEUE_HP;
+       }
+
+       if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
+               supp_reg = nrs_orr_str2supp(val_reg);
+               if (supp_reg == -EINVAL)
+                       return -EINVAL;
+       }
+
+       if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
+               supp_hp = nrs_orr_str2supp(val_hp);
+               if (supp_hp == -EINVAL)
+                       return -EINVAL;
+       }
+
+       /**
+        * We change the values on regular and HP NRS heads separately, so that
+        * we do not exit early from ptlrpc_nrs_policy_control() with an error
+        * returned by nrs_policy_ctl_locked(), in cases where the user has not
+        * started the policy on either the regular or HP NRS head; i.e. we are
+        * ignoring -ENODEV within nrs_policy_ctl_locked(). -ENODEV is returned
+        * only if the operation fails with -ENODEV on all heads that have been
+        * specified by the command; if at least one operation succeeds,
+        * success is returned.
+        */
+       if ((queue & PTLRPC_NRS_QUEUE_REG) != 0) {
+               rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
+                                              orr_data->name,
+                                              NRS_CTL_ORR_WR_SUPP_REQ, false,
+                                              &supp_reg);
+               if ((rc < 0 && rc != -ENODEV) ||
+                   (rc == -ENODEV && queue == PTLRPC_NRS_QUEUE_REG))
+                       return rc;
+       }
+
+       if ((queue & PTLRPC_NRS_QUEUE_HP) != 0) {
+               rc2 = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
+                                               orr_data->name,
+                                               NRS_CTL_ORR_WR_SUPP_REQ, false,
+                                               &supp_hp);
+               if ((rc2 < 0 && rc2 != -ENODEV) ||
+                   (rc2 == -ENODEV && queue == PTLRPC_NRS_QUEUE_HP))
+                       return rc2;
+       }
+
+       return rc == -ENODEV && rc2 == -ENODEV ? -ENODEV : count;
+}
+
+int nrs_orr_lprocfs_init(struct ptlrpc_service *svc)
+{
+       int     rc;
+       int     i;
+
+       struct lprocfs_vars nrs_orr_lprocfs_vars[] = {
+               { .name         = "nrs_orr_quantum",
+                 .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_quantum,
+                 .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_quantum },
+               { .name         = "nrs_orr_offset_type",
+                 .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
+                 .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
+               { .name         = "nrs_orr_supported",
+                 .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_supported,
+                 .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_supported },
+               { NULL }
+       };
+
+       if (svc->srv_procroot == NULL)
+               return 0;
+
+       lprocfs_orr_data.svc = svc;
+
+       for (i = 0; i < ARRAY_SIZE(nrs_orr_lprocfs_vars); i++)
+               nrs_orr_lprocfs_vars[i].data = &lprocfs_orr_data;
+
+       rc = lprocfs_add_vars(svc->srv_procroot, nrs_orr_lprocfs_vars, NULL);
+
+       return rc;
+}
+
+void nrs_orr_lprocfs_fini(struct ptlrpc_service *svc)
+{
+       if (svc->srv_procroot == NULL)
+               return;
+
+       lprocfs_remove_proc_entry("nrs_orr_quantum", svc->srv_procroot);
+       lprocfs_remove_proc_entry("nrs_orr_offset_type", svc->srv_procroot);
+       lprocfs_remove_proc_entry("nrs_orr_supported", svc->srv_procroot);
+}
+
+#endif /* LPROCFS */
+
+static const struct ptlrpc_nrs_pol_ops nrs_orr_ops = {
+       .op_policy_init         = nrs_orr_init,
+       .op_policy_start        = nrs_orr_start,
+       .op_policy_stop         = nrs_orr_stop,
+       .op_policy_ctl          = nrs_orr_ctl,
+       .op_res_get             = nrs_orr_res_get,
+       .op_res_put             = nrs_orr_res_put,
+       .op_req_get             = nrs_orr_req_get,
+       .op_req_enqueue         = nrs_orr_req_add,
+       .op_req_dequeue         = nrs_orr_req_del,
+       .op_req_stop            = nrs_orr_req_stop,
+#ifdef LPROCFS
+       .op_lprocfs_init        = nrs_orr_lprocfs_init,
+       .op_lprocfs_fini        = nrs_orr_lprocfs_fini,
+#endif
+};
+
+struct ptlrpc_nrs_pol_conf nrs_conf_orr = {
+       .nc_name                = NRS_POL_NAME_ORR,
+       .nc_ops                 = &nrs_orr_ops,
+       .nc_compat              = nrs_policy_compat_one,
+       .nc_compat_svc_name     = "ost_io",
+};
+
+/**
+ * TRR, Target-based Round Robin policy
+ *
+ * TRR reuses much of the functions and data structures of ORR
+ */
+
+#ifdef LPROCFS
+
+int nrs_trr_lprocfs_init(struct ptlrpc_service *svc)
+{
+       int     rc;
+       int     i;
+
+       struct lprocfs_vars nrs_trr_lprocfs_vars[] = {
+               { .name         = "nrs_trr_quantum",
+                 .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_quantum,
+                 .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_quantum },
+               { .name         = "nrs_trr_offset_type",
+                 .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_offset_type,
+                 .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_offset_type },
+               { .name         = "nrs_trr_supported",
+                 .read_fptr    = ptlrpc_lprocfs_rd_nrs_orr_supported,
+                 .write_fptr   = ptlrpc_lprocfs_wr_nrs_orr_supported },
+               { NULL }
+       };
+
+       if (svc->srv_procroot == NULL)
+               return 0;
+
+       lprocfs_trr_data.svc = svc;
+
+       for (i = 0; i < ARRAY_SIZE(nrs_trr_lprocfs_vars); i++)
+               nrs_trr_lprocfs_vars[i].data = &lprocfs_trr_data;
+
+       rc = lprocfs_add_vars(svc->srv_procroot, nrs_trr_lprocfs_vars, NULL);
+
+       return rc;
+}
+
+void nrs_trr_lprocfs_fini(struct ptlrpc_service *svc)
+{
+       if (svc->srv_procroot == NULL)
+               return;
+
+       lprocfs_remove_proc_entry("nrs_trr_quantum", svc->srv_procroot);
+       lprocfs_remove_proc_entry("nrs_trr_offset_type", svc->srv_procroot);
+       lprocfs_remove_proc_entry("nrs_trr_supported", svc->srv_procroot);
+}
+
+#endif /* LPROCFS */
+
+/**
+ * Reuse much of the ORR functionality for TRR.
+ */
+static const struct ptlrpc_nrs_pol_ops nrs_trr_ops = {
+       .op_policy_init         = nrs_orr_init,
+       .op_policy_start        = nrs_orr_start,
+       .op_policy_stop         = nrs_orr_stop,
+       .op_policy_ctl          = nrs_orr_ctl,
+       .op_res_get             = nrs_orr_res_get,
+       .op_res_put             = nrs_orr_res_put,
+       .op_req_get             = nrs_orr_req_get,
+       .op_req_enqueue         = nrs_orr_req_add,
+       .op_req_dequeue         = nrs_orr_req_del,
+       .op_req_stop            = nrs_orr_req_stop,
+#ifdef LPROCFS
+       .op_lprocfs_init        = nrs_trr_lprocfs_init,
+       .op_lprocfs_fini        = nrs_trr_lprocfs_fini,
+#endif
+};
+
+struct ptlrpc_nrs_pol_conf nrs_conf_trr = {
+       .nc_name                = NRS_POL_NAME_TRR,
+       .nc_ops                 = &nrs_trr_ops,
+       .nc_compat              = nrs_policy_compat_one,
+       .nc_compat_svc_name     = "ost_io",
+};
+
+/** @} ORR/TRR policy */
+
+/** @} nrs */
+
+#endif /* HAVE_SERVER_SUPPORT */
index 45d9fed..153a3ba 100644 (file)
@@ -213,7 +213,7 @@ struct ptlrpc_nrs_policy *nrs_request_policy(struct ptlrpc_nrs_request *nrq)
 #define NRS_LPROCFS_QUANTUM_NAME_HP    "hp_quantum:"
 
 /**
 #define NRS_LPROCFS_QUANTUM_NAME_HP    "hp_quantum:"
 
 /**
- * the maximum size of nrs_crrn_client::cc_quantum
+ * the maximum size of nrs_crrn_client::cc_quantum and nrs_orr_data::od_quantum.
  */
 #define LPROCFS_NRS_QUANTUM_MAX                65535
 
  */
 #define LPROCFS_NRS_QUANTUM_MAX                65535
 
index c141f49..c56780c 100644 (file)
@@ -1778,7 +1778,8 @@ got_request:
  * ptlrpc_server_handle_req later on.
  */
 static int
  * ptlrpc_server_handle_req later on.
  */
 static int
-ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt)
+ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
+                           struct ptlrpc_thread *thread)
 {
        struct ptlrpc_service   *svc = svcpt->scp_service;
        struct ptlrpc_request   *req;
 {
        struct ptlrpc_service   *svc = svcpt->scp_service;
        struct ptlrpc_request   *req;
@@ -1898,6 +1899,8 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt)
                 goto err_req;
         }
 
                 goto err_req;
         }
 
+        req->rq_svc_thread = thread;
+
         ptlrpc_at_add_timed(req);
 
         /* Move it over to the request processing queue */
         ptlrpc_at_add_timed(req);
 
         /* Move it over to the request processing queue */
@@ -2239,7 +2242,7 @@ liblustre_check_services (void *arg)
                svcpt->scp_nthrs_running++;
 
                do {
                svcpt->scp_nthrs_running++;
 
                do {
-                       rc = ptlrpc_server_handle_req_in(svcpt);
+                       rc = ptlrpc_server_handle_req_in(svcpt, NULL);
                        rc |= ptlrpc_server_handle_reply(svcpt);
                        rc |= ptlrpc_at_check_timed(svcpt);
                        rc |= ptlrpc_server_handle_request(svcpt, NULL);
                        rc |= ptlrpc_server_handle_reply(svcpt);
                        rc |= ptlrpc_at_check_timed(svcpt);
                        rc |= ptlrpc_server_handle_request(svcpt, NULL);
@@ -2501,7 +2504,10 @@ static int ptlrpc_main(void *arg)
 
                /* Process all incoming reqs before handling any */
                if (ptlrpc_server_request_incoming(svcpt)) {
 
                /* Process all incoming reqs before handling any */
                if (ptlrpc_server_request_incoming(svcpt)) {
-                       ptlrpc_server_handle_req_in(svcpt);
+                       lu_context_enter(&env->le_ctx);
+                       ptlrpc_server_handle_req_in(svcpt, thread);
+                       lu_context_exit(&env->le_ctx);
+
                        /* but limit ourselves in case of flood */
                        if (counter++ < 100)
                                continue;
                        /* but limit ourselves in case of flood */
                        if (counter++ < 100)
                                continue;