*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2014, Intel Corporation.
+ * Copyright (c) 2010, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
*
* @{
*/
-
+#include <linux/kobject.h>
#include <linux/uio.h>
#include <libcfs/libcfs.h>
-#include <lnet/nidstr.h>
#include <lnet/api.h>
-#include <lustre/lustre_idl.h>
+#include <lnet/lib-types.h>
+#include <uapi/linux/lnet/nidstr.h>
+#include <uapi/linux/lustre/lustre_idl.h>
#include <lustre_ha.h>
#include <lustre_sec.h>
#include <lustre_import.h>
#include <lu_object.h>
#include <lustre_req_layout.h>
#include <obd_support.h>
-#include <lustre_ver.h>
+#include <uapi/linux/lustre/lustre_ver.h>
/* MD flags we _always_ use */
#define PTLRPC_MD_OPTIONS 0
/**
- * Max # of bulk operations in one request.
+ * log2 max # of bulk operations in one request: 2=4MB/RPC, 5=32MB/RPC, ...
* In order for the client and server to properly negotiate the maximum
* possible transfer size, PTLRPC_BULK_OPS_COUNT must be a power-of-two
* value. The client is free to limit the actual RPC size for any bulk
- * transfer via cl_max_pages_per_rpc to some non-power-of-two value. */
-#define PTLRPC_BULK_OPS_BITS 2
+ * transfer via cl_max_pages_per_rpc to some non-power-of-two value.
+ * NOTE: This is limited to 16 (=64GB RPCs) by IOOBJ_MAX_BRW_BITS. */
+#define PTLRPC_BULK_OPS_BITS 6
+#if PTLRPC_BULK_OPS_BITS > 16
+#error "More than 65536 BRW RPCs not allowed by IOOBJ_MAX_BRW_BITS."
+#endif
#define PTLRPC_BULK_OPS_COUNT (1U << PTLRPC_BULK_OPS_BITS)
/**
* PTLRPC_BULK_OPS_MASK is for the convenience of the client only, and
* currently supported maximum between peers at connect via ocd_brw_size.
*/
#define PTLRPC_MAX_BRW_BITS (LNET_MTU_BITS + PTLRPC_BULK_OPS_BITS)
-#define PTLRPC_MAX_BRW_SIZE (1 << PTLRPC_MAX_BRW_BITS)
-#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> PAGE_CACHE_SHIFT)
+#define PTLRPC_MAX_BRW_SIZE (1U << PTLRPC_MAX_BRW_BITS)
+#define PTLRPC_MAX_BRW_PAGES (PTLRPC_MAX_BRW_SIZE >> PAGE_SHIFT)
-#define ONE_MB_BRW_SIZE (1 << LNET_MTU_BITS)
-#define MD_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
-#define MD_MAX_BRW_PAGES (MD_MAX_BRW_SIZE >> PAGE_CACHE_SHIFT)
+#define ONE_MB_BRW_SIZE (1U << LNET_MTU_BITS)
+#define MD_MAX_BRW_SIZE (1U << LNET_MTU_BITS)
+#define MD_MAX_BRW_PAGES (MD_MAX_BRW_SIZE >> PAGE_SHIFT)
#define DT_MAX_BRW_SIZE PTLRPC_MAX_BRW_SIZE
-#define DT_MAX_BRW_PAGES (DT_MAX_BRW_SIZE >> PAGE_CACHE_SHIFT)
-#define OFD_MAX_BRW_SIZE (1 << LNET_MTU_BITS)
+#define DT_DEF_BRW_SIZE (4 * ONE_MB_BRW_SIZE)
+#define DT_MAX_BRW_PAGES (DT_MAX_BRW_SIZE >> PAGE_SHIFT)
+#define OFD_MAX_BRW_SIZE (1U << LNET_MTU_BITS)
/* When PAGE_SIZE is a constant, we can check our arithmetic here with cpp! */
#if ((PTLRPC_MAX_BRW_PAGES & (PTLRPC_MAX_BRW_PAGES - 1)) != 0)
# error "PTLRPC_MAX_BRW_PAGES isn't a power of two"
#endif
-#if (PTLRPC_MAX_BRW_SIZE != (PTLRPC_MAX_BRW_PAGES * PAGE_CACHE_SIZE))
-# error "PTLRPC_MAX_BRW_SIZE isn't PTLRPC_MAX_BRW_PAGES * PAGE_CACHE_SIZE"
+#if (PTLRPC_MAX_BRW_SIZE != (PTLRPC_MAX_BRW_PAGES * PAGE_SIZE))
+# error "PTLRPC_MAX_BRW_SIZE isn't PTLRPC_MAX_BRW_PAGES * PAGE_SIZE"
#endif
#if (PTLRPC_MAX_BRW_SIZE > LNET_MTU * PTLRPC_BULK_OPS_COUNT)
# error "PTLRPC_MAX_BRW_SIZE too big"
*/
/* depress threads factor for VM with small memory size */
#define OSS_THR_FACTOR min_t(int, 8, \
- NUM_CACHEPAGES >> (28 - PAGE_CACHE_SHIFT))
+ NUM_CACHEPAGES >> (28 - PAGE_SHIFT))
#define OSS_NTHRS_INIT (PTLRPC_NTHRS_INIT + 1)
#define OSS_NTHRS_BASE 64
-#define OSS_NTHRS_MAX 512
/* threads for handling "create" request */
#define OSS_CR_THR_FACTOR 1
* - single object with 16 pages is 512 bytes
* - OST_IO_MAXREQSIZE must be at least 1 page of cookies plus some spillover
* - Must be a multiple of 1024
- * - actual size is about 18K
*/
-#define _OST_MAXREQSIZE_SUM (sizeof(struct lustre_msg) + \
- sizeof(struct ptlrpc_body) + \
- sizeof(struct obdo) + \
- sizeof(struct obd_ioobj) + \
- sizeof(struct niobuf_remote) * DT_MAX_BRW_PAGES)
+#define _OST_MAXREQSIZE_BASE ((unsigned long)(sizeof(struct lustre_msg) + \
+ sizeof(struct ptlrpc_body) + \
+ sizeof(struct obdo) + \
+ sizeof(struct obd_ioobj) + \
+ sizeof(struct niobuf_remote)))
+#define _OST_MAXREQSIZE_SUM ((unsigned long)(_OST_MAXREQSIZE_BASE + \
+ sizeof(struct niobuf_remote) * \
+ (DT_MAX_BRW_PAGES - 1)))
/**
* FIEMAP request can be 4K+ for now
*/
-#define OST_MAXREQSIZE (16 * 1024)
-#define OST_IO_MAXREQSIZE max_t(int, OST_MAXREQSIZE, \
- (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1))
+#define OST_MAXREQSIZE (16UL * 1024UL)
+#define OST_IO_MAXREQSIZE max(OST_MAXREQSIZE, \
+ ((_OST_MAXREQSIZE_SUM - 1) | \
+ (1024UL - 1)) + 1)
+/* Safe estimate of free space in standard RPC, provides upper limit for # of
+ * bytes of i/o to pack in RPC (skipping bulk transfer). */
+#define OST_SHORT_IO_SPACE (OST_IO_MAXREQSIZE - _OST_MAXREQSIZE_BASE)
+
+/* Actual size used for short i/o buffer. Calculation means this:
+ * At least one page (for large PAGE_SIZE), or 16 KiB, but not more
+ * than the available space aligned to a page boundary. */
+#define OBD_MAX_SHORT_IO_BYTES min(max(PAGE_SIZE, 16UL * 1024UL), \
+ OST_SHORT_IO_SPACE & PAGE_MASK)
#define OST_MAXREPSIZE (9 * 1024)
#define OST_IO_MAXREPSIZE OST_MAXREPSIZE
#define OST_NBUFS 64
/** OST_BUFSIZE = max_reqsize + max sptlrpc payload size */
-#define OST_BUFSIZE max_t(int, OST_MAXREQSIZE + 1024, 16 * 1024)
+#define OST_BUFSIZE max_t(int, OST_MAXREQSIZE + 1024, 32 * 1024)
/**
* OST_IO_MAXREQSIZE is 18K, giving extra 46K can increase buffer utilization
* rate of request buffer, please check comment of MDS_LOV_BUFSIZE for details.
*/
#define OST_IO_BUFSIZE max_t(int, OST_IO_MAXREQSIZE + 1024, 64 * 1024)
+
/* Macro to hide a typecast. */
#define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)
/** Our own lnet nid for this connection */
lnet_nid_t c_self;
/** Remote side nid for this connection */
- lnet_process_id_t c_peer;
+ struct lnet_process_id c_peer;
/** UUID of the other side */
struct obd_uuid c_remote_uuid;
/** reference counter for this connection */
};
struct ptlrpc_request_set;
-typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
/**
atomic_t set_remaining;
/** wait queue to wait on for request events */
wait_queue_head_t set_waitq;
- wait_queue_head_t *set_wakeup_ptr;
/** List of requests in the set */
struct list_head set_requests;
/**
- * List of completion callbacks to be called when the set is completed
- * This is only used if \a set_interpret is NULL.
- * Links struct ptlrpc_set_cbdata.
- */
- struct list_head set_cblist;
- /** Completion callback, if only one. */
- set_interpreter_func set_interpret;
- /** opaq argument passed to completion \a set_interpret callback. */
- void *set_arg;
- /**
* Lock for \a set_new_requests manipulations
* locked so that any old caller can communicate requests to
* the set holder who can then fold them into the lock-free set
set_producer_func set_producer;
/** opaq argument passed to the producer callback */
void *set_producer_arg;
-};
-
-/**
- * Description of a single ptrlrpc_set callback
- */
-struct ptlrpc_set_cbdata {
- /** List linkage item */
- struct list_head psc_item;
- /** Pointer to interpreting function */
- set_interpreter_func psc_interpret;
- /** Opaq argument to pass to the callback */
- void *psc_data;
+ unsigned int set_allow_intr:1;
};
struct ptlrpc_bulk_desc;
* ptlrpc callback & work item stuff
*/
struct ptlrpc_cb_id {
- void (*cbid_fn)(lnet_event_t *ev); /* specific callback fn */
- void *cbid_arg; /* additional arg */
+ void (*cbid_fn)(struct lnet_event *ev); /* specific callback fn */
+ void *cbid_arg; /* additional arg */
};
/** Maximum number of locks to fit into reply state */
unsigned long rs_committed:1;/* the transaction was committed
and the rs was dispatched
by ptlrpc_commit_replies */
+ unsigned long rs_convert_lock:1; /* need to convert saved
+ * locks to COS mode */
+ atomic_t rs_refcount; /* number of users */
+ /** Number of locks awaiting client ACK */
+ int rs_nlocks;
+
/** Size of the state */
int rs_size;
/** opcode */
struct obd_export *rs_export;
struct ptlrpc_service_part *rs_svcpt;
/** Lnet metadata handle for the reply */
- lnet_handle_md_t rs_md_h;
- atomic_t rs_refcount;
+ struct lnet_handle_md rs_md_h;
/** Context for the sevice thread */
- struct ptlrpc_svc_ctx *rs_svc_ctx;
+ struct ptlrpc_svc_ctx *rs_svc_ctx;
/** Reply buffer (actually sent to the client), encoded if needed */
- struct lustre_msg *rs_repbuf; /* wrapper */
- /** Size of the reply buffer */
- int rs_repbuf_len; /* wrapper buf length */
- /** Size of the reply message */
- int rs_repdata_len; /* wrapper msg length */
- /**
- * Actual reply message. Its content is encrupted (if needed) to
- * produce reply buffer for actual sending. In simple case
- * of no network encryption we jus set \a rs_repbuf to \a rs_msg
- */
- struct lustre_msg *rs_msg; /* reply message */
-
- /** Number of locks awaiting client ACK */
- int rs_nlocks;
- /** Handles of locks awaiting client reply ACK */
- struct lustre_handle rs_locks[RS_MAX_LOCKS];
- /** Lock modes of locks in \a rs_locks */
- ldlm_mode_t rs_modes[RS_MAX_LOCKS];
+ struct lustre_msg *rs_repbuf; /* wrapper */
+ /** Size of the reply buffer */
+ int rs_repbuf_len; /* wrapper buf length */
+ /** Size of the reply message */
+ int rs_repdata_len; /* wrapper msg length */
+ /**
+ * Actual reply message. Its content is encrupted (if needed) to
+ * produce reply buffer for actual sending. In simple case
+ * of no network encryption we jus set \a rs_repbuf to \a rs_msg
+ */
+ struct lustre_msg *rs_msg; /* reply message */
+
+ /** Handles of locks awaiting client reply ACK */
+ struct lustre_handle rs_locks[RS_MAX_LOCKS];
+ /** Lock modes of locks in \a rs_locks */
+ enum ldlm_mode rs_modes[RS_MAX_LOCKS];
};
struct ptlrpc_thread;
/** RPC stages */
enum rq_phase {
- RQ_PHASE_NEW = 0xebc0de00,
- RQ_PHASE_RPC = 0xebc0de01,
- RQ_PHASE_BULK = 0xebc0de02,
- RQ_PHASE_INTERPRET = 0xebc0de03,
- RQ_PHASE_COMPLETE = 0xebc0de04,
- RQ_PHASE_UNREGISTERING = 0xebc0de05,
- RQ_PHASE_UNDEFINED = 0xebc0de06
+ RQ_PHASE_NEW = 0xebc0de00,
+ RQ_PHASE_RPC = 0xebc0de01,
+ RQ_PHASE_BULK = 0xebc0de02,
+ RQ_PHASE_INTERPRET = 0xebc0de03,
+ RQ_PHASE_COMPLETE = 0xebc0de04,
+ RQ_PHASE_UNREG_RPC = 0xebc0de05,
+ RQ_PHASE_UNREG_BULK = 0xebc0de06,
+ RQ_PHASE_UNDEFINED = 0xebc0de07
};
/** Type of request interpreter call-back */
/** For bulk requests on client only: bulk descriptor */
struct ptlrpc_bulk_desc *cr_bulk;
/** optional time limit for send attempts */
- cfs_duration_t cr_delay_limit;
+ time64_t cr_delay_limit;
/** time request was first queued */
- cfs_time_t cr_queued_time;
- /** request sent timeval */
- struct timeval cr_sent_tv;
+ time64_t cr_queued_time;
+ /** request sent in nanoseconds */
+ ktime_t cr_sent_ns;
/** time for request really sent out */
- time_t cr_sent_out;
+ time64_t cr_sent_out;
/** when req reply unlink must finish. */
- time_t cr_reply_deadline;
+ time64_t cr_reply_deadline;
/** when req bulk unlink must finish. */
- time_t cr_bulk_deadline;
+ time64_t cr_bulk_deadline;
+ /** when req unlink must finish. */
+ time64_t cr_req_deadline;
/** Portal to which this request would be sent */
short cr_req_ptl;
/** Portal where to wait for reply and where reply would be sent */
/** Link back to the request set */
struct ptlrpc_request_set *cr_set;
/** outgoing request MD handle */
- lnet_handle_md_t cr_req_md_h;
+ struct lnet_handle_md cr_req_md_h;
/** request-out callback parameter */
struct ptlrpc_cb_id cr_req_cbid;
/** incoming reply MD handle */
- lnet_handle_md_t cr_reply_md_h;
+ struct lnet_handle_md cr_reply_md_h;
wait_queue_head_t cr_reply_waitq;
/** reply callback parameter */
struct ptlrpc_cb_id cr_reply_cbid;
union ptlrpc_async_args cr_async_args;
/** Opaq data for replay and commit callbacks. */
void *cr_cb_data;
+ /** Link to the imp->imp_unreplied_list */
+ struct list_head cr_unreplied_list;
/**
* Commit callback, called when request is committed and about to be
* freed.
#define rq_bulk rq_cli.cr_bulk
#define rq_delay_limit rq_cli.cr_delay_limit
#define rq_queued_time rq_cli.cr_queued_time
-#define rq_sent_tv rq_cli.cr_sent_tv
+#define rq_sent_ns rq_cli.cr_sent_ns
#define rq_real_sent rq_cli.cr_sent_out
#define rq_reply_deadline rq_cli.cr_reply_deadline
#define rq_bulk_deadline rq_cli.cr_bulk_deadline
+#define rq_req_deadline rq_cli.cr_req_deadline
#define rq_nr_resend rq_cli.cr_resend_nr
#define rq_request_portal rq_cli.cr_req_ptl
#define rq_reply_portal rq_cli.cr_rep_ptl
#define rq_resend_cb rq_cli.cr_resend_cb
#define rq_async_args rq_cli.cr_async_args
#define rq_cb_data rq_cli.cr_cb_data
+#define rq_unreplied_list rq_cli.cr_unreplied_list
#define rq_commit_cb rq_cli.cr_commit_cb
#define rq_replay_cb rq_cli.cr_replay_cb
/** history sequence # */
__u64 sr_hist_seq;
/** the index of service's srv_at_array into which request is linked */
- time_t sr_at_index;
+ __u32 sr_at_index;
/** authed uid */
uid_t sr_auth_uid;
/** authed uid mapped to */
struct ptlrpc_nrs_request sr_nrq;
/** @} nrs */
/** request arrival time */
- struct timeval sr_arrival_time;
+ struct timespec64 sr_arrival_time;
/** server's half ctx */
struct ptlrpc_svc_ctx *sr_svc_ctx;
/** (server side), pointed directly into req buffer */
struct ptlrpc_user_desc *sr_user_desc;
- /** separated reply state */
+ /** separated reply state, may be vmalloc'd */
struct ptlrpc_reply_state *sr_reply_state;
/** server-side hp handlers */
struct ptlrpc_hpreq_ops *sr_ops;
* rq_list
*/
spinlock_t rq_lock;
+ spinlock_t rq_early_free_lock;
/** client-side flags are serialized by rq_lock @{ */
unsigned int rq_intr:1, rq_replied:1, rq_err:1,
rq_timedout:1, rq_resend:1, rq_restart:1,
* status */
rq_allow_replay:1,
/* bulk request, sent to server, but uncommitted */
- rq_unstable:1;
+ rq_unstable:1,
+ rq_early_free_repbuf:1, /* free reply buffer in advance */
+ rq_allow_intr:1;
/** @} */
/** server-side flags @{ */
/** bulk match bits */
__u64 rq_mbits;
/**
- * List item to for replay list. Not yet commited requests get linked
+ * List item to for replay list. Not yet committed requests get linked
* there.
* Also see \a rq_replay comment above.
* It's also link chain on obd_export::exp_req_replay_queue
/** description of flavors for client & server */
struct sptlrpc_flavor rq_flvr;
+ /**
+ * SELinux policy info at the time of the request
+ * sepol string format is:
+ * <mode>:<policy name>:<policy version>:<policy hash>
+ */
+ char rq_sepol[LUSTRE_NODEMAP_SEPOL_LENGTH + 1];
+
/* client/server security flags */
unsigned int
rq_ctx_init:1, /* context initiation */
rq_bulk_write:1, /* request bulk write */
/* server authentication flags */
rq_auth_gss:1, /* authenticated by gss */
- rq_auth_remote:1, /* authed as remote user */
rq_auth_usr_root:1, /* authed as root */
rq_auth_usr_mdt:1, /* authed as mdt */
rq_auth_usr_ost:1, /* authed as ost */
/** various buffer pointers */
- struct lustre_msg *rq_reqbuf; /**< req wrapper */
- char *rq_repbuf; /**< rep buffer */
- struct lustre_msg *rq_repdata; /**< rep wrapper msg */
+ struct lustre_msg *rq_reqbuf; /**< req wrapper, vmalloc*/
+ char *rq_repbuf; /**< rep buffer, vmalloc */
+ struct lustre_msg *rq_repdata; /**< rep wrapper msg */
/** only in priv mode */
struct lustre_msg *rq_clrbuf;
int rq_reqbuf_len; /* req wrapper buf len */
/** early replies go to offset 0, regular replies go after that */
unsigned int rq_reply_off;
-
/** @} */
/** Fields that help to see if request and reply were swabbed or not */
/** our LNet NID */
lnet_nid_t rq_self;
/** Peer description (the other side) */
- lnet_process_id_t rq_peer;
+ struct lnet_process_id rq_peer;
+ /** Descriptor for the NID from which the peer sent the request. */
+ struct lnet_process_id rq_source;
/**
* service time estimate (secs)
* If the request is not served by this time, it is marked as timed out.
+ * Do not change to time64_t since this is transmitted over the wire.
*/
- int rq_timeout;
+ time_t rq_timeout;
/**
* when request/reply sent (secs), or time when request should be sent
*/
- time_t rq_sent;
+ time64_t rq_sent;
/** when request must finish. */
- time_t rq_deadline;
+ time64_t rq_deadline;
/** request format description */
struct req_capsule rq_pill;
};
static inline int ptlrpc_req_interpret(const struct lu_env *env,
struct ptlrpc_request *req, int rc)
{
- if (req->rq_interpret_reply != NULL) {
- req->rq_status = req->rq_interpret_reply(env, req,
- &req->rq_async_args,
- rc);
- return req->rq_status;
- }
- return rc;
+ if (req->rq_interpret_reply != NULL) {
+ req->rq_status = req->rq_interpret_reply(env, req,
+ &req->rq_async_args,
+ rc);
+ return req->rq_status;
+ }
+
+ return rc;
}
/** \addtogroup nrs
/** @} nrs */
/**
- * Returns 1 if request buffer at offset \a index was already swabbed
+ * Returns true if request buffer at offset \a index was already swabbed
*/
-static inline int lustre_req_swabbed(struct ptlrpc_request *req, size_t index)
+static inline bool lustre_req_swabbed(struct ptlrpc_request *req, size_t index)
{
- LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
- return req->rq_req_swab_mask & (1 << index);
+ LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
+ return req->rq_req_swab_mask & (1 << index);
}
/**
- * Returns 1 if request reply buffer at offset \a index was already swabbed
+ * Returns true if request reply buffer at offset \a index was already swabbed
*/
-static inline int lustre_rep_swabbed(struct ptlrpc_request *req, size_t index)
+static inline bool lustre_rep_swabbed(struct ptlrpc_request *req, size_t index)
{
- LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
- return req->rq_rep_swab_mask & (1 << index);
+ LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
+ return req->rq_rep_swab_mask & (1 << index);
}
/**
- * Returns 1 if request needs to be swabbed into local cpu byteorder
+ * Returns true if request needs to be swabbed into local cpu byteorder
*/
-static inline int ptlrpc_req_need_swab(struct ptlrpc_request *req)
+static inline bool ptlrpc_req_need_swab(struct ptlrpc_request *req)
{
- return lustre_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
+ return lustre_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
}
/**
- * Returns 1 if request reply needs to be swabbed into local cpu byteorder
+ * Returns true if request reply needs to be swabbed into local cpu byteorder
*/
-static inline int ptlrpc_rep_need_swab(struct ptlrpc_request *req)
+static inline bool ptlrpc_rep_need_swab(struct ptlrpc_request *req)
{
- return lustre_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
+ return lustre_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
}
/**
static inline const char *
ptlrpc_phase2str(enum rq_phase phase)
{
- switch (phase) {
- case RQ_PHASE_NEW:
- return "New";
- case RQ_PHASE_RPC:
- return "Rpc";
- case RQ_PHASE_BULK:
- return "Bulk";
- case RQ_PHASE_INTERPRET:
- return "Interpret";
- case RQ_PHASE_COMPLETE:
- return "Complete";
- case RQ_PHASE_UNREGISTERING:
- return "Unregistering";
- default:
- return "?Phase?";
- }
+ switch (phase) {
+ case RQ_PHASE_NEW:
+ return "New";
+ case RQ_PHASE_RPC:
+ return "Rpc";
+ case RQ_PHASE_BULK:
+ return "Bulk";
+ case RQ_PHASE_INTERPRET:
+ return "Interpret";
+ case RQ_PHASE_COMPLETE:
+ return "Complete";
+ case RQ_PHASE_UNREG_RPC:
+ return "UnregRPC";
+ case RQ_PHASE_UNREG_BULK:
+ return "UnregBULK";
+ default:
+ return "?Phase?";
+ }
}
/**
#define FLAG(field, str) (field ? str : "")
/** Convert bit flags into a string */
-#define DEBUG_REQ_FLAGS(req) \
- ptlrpc_rqphase2str(req), \
- FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"), \
- FLAG(req->rq_err, "E"), \
- FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"), \
- FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \
- FLAG(req->rq_no_resend, "N"), \
- FLAG(req->rq_waiting, "W"), \
- FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H"), \
- FLAG(req->rq_committed, "M")
-
-#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s"
+#define DEBUG_REQ_FLAGS(req) \
+ ptlrpc_rqphase2str(req), \
+ FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"), \
+ FLAG(req->rq_err, "E"), FLAG(req->rq_net_err, "e"), \
+ FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"), \
+ FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \
+ FLAG(req->rq_no_resend, "N"), \
+ FLAG(req->rq_waiting, "W"), \
+ FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H"), \
+ FLAG(req->rq_committed, "M"), \
+ FLAG(req->rq_req_unlinked, "Q"), \
+ FLAG(req->rq_reply_unlinked, "U"), \
+ FLAG(req->rq_receiving_reply, "r")
+
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s"
void _debug_req(struct ptlrpc_request *req,
struct libcfs_debug_msg_data *data, const char *fmt, ...)
#define DEBUG_REQ(level, req, fmt, args...) \
do { \
if ((level) & (D_ERROR | D_WARNING)) { \
- static cfs_debug_limit_state_t cdls; \
+ static struct cfs_debug_limit_state cdls; \
LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, level, &cdls); \
debug_req(&msgdata, level, &cdls, req, "@@@ "fmt" ", ## args);\
} else { \
int bd_md_count; /* # valid entries in bd_mds */
int bd_md_max_brw; /* max entries in bd_mds */
/** array of associated MDs */
- lnet_handle_md_t bd_mds[PTLRPC_BULK_OPS_COUNT];
+ struct lnet_handle_md bd_mds[PTLRPC_BULK_OPS_COUNT];
union {
struct {
* encrypt iov, size is either 0 or bd_iov_count.
*/
lnet_kiov_t *bd_enc_vec;
- lnet_kiov_t bd_vec[0];
+ lnet_kiov_t *bd_vec;
} bd_kiov;
struct {
struct kvec *bd_enc_kvec;
- struct kvec bd_kvec[0];
+ struct kvec *bd_kvec;
} bd_kvec;
} bd_u;
#define BD_GET_ENC_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_enc_kvec[i])
enum {
- SVC_STOPPED = 1 << 0,
- SVC_STOPPING = 1 << 1,
- SVC_STARTING = 1 << 2,
- SVC_RUNNING = 1 << 3,
- SVC_EVENT = 1 << 4,
- SVC_SIGNAL = 1 << 5,
+ SVC_INIT = 0,
+ SVC_STOPPED = 1 << 0,
+ SVC_STOPPING = 1 << 1,
+ SVC_STARTING = 1 << 2,
+ SVC_RUNNING = 1 << 3,
+ SVC_EVENT = 1 << 4,
+ SVC_SIGNAL = 1 << 5,
};
#define PTLRPC_THR_NAME_LEN 32
* Definition of server service thread structure
*/
struct ptlrpc_thread {
- /**
- * List of active threads in svc->srv_threads
- */
+ /**
+ * List of active threads in svc->srv_threads
+ */
struct list_head t_link;
- /**
- * thread-private data (preallocated memory)
- */
- void *t_data;
- __u32 t_flags;
- /**
- * service thread index, from ptlrpc_start_threads
- */
- unsigned int t_id;
- /**
- * service thread pid
- */
- pid_t t_pid;
- /**
- * put watchdog in the structure per thread b=14840
- */
- struct lc_watchdog *t_watchdog;
- /**
- * the svc this thread belonged to b=18582
- */
+ /**
+ * thread-private data (preallocated vmalloc'd memory)
+ */
+ void *t_data;
+ __u32 t_flags;
+ /**
+ * service thread index, from ptlrpc_start_threads
+ */
+ unsigned int t_id;
+ /**
+ * service thread
+ */
+ struct task_struct *t_task;
+ pid_t t_pid;
+ ktime_t t_touched;
+ /**
+ * put watchdog in the structure per thread b=14840
+ */
+ struct delayed_work t_watchdog;
+ /**
+ * the svc this thread belonged to b=18582
+ */
struct ptlrpc_service_part *t_svcpt;
wait_queue_head_t t_ctl_waitq;
struct lu_env *t_env;
/** Back pointer to service for which this buffer is registered */
struct ptlrpc_service_part *rqbd_svcpt;
/** LNet descriptor */
- lnet_handle_md_t rqbd_md_h;
+ struct lnet_handle_md rqbd_md_h;
int rqbd_refcount;
/** The buffer itself */
char *rqbd_buffer;
int srv_nthrs_cpt_init;
/** limit of threads number for each partition */
int srv_nthrs_cpt_limit;
- /** Root of /proc dir tree for this service */
- struct proc_dir_entry *srv_procroot;
+ /** Root of debugfs dir tree for this service */
+ struct dentry *srv_debugfs_entry;
/** Pointer to statistic data for this service */
struct lprocfs_stats *srv_stats;
/** # hp per lp reqs to handle */
int srv_watchdog_factor;
/** under unregister_service */
unsigned srv_is_stopping:1;
+ /** Whether or not to restrict service threads to CPUs in this CPT */
+ unsigned srv_cpt_bind:1;
+ /** max # request buffers */
+ int srv_nrqbds_max;
/** max # request buffers in history per partition */
int srv_hist_nrqbds_cpt_max;
- /** number of CPTs this service bound on */
+ /** number of CPTs this service associated with */
int srv_ncpts;
- /** CPTs array this service bound on */
+ /** CPTs array this service associated with */
__u32 *srv_cpts;
/** 2^srv_cptab_bits >= cfs_cpt_numbert(srv_cptable) */
int srv_cpt_bits;
/** CPT table this service is running over */
struct cfs_cpt_table *srv_cptable;
+
+ /* sysfs object */
+ struct kobject srv_kobj;
+ struct completion srv_kobj_unregister;
/**
* partition data for ptlrpc service
*/
* threads starting & stopping are also protected by this lock.
*/
spinlock_t scp_lock __cfs_cacheline_aligned;
+ /** userland serialization */
+ struct mutex scp_mutex;
/** total # req buffer descs allocated */
int scp_nrqbds_total;
/** # posted request buffers for receiving */
struct list_head scp_rqbd_posted;
/** incoming reqs */
struct list_head scp_req_incoming;
- /** timeout before re-posting reqs, in tick */
- cfs_duration_t scp_rqbd_timeout;
+ /** timeout before re-posting reqs, in jiffies */
+ long scp_rqbd_timeout;
/**
* all threads sleep on this. This wait-queue is signalled when new
* incoming request arrives and when difficult reply has to be handled.
/** early reply timer */
struct timer_list scp_at_timer;
/** debug */
- cfs_time_t scp_at_checktime;
+ ktime_t scp_at_checktime;
/** check early replies */
unsigned scp_at_check;
/** @} */
*/
char pc_name[16];
/**
- * Environment for request interpreters to run in.
- */
- struct lu_env pc_env;
- /**
* CPT the thread is bound on.
*/
int pc_cpt;
/** @} nrs */
/* ptlrpc/events.c */
-extern lnet_handle_eq_t ptlrpc_eq_h;
+extern struct lnet_handle_eq ptlrpc_eq_h;
extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
- lnet_process_id_t *peer, lnet_nid_t *self);
+ struct lnet_process_id *peer, lnet_nid_t *self);
/**
* These callbacks are invoked by LNet when something happened to
* underlying buffer
* @{
*/
-extern void request_out_callback(lnet_event_t *ev);
-extern void reply_in_callback(lnet_event_t *ev);
-extern void client_bulk_callback(lnet_event_t *ev);
-extern void request_in_callback(lnet_event_t *ev);
-extern void reply_out_callback(lnet_event_t *ev);
+extern void request_out_callback(struct lnet_event *ev);
+extern void reply_in_callback(struct lnet_event *ev);
+extern void client_bulk_callback(struct lnet_event *ev);
+extern void request_in_callback(struct lnet_event *ev);
+extern void reply_out_callback(struct lnet_event *ev);
#ifdef HAVE_SERVER_SUPPORT
-extern void server_bulk_callback(lnet_event_t *ev);
+extern void server_bulk_callback(struct lnet_event *ev);
#endif
/** @} */
/* ptlrpc/connection.c */
-struct ptlrpc_connection *ptlrpc_connection_get(lnet_process_id_t peer,
+struct ptlrpc_connection *ptlrpc_connection_get(struct lnet_process_id peer,
lnet_nid_t self,
struct obd_uuid *uuid);
int ptlrpc_connection_put(struct ptlrpc_connection *c);
void ptlrpc_connection_fini(void);
extern lnet_pid_t ptl_get_pid(void);
+/*
+ * Check if the peer connection is on the local node. We need to use GFP_NOFS
+ * for requests from a local client to avoid recursing into the filesystem
+ * as we might end up waiting on a page sent in the request we're serving.
+ *
+ * Use __GFP_HIGHMEM so that the pages can use all of the available memory
+ * on 32-bit machines. Use more aggressive GFP_HIGHUSER flags from non-local
+ * clients to be able to generate more memory pressure on the OSS and allow
+ * inactive pages to be reclaimed, since it doesn't have any other processes
+ * or allocations that generate memory reclaim pressure.
+ *
+ * See b=17576 (bdf50dc9) and b=19529 (3dcf18d3) for details.
+ */
+static inline bool ptlrpc_connection_is_local(struct ptlrpc_connection *conn)
+{
+ if (!conn)
+ return false;
+
+ if (conn->c_peer.nid == conn->c_self)
+ return true;
+
+ RETURN(LNetIsPeerLocal(conn->c_peer.nid));
+}
+
/* ptlrpc/niobuf.c */
/**
* Actual interfacing with LNet to put/get/register/unregister stuff
static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc;
- int rc;
+ int rc;
- LASSERT(req != NULL);
+ LASSERT(req != NULL);
desc = req->rq_bulk;
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
- req->rq_bulk_deadline > cfs_time_current_sec())
- return 1;
+ if (!desc)
+ return 0;
+
+ if (req->rq_bulk_deadline > ktime_get_real_seconds())
+ return 1;
- if (!desc)
- return 0;
spin_lock(&desc->bd_lock);
rc = desc->bd_md_count;
void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
struct ptlrpc_client *);
void ptlrpc_cleanup_client(struct obd_import *imp);
-struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
+struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid,
+ lnet_nid_t nid4refnet);
int ptlrpc_queue_wait(struct ptlrpc_request *req);
int ptlrpc_replay_req(struct ptlrpc_request *req);
struct ptlrpc_request_set *ptlrpc_prep_set(void);
struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
void *arg);
-int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
- set_interpreter_func fn, void *data);
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
-int ptlrpc_set_wait(struct ptlrpc_request_set *);
+int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *);
void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
void ptlrpc_set_destroy(struct ptlrpc_request_set *);
void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
struct ptlrpc_cli_ctx *ctx);
-struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
- int opcode, int count, __u32 *lengths,
- char **bufs);
-struct ptlrpc_request *ptlrpc_prep_req_pool(struct obd_import *imp,
- __u32 version, int opcode,
- int count, __u32 *lengths, char **bufs,
- struct ptlrpc_request_pool *pool);
void ptlrpc_req_finished(struct ptlrpc_request *request);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
int i;
for (i = 0; i < desc->bd_iov_count ; i++)
- page_cache_release(BD_GET_KIOV(desc, i).kiov_page);
+ put_page(BD_GET_KIOV(desc, i).kiov_page);
}
static inline void ptlrpc_release_bulk_noop(struct ptlrpc_bulk_desc *desc)
/* user specified threads number, it will be validated due to
* other members of this structure. */
unsigned int tc_nthrs_user;
- /* set NUMA node affinity for service threads */
- unsigned int tc_cpu_affinity;
+ /* bind service threads to only CPUs in their associated CPT */
+ unsigned int tc_cpu_bind;
/* Tags for lu_context associated with service thread */
__u32 tc_ctx_tags;
};
struct cfs_cpt_table *cc_cptable;
/* string pattern to describe CPTs for a service */
char *cc_pattern;
+ /* whether or not to have per-CPT service partitions */
+ bool cc_affinity;
};
struct ptlrpc_service_conf {
*
* @{
*/
-void ptlrpc_save_lock(struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode, int no_ack);
+void ptlrpc_save_lock(struct ptlrpc_request *req, struct lustre_handle *lock,
+ int mode, bool no_ack, bool convert_lock);
void ptlrpc_commit_replies(struct obd_export *exp);
void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs);
void ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs);
int ptlrpc_hpreq_handler(struct ptlrpc_request *req);
struct ptlrpc_service *ptlrpc_register_service(
struct ptlrpc_service_conf *conf,
- struct proc_dir_entry *proc_entry);
+ struct kset *parent,
+ struct dentry *debugfs_entry);
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
int ptlrpc_start_threads(struct ptlrpc_service *svc);
int ptlrpc_unregister_service(struct ptlrpc_service *service);
-int liblustre_check_services(void *arg);
-void ptlrpc_daemonize(char *name);
int ptlrpc_service_health_check(struct ptlrpc_service *);
void ptlrpc_server_drop_request(struct ptlrpc_request *req);
void ptlrpc_request_change_export(struct ptlrpc_request *req,
struct obd_export *export);
-void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay);
+void ptlrpc_update_export_timer(struct obd_export *exp,
+ time64_t extra_delay);
int ptlrpc_hr_init(void);
void ptlrpc_hr_fini(void);
int ptlrpc_connect_import(struct obd_import *imp);
int ptlrpc_init_import(struct obd_import *imp);
int ptlrpc_disconnect_import(struct obd_import *imp, int noclose);
+int ptlrpc_disconnect_and_idle_import(struct obd_import *imp);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
void deuuidify(char *uuid, const char *prefix, char **uuid_start,
- int *uuid_len);
-
+ int *uuid_len);
+void ptlrpc_import_enter_resend(struct obd_import *imp);
/* ptlrpc/pack_generic.c */
int ptlrpc_reconnect_import(struct obd_import *imp);
/** @} */
*
* @{
*/
-int ptlrpc_buf_need_swab(struct ptlrpc_request *req, const int inout,
- __u32 index);
+bool ptlrpc_buf_need_swab(struct ptlrpc_request *req, const int inout,
+ __u32 index);
void ptlrpc_buf_set_swabbed(struct ptlrpc_request *req, const int inout,
__u32 index);
int ptlrpc_unpack_rep_msg(struct ptlrpc_request *req, int len);
void lustre_msg_add_op_flags(struct lustre_msg *msg, __u32 flags);
struct lustre_handle *lustre_msg_get_handle(struct lustre_msg *msg);
__u32 lustre_msg_get_type(struct lustre_msg *msg);
-__u32 lustre_msg_get_version(struct lustre_msg *msg);
+enum lustre_msg_version lustre_msg_get_version(struct lustre_msg *msg);
void lustre_msg_add_version(struct lustre_msg *msg, __u32 version);
__u32 lustre_msg_get_opc(struct lustre_msg *msg);
__u64 lustre_msg_get_last_xid(struct lustre_msg *msg);
char *lustre_msg_get_jobid(struct lustre_msg *msg);
__u32 lustre_msg_get_cksum(struct lustre_msg *msg);
__u64 lustre_msg_get_mbits(struct lustre_msg *msg);
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 7, 53, 0)
-__u32 lustre_msg_calc_cksum(struct lustre_msg *msg, int compat18);
-#else
__u32 lustre_msg_calc_cksum(struct lustre_msg *msg);
-#endif
void lustre_msg_set_handle(struct lustre_msg *msg,struct lustre_handle *handle);
void lustre_msg_set_type(struct lustre_msg *msg, __u32 type);
void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
if (req->rq_phase == new_phase)
return;
- if (new_phase == RQ_PHASE_UNREGISTERING) {
+ if (new_phase == RQ_PHASE_UNREG_RPC ||
+ new_phase == RQ_PHASE_UNREG_BULK) {
+ /* No embedded unregistering phases */
+ if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+ req->rq_phase == RQ_PHASE_UNREG_BULK)
+ return;
+
req->rq_next_phase = req->rq_phase;
if (req->rq_import)
atomic_inc(&req->rq_import->imp_unregistering);
}
- if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
+ if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
+ req->rq_phase == RQ_PHASE_UNREG_BULK) {
if (req->rq_import)
atomic_dec(&req->rq_import->imp_unregistering);
}
static inline int
ptlrpc_client_early(struct ptlrpc_request *req)
{
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
- req->rq_reply_deadline > cfs_time_current_sec())
- return 0;
return req->rq_early;
}
static inline int
ptlrpc_client_replied(struct ptlrpc_request *req)
{
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
- req->rq_reply_deadline > cfs_time_current_sec())
- return 0;
- return req->rq_replied;
+ if (req->rq_reply_deadline > ktime_get_real_seconds())
+ return 0;
+ return req->rq_replied;
}
/** Returns true if request \a req is in process of receiving server reply */
static inline int
ptlrpc_client_recv(struct ptlrpc_request *req)
{
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
- req->rq_reply_deadline > cfs_time_current_sec())
- return 1;
- return req->rq_receiving_reply;
+ if (req->rq_reply_deadline > ktime_get_real_seconds())
+ return 1;
+ return req->rq_receiving_reply;
}
static inline int
int rc;
spin_lock(&req->rq_lock);
- if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
- req->rq_reply_deadline > cfs_time_current_sec()) {
+ if (req->rq_reply_deadline > ktime_get_real_seconds()) {
spin_unlock(&req->rq_lock);
return 1;
}
+ if (req->rq_req_deadline > ktime_get_real_seconds()) {
+ spin_unlock(&req->rq_lock);
+ return 1;
+ }
+
rc = !req->rq_req_unlinked || !req->rq_reply_unlinked ||
req->rq_receiving_reply;
spin_unlock(&req->rq_lock);
static inline void
ptlrpc_client_wake_req(struct ptlrpc_request *req)
{
+ smp_mb();
if (req->rq_set == NULL)
wake_up(&req->rq_reply_waitq);
else
static inline int ptlrpc_send_limit_expired(struct ptlrpc_request *req)
{
if (req->rq_delay_limit != 0 &&
- cfs_time_before(cfs_time_add(req->rq_queued_time,
- cfs_time_seconds(req->rq_delay_limit)),
- cfs_time_current())) {
+ req->rq_queued_time + req->rq_delay_limit < ktime_get_seconds())
return 1;
- }
return 0;
}
typedef int (*timeout_cb_t)(struct timeout_item *, void *);
int ptlrpc_pinger_add_import(struct obd_import *imp);
int ptlrpc_pinger_del_import(struct obd_import *imp);
-int ptlrpc_add_timeout_client(int time, enum timeout_event event,
+int ptlrpc_add_timeout_client(time64_t time, enum timeout_event event,
timeout_cb_t cb, void *data,
struct list_head *obd_list);
int ptlrpc_del_timeout_client(struct list_head *obd_list,
* @{
*/
const char* ll_opcode2str(__u32 opcode);
+const int ll_str2opcode(const char *ops);
#ifdef CONFIG_PROC_FS
void ptlrpc_lprocfs_register_obd(struct obd_device *obd);
void ptlrpc_lprocfs_unregister_obd(struct obd_device *obd);
/* ptlrpc/llog_server.c */
int llog_origin_handle_open(struct ptlrpc_request *req);
-int llog_origin_handle_destroy(struct ptlrpc_request *req);
int llog_origin_handle_prev_block(struct ptlrpc_request *req);
int llog_origin_handle_next_block(struct ptlrpc_request *req);
int llog_origin_handle_read_header(struct ptlrpc_request *req);
-int llog_origin_handle_close(struct ptlrpc_request *req);
/* ptlrpc/llog_client.c */
extern struct llog_operations llog_client_ops;