* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2016, Intel Corporation.
+ * Copyright (c) 2010, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* @{
*/
#include <linux/kobject.h>
+#include <linux/rhashtable.h>
#include <linux/uio.h>
#include <libcfs/libcfs.h>
-#include <lnet/nidstr.h>
#include <lnet/api.h>
-#include <lustre/lustre_idl.h>
+#include <lnet/lib-types.h>
+#include <uapi/linux/lnet/nidstr.h>
+#include <uapi/linux/lustre/lustre_idl.h>
#include <lustre_ha.h>
#include <lustre_sec.h>
#include <lustre_import.h>
#include <lu_object.h>
#include <lustre_req_layout.h>
#include <obd_support.h>
-#include <lustre_ver.h>
+#include <uapi/linux/lustre/lustre_ver.h>
/* MD flags we _always_ use */
#define PTLRPC_MD_OPTIONS 0
* value. The client is free to limit the actual RPC size for any bulk
* transfer via cl_max_pages_per_rpc to some non-power-of-two value.
* NOTE: This is limited to 16 (=64GB RPCs) by IOOBJ_MAX_BRW_BITS. */
-#define PTLRPC_BULK_OPS_BITS 4
+#define PTLRPC_BULK_OPS_BITS 6
#if PTLRPC_BULK_OPS_BITS > 16
#error "More than 65536 BRW RPCs not allowed by IOOBJ_MAX_BRW_BITS."
#endif
/**
* OST_IO_MAXREQSIZE ~=
- * lustre_msg + ptlrpc_body + obdo + obd_ioobj +
- * DT_MAX_BRW_PAGES * niobuf_remote
+ * lustre_msg + ptlrpc_body + obdo + obd_ioobj +
+ * DT_MAX_BRW_PAGES * niobuf_remote
*
* - single object with 16 pages is 512 bytes
- * - OST_IO_MAXREQSIZE must be at least 1 page of cookies plus some spillover
+ * - OST_IO_MAXREQSIZE must be at least 1 niobuf per page of data
* - Must be a multiple of 1024
- * - actual size is about 18K
+ * - should allow a reasonably large SHORT_IO_BYTES size (64KB)
*/
-#define _OST_MAXREQSIZE_SUM (sizeof(struct lustre_msg) + \
- sizeof(struct ptlrpc_body) + \
- sizeof(struct obdo) + \
- sizeof(struct obd_ioobj) + \
- sizeof(struct niobuf_remote) * DT_MAX_BRW_PAGES)
+#define _OST_MAXREQSIZE_BASE ((unsigned long)(sizeof(struct lustre_msg) + \
+ /* lm_buflens */ sizeof(__u32) * 4 + \
+ sizeof(struct ptlrpc_body) + \
+ sizeof(struct obdo) + \
+ sizeof(struct obd_ioobj) + \
+ sizeof(struct niobuf_remote)))
+#define _OST_MAXREQSIZE_SUM ((unsigned long)(_OST_MAXREQSIZE_BASE + \
+ sizeof(struct niobuf_remote) * \
+ DT_MAX_BRW_PAGES))
/**
* FIEMAP request can be 4K+ for now
*/
-#define OST_MAXREQSIZE (16 * 1024)
-#define OST_IO_MAXREQSIZE max_t(int, OST_MAXREQSIZE, \
- (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1))
+#define OST_MAXREQSIZE (16UL * 1024UL)
+#define OST_IO_MAXREQSIZE max(OST_MAXREQSIZE, \
+ ((_OST_MAXREQSIZE_SUM - 1) | \
+ (1024UL - 1)) + 1)
+/* Safe estimate of free space in standard RPC, provides upper limit for # of
+ * bytes of i/o to pack in RPC (skipping bulk transfer). */
+#define OST_MAX_SHORT_IO_BYTES ((OST_IO_MAXREQSIZE - _OST_MAXREQSIZE_BASE) & \
+ PAGE_MASK)
+
+/* Actual size used for short i/o buffer. Calculation means this:
+ * At least one page (for large PAGE_SIZE), or 16 KiB, but not more
+ * than the available space aligned to a page boundary. */
+#define OBD_DEF_SHORT_IO_BYTES min(max(PAGE_SIZE, 16UL * 1024UL), \
+ OST_MAX_SHORT_IO_BYTES)
#define OST_MAXREPSIZE (9 * 1024)
#define OST_IO_MAXREPSIZE OST_MAXREPSIZE
#define OST_NBUFS 64
/** OST_BUFSIZE = max_reqsize + max sptlrpc payload size */
-#define OST_BUFSIZE max_t(int, OST_MAXREQSIZE + 1024, 16 * 1024)
+#define OST_BUFSIZE max_t(int, OST_MAXREQSIZE + 1024, 32 * 1024)
/**
* OST_IO_MAXREQSIZE is 18K, giving extra 46K can increase buffer utilization
* rate of request buffer, please check comment of MDS_LOV_BUFSIZE for details.
*/
#define OST_IO_BUFSIZE max_t(int, OST_IO_MAXREQSIZE + 1024, 64 * 1024)
-/* Macro to hide a typecast. */
-#define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)
+/* Macro to hide a typecast and BUILD_BUG. */
+#define ptlrpc_req_async_args(_var, req) ({ \
+ BUILD_BUG_ON(sizeof(*_var) > sizeof(req->rq_async_args)); \
+ (typeof(_var))&req->rq_async_args; \
+ })
struct ptlrpc_replay_async_args {
int praa_old_state;
*/
struct ptlrpc_connection {
/** linkage for connections hash table */
- struct hlist_node c_hash;
+ struct rhash_head c_hash;
/** Our own lnet nid for this connection */
lnet_nid_t c_self;
/** Remote side nid for this connection */
/** What portal do we expect replies on */
__u32 cli_reply_portal;
/** Name of the client */
- char *cli_name;
+ const char *cli_name;
};
/** state flags of requests */
/* XXX only ones left are those used by the bulk descs as well! */
-#define PTL_RPC_FL_INTR (1 << 0) /* reply wait was interrupted by user */
-#define PTL_RPC_FL_TIMEOUT (1 << 7) /* request timed out waiting for reply */
+#define PTL_RPC_FL_INTR BIT(0) /* reply wait was interrupted by user */
+#define PTL_RPC_FL_TIMEOUT BIT(7) /* request timed out waiting for reply */
#define REQ_MAX_ACK_LOCKS 8
union ptlrpc_async_args {
- /**
- * Scratchpad for passing args to completion interpreter. Users
- * cast to the struct of their choosing, and CLASSERT that this is
- * big enough. For _tons_ of context, OBD_ALLOC a struct and store
- * a pointer to it here. The pointer_arg ensures this struct is at
- * least big enough for that.
- */
- void *pointer_arg[11];
- __u64 space[7];
+ /**
+ * Scratchpad for passing args to completion interpreter. Users
+ * cast to the struct of their choosing, and BUILD_BUG_ON that this is
+ * big enough. For _tons_ of context, OBD_ALLOC a struct and store
+ * a pointer to it here. The pointer_arg ensures this struct is at
+ * least big enough for that.
+ */
+ void *pointer_arg[11];
+ __u64 space[7];
};
struct ptlrpc_request_set;
-typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
/**
atomic_t set_remaining;
/** wait queue to wait on for request events */
wait_queue_head_t set_waitq;
- wait_queue_head_t *set_wakeup_ptr;
/** List of requests in the set */
struct list_head set_requests;
/**
- * List of completion callbacks to be called when the set is completed
- * This is only used if \a set_interpret is NULL.
- * Links struct ptlrpc_set_cbdata.
- */
- struct list_head set_cblist;
- /** Completion callback, if only one. */
- set_interpreter_func set_interpret;
- /** opaq argument passed to completion \a set_interpret callback. */
- void *set_arg;
- /**
* Lock for \a set_new_requests manipulations
* locked so that any old caller can communicate requests to
* the set holder who can then fold them into the lock-free set
unsigned int set_allow_intr:1;
};
-/**
- * Description of a single ptrlrpc_set callback
- */
-struct ptlrpc_set_cbdata {
- /** List linkage item */
- struct list_head psc_item;
- /** Pointer to interpreting function */
- set_interpreter_func psc_interpret;
- /** Opaq argument to pass to the callback */
- void *psc_data;
-};
-
struct ptlrpc_bulk_desc;
struct ptlrpc_service_part;
struct ptlrpc_service;
struct ptlrpc_cli_req {
/** For bulk requests on client only: bulk descriptor */
struct ptlrpc_bulk_desc *cr_bulk;
- /** optional time limit for send attempts */
- cfs_duration_t cr_delay_limit;
+ /** optional time limit for send attempts. This is a timeout
+ * not a timestamp so timeout_t (s32) is used instead of time64_t
+ */
+ timeout_t cr_delay_limit;
/** time request was first queued */
- cfs_time_t cr_queued_time;
+ time64_t cr_queued_time;
/** request sent in nanoseconds */
ktime_t cr_sent_ns;
/** time for request really sent out */
unsigned int
rq_hp:1, /**< high priority RPC */
rq_at_linked:1, /**< link into service's srv_at_array */
- rq_packed_final:1; /**< packed final reply */
+ rq_packed_final:1, /**< packed final reply */
+ rq_obsolete:1; /* aborted by a signal on a client */
/** @} */
/** one of RQ_PHASE_* */
/** description of flavors for client & server */
struct sptlrpc_flavor rq_flvr;
+ /**
+ * SELinux policy info at the time of the request
+ * sepol string format is:
+ * <mode>:<policy name>:<policy version>:<policy hash>
+ */
+ char rq_sepol[LUSTRE_NODEMAP_SEPOL_LENGTH + 1];
+
/* client/server security flags */
unsigned int
rq_ctx_init:1, /* context initiation */
/**
* service time estimate (secs)
* If the request is not served by this time, it is marked as timed out.
+ * Do not change to time64_t since this is transmitted over the wire.
+ *
+ * The linux kernel handles timestamps with time64_t and timeouts
+ * are normally done with jiffies. Lustre shares the rq_timeout between
+ * nodes. Since jiffies can vary from node to node Lustre instead
+ * will express the timeout value in seconds. To avoid confusion with
+ * timestamps (time64_t) and jiffy timeouts (long) Lustre timeouts
+ * are expressed in s32 (timeout_t). Also what is transmitted over
+ * the wire is 32 bits.
*/
- int rq_timeout;
+ timeout_t rq_timeout;
/**
* when request/reply sent (secs), or time when request should be sent
*/
static inline int ptlrpc_req_interpret(const struct lu_env *env,
struct ptlrpc_request *req, int rc)
{
- if (req->rq_interpret_reply != NULL) {
- req->rq_status = req->rq_interpret_reply(env, req,
- &req->rq_async_args,
- rc);
- return req->rq_status;
- }
- return rc;
+ if (req->rq_interpret_reply != NULL) {
+ req->rq_status = req->rq_interpret_reply(env, req,
+ &req->rq_async_args,
+ rc);
+ return req->rq_status;
+ }
+
+ return rc;
}
/** \addtogroup nrs
/** @} nrs */
/**
- * Returns 1 if request buffer at offset \a index was already swabbed
+ * Returns true if request buffer at offset \a index was already swabbed
*/
-static inline int lustre_req_swabbed(struct ptlrpc_request *req, size_t index)
+static inline bool lustre_req_swabbed(struct ptlrpc_request *req, size_t index)
{
- LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
- return req->rq_req_swab_mask & (1 << index);
+ LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
+ return req->rq_req_swab_mask & BIT(index);
}
/**
- * Returns 1 if request reply buffer at offset \a index was already swabbed
+ * Returns true if request reply buffer at offset \a index was already swabbed
*/
-static inline int lustre_rep_swabbed(struct ptlrpc_request *req, size_t index)
+static inline bool lustre_rep_swabbed(struct ptlrpc_request *req, size_t index)
{
- LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
- return req->rq_rep_swab_mask & (1 << index);
+ LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
+ return req->rq_rep_swab_mask & BIT(index);
}
/**
- * Returns 1 if request needs to be swabbed into local cpu byteorder
+ * Returns true if request needs to be swabbed into local cpu byteorder
*/
-static inline int ptlrpc_req_need_swab(struct ptlrpc_request *req)
+static inline bool ptlrpc_req_need_swab(struct ptlrpc_request *req)
{
- return lustre_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
+ return lustre_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
}
/**
- * Returns 1 if request reply needs to be swabbed into local cpu byteorder
+ * Returns true if request reply needs to be swabbed into local cpu byteorder
*/
-static inline int ptlrpc_rep_need_swab(struct ptlrpc_request *req)
+static inline bool ptlrpc_rep_need_swab(struct ptlrpc_request *req)
{
- return lustre_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
+ return lustre_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
}
/**
static inline void lustre_set_req_swabbed(struct ptlrpc_request *req,
size_t index)
{
- LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
- LASSERT((req->rq_req_swab_mask & (1 << index)) == 0);
- req->rq_req_swab_mask |= 1 << index;
+ LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
+ LASSERT((req->rq_req_swab_mask & BIT(index)) == 0);
+ req->rq_req_swab_mask |= BIT(index);
}
/**
static inline void lustre_set_rep_swabbed(struct ptlrpc_request *req,
size_t index)
{
- LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
- LASSERT((req->rq_rep_swab_mask & (1 << index)) == 0);
- req->rq_rep_swab_mask |= 1 << index;
+ LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
+ LASSERT((req->rq_rep_swab_mask & BIT(index)) == 0);
+ req->rq_rep_swab_mask |= BIT(index);
}
/**
FLAG(req->rq_err, "E"), FLAG(req->rq_net_err, "e"), \
FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"), \
FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \
- FLAG(req->rq_no_resend, "N"), \
+ FLAG(req->rq_no_resend, "N"), FLAG(req->rq_no_reply, "n"), \
FLAG(req->rq_waiting, "W"), \
FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H"), \
- FLAG(req->rq_committed, "M")
+ FLAG(req->rq_committed, "M"), \
+ FLAG(req->rq_req_unlinked, "Q"), \
+ FLAG(req->rq_reply_unlinked, "U"), \
+ FLAG(req->rq_receiving_reply, "r")
-#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s%s"
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s"
void _debug_req(struct ptlrpc_request *req,
struct libcfs_debug_msg_data *data, const char *fmt, ...)
} while (0)
/** @} */
-/**
- * Structure that defines a single page of a bulk transfer
- */
-struct ptlrpc_bulk_page {
- /** Linkage to list of pages in a bulk */
- struct list_head bp_link;
- /**
- * Number of bytes in a page to transfer starting from \a bp_pageoffset
- */
- int bp_buflen;
- /** offset within a page */
- int bp_pageoffset;
- /** The page itself */
- struct page *bp_page;
-};
-
enum ptlrpc_bulk_op_type {
PTLRPC_BULK_OP_ACTIVE = 0x00000001,
PTLRPC_BULK_OP_PASSIVE = 0x00000002,
PTLRPC_BULK_OP_PUT = 0x00000004,
PTLRPC_BULK_OP_GET = 0x00000008,
- PTLRPC_BULK_BUF_KVEC = 0x00000010,
- PTLRPC_BULK_BUF_KIOV = 0x00000020,
PTLRPC_BULK_GET_SOURCE = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_GET,
PTLRPC_BULK_PUT_SINK = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_PUT,
PTLRPC_BULK_GET_SINK = PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_GET,
return (type & PTLRPC_BULK_PUT_SOURCE) == PTLRPC_BULK_PUT_SOURCE;
}
-static inline bool ptlrpc_is_bulk_desc_kvec(enum ptlrpc_bulk_op_type type)
-{
- return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
- == PTLRPC_BULK_BUF_KVEC;
-}
-
-static inline bool ptlrpc_is_bulk_desc_kiov(enum ptlrpc_bulk_op_type type)
-{
- return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
- == PTLRPC_BULK_BUF_KIOV;
-}
-
static inline bool ptlrpc_is_bulk_op_active(enum ptlrpc_bulk_op_type type)
{
return ((type & PTLRPC_BULK_OP_ACTIVE) |
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops;
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
-extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops;
/*
* Definition of bulk descriptor.
* Another user is readpage for MDT.
*/
struct ptlrpc_bulk_desc {
+ unsigned int bd_refs; /* number MD's assigned including zero-sends */
/** completed with failure */
unsigned long bd_failure:1;
/** client side */
unsigned long bd_registered:1;
/** For serialization with callback */
spinlock_t bd_lock;
- /** Import generation when request for this bulk was sent */
- int bd_import_generation;
/** {put,get}{source,sink}{kvec,kiov} */
enum ptlrpc_bulk_op_type bd_type;
/** LNet portal for this bulk */
struct obd_import *bd_import;
/** Back pointer to the request */
struct ptlrpc_request *bd_req;
- struct ptlrpc_bulk_frag_ops *bd_frag_ops;
+ const struct ptlrpc_bulk_frag_ops *bd_frag_ops;
wait_queue_head_t bd_waitq; /* server side only WQ */
int bd_iov_count; /* # entries in bd_iov */
int bd_max_iov; /* allocated size of bd_iov */
int bd_nob; /* # bytes covered */
int bd_nob_transferred; /* # bytes GOT/PUT */
+ unsigned int bd_nob_last; /* # bytes in last MD */
__u64 bd_last_mbits;
lnet_nid_t bd_sender; /* stash event::sender */
int bd_md_count; /* # valid entries in bd_mds */
int bd_md_max_brw; /* max entries in bd_mds */
+
+ /** array of offsets for each MD */
+ unsigned int bd_mds_off[PTLRPC_BULK_OPS_COUNT];
/** array of associated MDs */
struct lnet_handle_md bd_mds[PTLRPC_BULK_OPS_COUNT];
- union {
- struct {
- /*
- * encrypt iov, size is either 0 or bd_iov_count.
- */
- lnet_kiov_t *bd_enc_vec;
- lnet_kiov_t *bd_vec;
- } bd_kiov;
-
- struct {
- struct kvec *bd_enc_kvec;
- struct kvec *bd_kvec;
- } bd_kvec;
- } bd_u;
-
+ /* encrypted iov, size is either 0 or bd_iov_count. */
+ struct bio_vec *bd_enc_vec;
+ struct bio_vec *bd_vec;
};
-#define GET_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_vec)
-#define BD_GET_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_vec[i])
-#define GET_ENC_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_enc_vec)
-#define BD_GET_ENC_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_enc_vec[i])
-#define GET_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_kvec)
-#define BD_GET_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_kvec[i])
-#define GET_ENC_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_enc_kvec)
-#define BD_GET_ENC_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_enc_kvec[i])
-
enum {
SVC_INIT = 0,
- SVC_STOPPED = 1 << 0,
- SVC_STOPPING = 1 << 1,
- SVC_STARTING = 1 << 2,
- SVC_RUNNING = 1 << 3,
- SVC_EVENT = 1 << 4,
- SVC_SIGNAL = 1 << 5,
+ SVC_STOPPED = BIT(0),
+ SVC_STOPPING = BIT(1),
+ SVC_STARTING = BIT(2),
+ SVC_RUNNING = BIT(3),
};
#define PTLRPC_THR_NAME_LEN 32
*/
struct ptlrpc_thread {
/**
- * List of active threads in svc->srv_threads
+ * List of active threads in svcpt->scp_threads
*/
struct list_head t_link;
/**
*/
unsigned int t_id;
/**
- * service thread pid
+ * service thread
*/
+ struct task_struct *t_task;
pid_t t_pid;
+ ktime_t t_touched;
/**
* put watchdog in the structure per thread b=14840
*/
- struct lc_watchdog *t_watchdog;
+ struct delayed_work t_watchdog;
/**
* the svc this thread belonged to b=18582
*/
return !!(thread->t_flags & SVC_RUNNING);
}
-static inline int thread_is_event(struct ptlrpc_thread *thread)
-{
- return !!(thread->t_flags & SVC_EVENT);
-}
-
-static inline int thread_is_signal(struct ptlrpc_thread *thread)
-{
- return !!(thread->t_flags & SVC_SIGNAL);
-}
-
static inline void thread_clear_flags(struct ptlrpc_thread *thread, __u32 flags)
{
thread->t_flags &= ~flags;
char *srv_name;
/** only statically allocated strings here; we don't clean them */
char *srv_thread_name;
- /** service thread list */
- struct list_head srv_threads;
/** threads # should be created for each partition on initializing */
int srv_nthrs_cpt_init;
/** limit of threads number for each partition */
int srv_nthrs_cpt_limit;
- /** Root of /proc dir tree for this service */
- struct proc_dir_entry *srv_procroot;
+ /** Root of debugfs dir tree for this service */
+ struct dentry *srv_debugfs_entry;
/** Pointer to statistic data for this service */
struct lprocfs_stats *srv_stats;
/** # hp per lp reqs to handle */
int srv_watchdog_factor;
/** under unregister_service */
unsigned srv_is_stopping:1;
+ /** Whether or not to restrict service threads to CPUs in this CPT */
+ unsigned srv_cpt_bind:1;
+ /** max # request buffers */
+ int srv_nrqbds_max;
/** max # request buffers in history per partition */
int srv_hist_nrqbds_cpt_max;
- /** number of CPTs this service bound on */
+ /** number of CPTs this service associated with */
int srv_ncpts;
- /** CPTs array this service bound on */
+ /** CPTs array this service associated with */
__u32 *srv_cpts;
/** 2^srv_cptab_bits >= cfs_cpt_numbert(srv_cptable) */
int srv_cpt_bits;
int scp_thr_nextid;
/** # of starting threads */
int scp_nthrs_starting;
- /** # of stopping threads, reserved for shrinking threads */
- int scp_nthrs_stopping;
/** # running threads */
int scp_nthrs_running;
/** service threads list */
* threads starting & stopping are also protected by this lock.
*/
spinlock_t scp_lock __cfs_cacheline_aligned;
+ /** userland serialization */
+ struct mutex scp_mutex;
/** total # req buffer descs allocated */
int scp_nrqbds_total;
/** # posted request buffers for receiving */
struct list_head scp_rqbd_posted;
/** incoming reqs */
struct list_head scp_req_incoming;
- /** timeout before re-posting reqs, in tick */
- cfs_duration_t scp_rqbd_timeout;
+ /** timeout before re-posting reqs, in jiffies */
+ long scp_rqbd_timeout;
/**
* all threads sleep on this. This wait-queue is signalled when new
* incoming request arrives and when difficult reply has to be handled.
/** early reply timer */
struct timer_list scp_at_timer;
/** debug */
- cfs_time_t scp_at_checktime;
+ ktime_t scp_at_checktime;
/** check early replies */
unsigned scp_at_check;
/** @} */
/* Bits for pc_flags */
enum ptlrpcd_ctl_flags {
- /**
- * Ptlrpc thread start flag.
- */
- LIOD_START = 1 << 0,
- /**
- * Ptlrpc thread stop flag.
- */
- LIOD_STOP = 1 << 1,
- /**
- * Ptlrpc thread force flag (only stop force so far).
- * This will cause aborting any inflight rpcs handled
- * by thread if LIOD_STOP is specified.
- */
- LIOD_FORCE = 1 << 2,
- /**
- * This is a recovery ptlrpc thread.
- */
- LIOD_RECOVERY = 1 << 3,
+ /**
+ * Ptlrpc thread start flag.
+ */
+ LIOD_START = BIT(0),
+ /**
+ * Ptlrpc thread stop flag.
+ */
+ LIOD_STOP = BIT(1),
+ /**
+ * Ptlrpc thread force flag (only stop force so far).
+ * This will cause aborting any inflight rpcs handled
+ * by thread if LIOD_STOP is specified.
+ */
+ LIOD_FORCE = BIT(2),
+ /**
+ * This is a recovery ptlrpc thread.
+ */
+ LIOD_RECOVERY = BIT(3),
};
/**
/** @} nrs */
/* ptlrpc/events.c */
-extern struct lnet_handle_eq ptlrpc_eq_h;
extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
struct lnet_process_id *peer, lnet_nid_t *self);
/**
struct ptlrpc_connection *ptlrpc_connection_get(struct lnet_process_id peer,
lnet_nid_t self,
struct obd_uuid *uuid);
-int ptlrpc_connection_put(struct ptlrpc_connection *c);
+
+static inline void ptlrpc_connection_put(struct ptlrpc_connection *conn)
+{
+ if (!conn)
+ return;
+
+ LASSERT(atomic_read(&conn->c_refcount) > 0);
+
+ /*
+ * We do not remove connection from hashtable and
+ * do not free it even if last caller released ref,
+ * as we want to have it cached for the case it is
+ * needed again.
+ *
+ * Deallocating it and later creating new connection
+ * again would be wastful. This way we also avoid
+ * expensive locking to protect things from get/put
+ * race when found cached connection is freed by
+ * ptlrpc_connection_put().
+ *
+ * It will be freed later in module unload time,
+ * when ptlrpc_connection_fini()->lh_exit->conn_exit()
+ * path is called.
+ */
+ atomic_dec(&conn->c_refcount);
+
+ CDEBUG(D_INFO, "PUT conn=%p refcount %d to %s\n",
+ conn, atomic_read(&conn->c_refcount),
+ libcfs_nid2str(conn->c_peer.nid));
+}
+
struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
int ptlrpc_connection_init(void);
void ptlrpc_connection_fini(void);
LASSERT(desc != NULL);
spin_lock(&desc->bd_lock);
- rc = desc->bd_md_count;
+ rc = desc->bd_refs;
spin_unlock(&desc->bd_lock);
return rc;
}
LASSERT(req != NULL);
desc = req->rq_bulk;
+ if (!desc)
+ return 0;
+
if (req->rq_bulk_deadline > ktime_get_real_seconds())
return 1;
- if (!desc)
- return 0;
spin_lock(&desc->bd_lock);
- rc = desc->bd_md_count;
+ rc = desc->bd_refs;
spin_unlock(&desc->bd_lock);
return rc;
}
*/
void ptlrpc_request_committed(struct ptlrpc_request *req, int force);
-void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
+void ptlrpc_init_client(int req_portal, int rep_portal, const char *name,
struct ptlrpc_client *);
void ptlrpc_cleanup_client(struct obd_import *imp);
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid,
struct ptlrpc_request_set *ptlrpc_prep_set(void);
struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
void *arg);
-int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
- set_interpreter_func fn, void *data);
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
-int ptlrpc_set_wait(struct ptlrpc_request_set *);
-void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
+int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *);
void ptlrpc_set_destroy(struct ptlrpc_request_set *);
void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
+#define PTLRPCD_SET ((struct ptlrpc_request_set *)1)
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
const struct ptlrpc_bulk_frag_ops
*ops);
-int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
- void *frag, int len);
void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
struct page *page, int pageoffset, int len,
int pin);
-static inline void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
- struct page *page, int pageoffset,
- int len)
-{
- __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
-}
-
-static inline void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
- struct page *page, int pageoffset,
- int len)
-{
- __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
-}
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
-static inline void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
-{
- int i;
-
- for (i = 0; i < desc->bd_iov_count ; i++)
- put_page(BD_GET_KIOV(desc, i).kiov_page);
-}
-
static inline void ptlrpc_release_bulk_noop(struct ptlrpc_bulk_desc *desc)
{
}
__u64 ptlrpc_next_xid(void);
__u64 ptlrpc_sample_next_xid(void);
__u64 ptlrpc_req_xid(struct ptlrpc_request *request);
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req);
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req);
/* Set of routines to run a function in ptlrpcd context */
void *ptlrpcd_alloc_work(struct obd_import *imp,
/* user specified threads number, it will be validated due to
* other members of this structure. */
unsigned int tc_nthrs_user;
- /* set NUMA node affinity for service threads */
- unsigned int tc_cpu_affinity;
+ /* bind service threads to only CPUs in their associated CPT */
+ unsigned int tc_cpu_bind;
/* Tags for lu_context associated with service thread */
__u32 tc_ctx_tags;
};
struct cfs_cpt_table *cc_cptable;
/* string pattern to describe CPTs for a service */
char *cc_pattern;
+ /* whether or not to have per-CPT service partitions */
+ bool cc_affinity;
};
struct ptlrpc_service_conf {
struct ptlrpc_service *ptlrpc_register_service(
struct ptlrpc_service_conf *conf,
struct kset *parent,
- struct proc_dir_entry *proc_entry);
+ struct dentry *debugfs_entry);
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
int ptlrpc_start_threads(struct ptlrpc_service *svc);
int ptlrpc_unregister_service(struct ptlrpc_service *service);
-int liblustre_check_services(void *arg);
-void ptlrpc_daemonize(char *name);
int ptlrpc_service_health_check(struct ptlrpc_service *);
void ptlrpc_server_drop_request(struct ptlrpc_request *req);
void ptlrpc_request_change_export(struct ptlrpc_request *req,
struct obd_export *export);
-void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay);
+void ptlrpc_update_export_timer(struct obd_export *exp,
+ time64_t extra_delay);
int ptlrpc_hr_init(void);
void ptlrpc_hr_fini(void);
+void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout);
+void ptlrpc_watchdog_disable(struct delayed_work *work);
+void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout);
+
/** @} */
/* ptlrpc/import.c */
* @{
*/
int ptlrpc_connect_import(struct obd_import *imp);
+int ptlrpc_connect_import_locked(struct obd_import *imp);
int ptlrpc_init_import(struct obd_import *imp);
int ptlrpc_disconnect_import(struct obd_import *imp, int noclose);
+int ptlrpc_disconnect_and_idle_import(struct obd_import *imp);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
void deuuidify(char *uuid, const char *prefix, char **uuid_start,
int *uuid_len);
*
* @{
*/
-int ptlrpc_buf_need_swab(struct ptlrpc_request *req, const int inout,
- __u32 index);
+#define PTLRPC_MAX_BUFCOUNT \
+ (sizeof(((struct ptlrpc_request *)0)->rq_req_swab_mask) * 8)
+#define MD_MAX_BUFLEN (MDS_REG_MAXREQSIZE > OUT_MAXREQSIZE ? \
+ MDS_REG_MAXREQSIZE : OUT_MAXREQSIZE)
+#define PTLRPC_MAX_BUFLEN (OST_IO_MAXREQSIZE > MD_MAX_BUFLEN ? \
+ OST_IO_MAXREQSIZE : MD_MAX_BUFLEN)
+bool ptlrpc_buf_need_swab(struct ptlrpc_request *req, const int inout,
+ __u32 index);
void ptlrpc_buf_set_swabbed(struct ptlrpc_request *req, const int inout,
__u32 index);
int ptlrpc_unpack_rep_msg(struct ptlrpc_request *req, int len);
char **bufs, int flags);
int lustre_shrink_msg(struct lustre_msg *msg, int segment,
unsigned int newlen, int move_data);
+int lustre_grow_msg(struct lustre_msg *msg, int segment, unsigned int newlen);
void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
int __lustre_unpack_msg(struct lustre_msg *m, int len);
__u32 lustre_msg_hdr_size(__u32 magic, __u32 count);
void lustre_msg_add_op_flags(struct lustre_msg *msg, __u32 flags);
struct lustre_handle *lustre_msg_get_handle(struct lustre_msg *msg);
__u32 lustre_msg_get_type(struct lustre_msg *msg);
-__u32 lustre_msg_get_version(struct lustre_msg *msg);
+enum lustre_msg_version lustre_msg_get_version(struct lustre_msg *msg);
void lustre_msg_add_version(struct lustre_msg *msg, __u32 version);
__u32 lustre_msg_get_opc(struct lustre_msg *msg);
__u64 lustre_msg_get_last_xid(struct lustre_msg *msg);
int lustre_msg_get_status(struct lustre_msg *msg);
__u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg);
__u32 lustre_msg_get_magic(struct lustre_msg *msg);
-__u32 lustre_msg_get_timeout(struct lustre_msg *msg);
-__u32 lustre_msg_get_service_time(struct lustre_msg *msg);
+timeout_t lustre_msg_get_timeout(struct lustre_msg *msg);
+timeout_t lustre_msg_get_service_timeout(struct lustre_msg *msg);
char *lustre_msg_get_jobid(struct lustre_msg *msg);
__u32 lustre_msg_get_cksum(struct lustre_msg *msg);
__u64 lustre_msg_get_mbits(struct lustre_msg *msg);
-__u32 lustre_msg_calc_cksum(struct lustre_msg *msg);
+__u32 lustre_msg_calc_cksum(struct lustre_msg *msg, __u32 buf);
void lustre_msg_set_handle(struct lustre_msg *msg,struct lustre_handle *handle);
void lustre_msg_set_type(struct lustre_msg *msg, __u32 type);
void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, __u32 *sizes);
void ptlrpc_request_set_replen(struct ptlrpc_request *req);
-void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
-void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
+void lustre_msg_set_timeout(struct lustre_msg *msg, timeout_t timeout);
+void lustre_msg_set_service_timeout(struct lustre_msg *msg,
+ timeout_t service_timeout);
void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid);
void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
void lustre_msg_set_mbits(struct lustre_msg *msg, __u64 mbits);
atomic_dec(&req->rq_import->imp_unregistering);
}
- DEBUG_REQ(D_INFO, req, "move req \"%s\" -> \"%s\"",
+ DEBUG_REQ(D_INFO, req, "move request phase from %s to %s",
ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
req->rq_phase = new_phase;
static inline int ptlrpc_send_limit_expired(struct ptlrpc_request *req)
{
if (req->rq_delay_limit != 0 &&
- cfs_time_before(cfs_time_add(req->rq_queued_time,
- cfs_time_seconds(req->rq_delay_limit)),
- cfs_time_current())) {
+ req->rq_queued_time + req->rq_delay_limit < ktime_get_seconds())
return 1;
- }
return 0;
}
max_t(int, at, obd_timeout);
}
+/**
+ * Calculate the amount of time for lock prolongation.
+ *
+ * This is helper function to get the timeout extra time.
+ *
+ * @req current request
+ *
+ * Return: amount of time to extend the timeout with
+ */
+static inline timeout_t prolong_timeout(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+ timeout_t req_timeout = 0;
+
+ if (AT_OFF)
+ return obd_timeout / 2;
+
+ if (req->rq_deadline > req->rq_arrival_time.tv_sec)
+ req_timeout = req->rq_deadline - req->rq_arrival_time.tv_sec;
+
+ return max(req_timeout,
+ at_est2timeout(at_get(&svcpt->scp_at_estimate)));
+}
+
static inline struct ptlrpc_service *
ptlrpc_req2svc(struct ptlrpc_request *req)
{
* Target client logic
* @{
*/
-int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg);
-int client_obd_cleanup(struct obd_device *obddev);
+int client_obd_setup(struct obd_device *obd, struct lustre_cfg *lcfg);
+int client_obd_cleanup(struct obd_device *obd);
int client_connect_import(const struct lu_env *env,
struct obd_export **exp, struct obd_device *obd,
struct obd_uuid *cluuid, struct obd_connect_data *,
int client_disconnect_export(struct obd_export *exp);
int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
int priority);
+int client_import_dyn_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
+ lnet_nid_t prim_nid, int priority);
+int client_import_add_nids_to_conn(struct obd_import *imp, lnet_nid_t *nids,
+ int nid_count, struct obd_uuid *uuid);
int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid);
int client_import_find_conn(struct obd_import *imp, lnet_nid_t peer,
struct obd_uuid *uuid);
typedef int (*timeout_cb_t)(struct timeout_item *, void *);
int ptlrpc_pinger_add_import(struct obd_import *imp);
int ptlrpc_pinger_del_import(struct obd_import *imp);
-int ptlrpc_add_timeout_client(int time, enum timeout_event event,
- timeout_cb_t cb, void *data,
- struct list_head *obd_list);
-int ptlrpc_del_timeout_client(struct list_head *obd_list,
- enum timeout_event event);
struct ptlrpc_request * ptlrpc_prep_ping(struct obd_import *imp);
int ptlrpc_obd_ping(struct obd_device *obd);
void ping_evictor_start(void);
/* ptlrpc/llog_server.c */
int llog_origin_handle_open(struct ptlrpc_request *req);
-int llog_origin_handle_destroy(struct ptlrpc_request *req);
int llog_origin_handle_prev_block(struct ptlrpc_request *req);
int llog_origin_handle_next_block(struct ptlrpc_request *req);
int llog_origin_handle_read_header(struct ptlrpc_request *req);
-int llog_origin_handle_close(struct ptlrpc_request *req);
/* ptlrpc/llog_client.c */
extern struct llog_operations llog_client_ops;