* @{
*/
#include <linux/kobject.h>
+#include <linux/rhashtable.h>
#include <linux/uio.h>
#include <libcfs/libcfs.h>
#include <lnet/api.h>
/**
* OST_IO_MAXREQSIZE ~=
- * lustre_msg + ptlrpc_body + obdo + obd_ioobj +
- * DT_MAX_BRW_PAGES * niobuf_remote
+ * lustre_msg + ptlrpc_body + obdo + obd_ioobj +
+ * DT_MAX_BRW_PAGES * niobuf_remote
*
* - single object with 16 pages is 512 bytes
- * - OST_IO_MAXREQSIZE must be at least 1 page of cookies plus some spillover
+ * - OST_IO_MAXREQSIZE must be large enough for 1 niobuf per page of data
* - Must be a multiple of 1024
+ * - should allow a reasonably large SHORT_IO_BYTES size (64KB)
*/
-#define _OST_MAXREQSIZE_BASE ((unsigned long)(sizeof(struct lustre_msg) + \
- sizeof(struct ptlrpc_body) + \
- sizeof(struct obdo) + \
- sizeof(struct obd_ioobj) + \
- sizeof(struct niobuf_remote)))
-#define _OST_MAXREQSIZE_SUM ((unsigned long)(_OST_MAXREQSIZE_BASE + \
- sizeof(struct niobuf_remote) * \
- (DT_MAX_BRW_PAGES - 1)))
+#define _OST_MAXREQSIZE_BASE ((unsigned long)(sizeof(struct lustre_msg) + \
+ /* lm_buflens */ sizeof(__u32) * 4 + \
+ sizeof(struct ptlrpc_body) + \
+ sizeof(struct obdo) + \
+ sizeof(struct obd_ioobj) + \
+ sizeof(struct niobuf_remote)))
+#define _OST_MAXREQSIZE_SUM ((unsigned long)(_OST_MAXREQSIZE_BASE + \
+ sizeof(struct niobuf_remote) * \
+ DT_MAX_BRW_PAGES))
/**
* FIEMAP request can be 4K+ for now
*/
#define OST_MAXREQSIZE (16UL * 1024UL)
#define OST_IO_MAXREQSIZE max(OST_MAXREQSIZE, \
			      ((_OST_MAXREQSIZE_SUM - 1) | \
			       (1024UL - 1)) + 1)
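/*
 * Hedged sketch, not part of this patch: with illustrative wire sizes
 * (sizeof(struct niobuf_remote) == 16, DT_MAX_BRW_PAGES == 1024) the
 * niobuf array dominates _OST_MAXREQSIZE_SUM at 1024 * 16 = 16 KiB.
 * The "multiple of 1024" rule can be verified at build time; the check
 * needs function scope (called once from init code) because max()
 * expands to a statement expression:
 */
static inline void ost_io_maxreqsize_checks(void) /* hypothetical helper */
{
	BUILD_BUG_ON(OST_IO_MAXREQSIZE & (1024 - 1));
	BUILD_BUG_ON(OST_IO_MAXREQSIZE < _OST_MAXREQSIZE_SUM);
}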
/* Safe estimate of free space in standard RPC, provides upper limit for # of
* bytes of i/o to pack in RPC (skipping bulk transfer). */
-#define OST_SHORT_IO_SPACE (OST_IO_MAXREQSIZE - _OST_MAXREQSIZE_BASE)
+#define OST_MAX_SHORT_IO_BYTES ((OST_IO_MAXREQSIZE - _OST_MAXREQSIZE_BASE) & \
+ PAGE_MASK)
/* Actual size used for short i/o buffer. Calculation means this:
* At least one page (for large PAGE_SIZE), or 16 KiB, but not more
* than the available space aligned to a page boundary. */
-#define OBD_MAX_SHORT_IO_BYTES min(max(PAGE_SIZE, 16UL * 1024UL), \
- OST_SHORT_IO_SPACE & PAGE_MASK)
+#define OBD_DEF_SHORT_IO_BYTES min(max(PAGE_SIZE, 16UL * 1024UL), \
+ OST_MAX_SHORT_IO_BYTES)
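/*
 * Worked example, editorial: with 4 KiB pages, max(PAGE_SIZE, 16 KiB)
 * is 16 KiB, so OBD_DEF_SHORT_IO_BYTES defaults to 16 KiB as long as
 * at least that much page-aligned space is left in the request; with
 * 64 KiB pages the default grows to one full page.
 */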
#define OST_MAXREPSIZE (9 * 1024)
#define OST_IO_MAXREPSIZE OST_MAXREPSIZE
*/
struct ptlrpc_connection {
/** linkage for connections hash table */
- struct hlist_node c_hash;
+ struct rhash_head c_hash;
/** Our own lnet nid for this connection */
lnet_nid_t c_self;
/** Remote side nid for this connection */
/** state flags of requests */
/* XXX only ones left are those used by the bulk descs as well! */
-#define PTL_RPC_FL_INTR (1 << 0) /* reply wait was interrupted by user */
-#define PTL_RPC_FL_TIMEOUT (1 << 7) /* request timed out waiting for reply */
+#define PTL_RPC_FL_INTR BIT(0) /* reply wait was interrupted by user */
+#define PTL_RPC_FL_TIMEOUT BIT(7) /* request timed out waiting for reply */
#define REQ_MAX_ACK_LOCKS 8
union ptlrpc_async_args {
- /**
- * Scratchpad for passing args to completion interpreter. Users
- * cast to the struct of their choosing, and CLASSERT that this is
- * big enough. For _tons_ of context, OBD_ALLOC a struct and store
- * a pointer to it here. The pointer_arg ensures this struct is at
- * least big enough for that.
- */
- void *pointer_arg[11];
- __u64 space[7];
+ /**
+ * Scratchpad for passing args to completion interpreter. Users
+ * cast to the struct of their choosing, and BUILD_BUG_ON that this is
+ * big enough. For _tons_ of context, OBD_ALLOC a struct and store
+ * a pointer to it here. The pointer_arg ensures this struct is at
+ * least big enough for that.
+ */
+ void *pointer_arg[11];
+ __u64 space[7];
};
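/*
 * Hedged usage sketch, not part of this patch (the args struct is
 * hypothetical): an interpreter's caller overlays its own struct on
 * the scratchpad and proves the fit at compile time, as the comment
 * above prescribes.
 */
struct example_async_args {
	void	*aa_cookie;
	__u64	 aa_xid;
};

static inline void example_pack_args(union ptlrpc_async_args *args, __u64 xid)
{
	struct example_async_args *aa = (void *)args;

	BUILD_BUG_ON(sizeof(*aa) > sizeof(*args));
	aa->aa_xid = xid;
}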
struct ptlrpc_request_set;
struct ptlrpc_cli_req {
/** For bulk requests on client only: bulk descriptor */
struct ptlrpc_bulk_desc *cr_bulk;
- /** optional time limit for send attempts */
- time64_t cr_delay_limit;
+ /** optional time limit for send attempts. This is a timeout,
+ * not a timestamp, so timeout_t (s32) is used instead of time64_t.
+ */
+ timeout_t cr_delay_limit;
/** time request was first queued */
time64_t cr_queued_time;
/** request sent in nanoseconds */
unsigned int
rq_hp:1, /**< high priority RPC */
rq_at_linked:1, /**< link into service's srv_at_array */
- rq_packed_final:1; /**< packed final reply */
+ rq_packed_final:1, /**< packed final reply */
+ rq_obsolete:1; /**< aborted by a signal on a client */
/** @} */
/** one of RQ_PHASE_* */
* service time estimate (secs)
* If the request is not served by this time, it is marked as timed out.
* Do not change to time64_t since this is transmitted over the wire.
+ *
+ * The linux kernel handles timestamps with time64_t, and timeouts
+ * are normally done with jiffies. Lustre shares rq_timeout between
+ * nodes, and since the length of a jiffy can vary from node to node,
+ * Lustre expresses the timeout value in seconds instead. To avoid
+ * confusion with timestamps (time64_t) and jiffy timeouts (long),
+ * Lustre timeouts are expressed as s32 (timeout_t). The value
+ * transmitted over the wire is also only 32 bits wide.
*/
- time_t rq_timeout;
+ timeout_t rq_timeout;
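	/*
	 * Hedged example, editorial: because rq_timeout is plain
	 * seconds, waits based on it convert to jiffies explicitly at
	 * the call site, e.g.
	 *   schedule_timeout_interruptible(cfs_time_seconds(rq_timeout));
	 */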
/**
* when request/reply sent (secs), or time when request should be sent
*/
static inline bool lustre_req_swabbed(struct ptlrpc_request *req, size_t index)
{
LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
- return req->rq_req_swab_mask & (1 << index);
+ return req->rq_req_swab_mask & BIT(index);
}
/**
static inline bool lustre_rep_swabbed(struct ptlrpc_request *req, size_t index)
{
LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
- return req->rq_rep_swab_mask & (1 << index);
+ return req->rq_rep_swab_mask & BIT(index);
}
/**
static inline void lustre_set_req_swabbed(struct ptlrpc_request *req,
size_t index)
{
- LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
- LASSERT((req->rq_req_swab_mask & (1 << index)) == 0);
- req->rq_req_swab_mask |= 1 << index;
+ LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
+ LASSERT((req->rq_req_swab_mask & BIT(index)) == 0);
+ req->rq_req_swab_mask |= BIT(index);
}
/**
static inline void lustre_set_rep_swabbed(struct ptlrpc_request *req,
size_t index)
{
- LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
- LASSERT((req->rq_rep_swab_mask & (1 << index)) == 0);
- req->rq_rep_swab_mask |= 1 << index;
+ LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
+ LASSERT((req->rq_rep_swab_mask & BIT(index)) == 0);
+ req->rq_rep_swab_mask |= BIT(index);
}
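/*
 * Hedged usage sketch, not part of this patch: unpack paths consult
 * the mask so a reply buffer is byte-swapped at most once per request
 * (MSG_PTLRPC_BODY_OFF is the buffer index of the ptlrpc body).
 */
static inline void example_swab_once(struct ptlrpc_request *req)
{
	if (!lustre_rep_swabbed(req, MSG_PTLRPC_BODY_OFF)) {
		/* ... byte-swap the reply buffer in place here ... */
		lustre_set_rep_swabbed(req, MSG_PTLRPC_BODY_OFF);
	}
}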
/**
PTLRPC_BULK_OP_PASSIVE = 0x00000002,
PTLRPC_BULK_OP_PUT = 0x00000004,
PTLRPC_BULK_OP_GET = 0x00000008,
- PTLRPC_BULK_BUF_KVEC = 0x00000010,
- PTLRPC_BULK_BUF_KIOV = 0x00000020,
PTLRPC_BULK_GET_SOURCE = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_GET,
PTLRPC_BULK_PUT_SINK = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_PUT,
PTLRPC_BULK_GET_SINK = PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_GET,
return (type & PTLRPC_BULK_PUT_SOURCE) == PTLRPC_BULK_PUT_SOURCE;
}
-static inline bool ptlrpc_is_bulk_desc_kvec(enum ptlrpc_bulk_op_type type)
-{
- return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
- == PTLRPC_BULK_BUF_KVEC;
-}
-
-static inline bool ptlrpc_is_bulk_desc_kiov(enum ptlrpc_bulk_op_type type)
-{
- return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
- == PTLRPC_BULK_BUF_KIOV;
-}
-
static inline bool ptlrpc_is_bulk_op_active(enum ptlrpc_bulk_op_type type)
{
return ((type & PTLRPC_BULK_OP_ACTIVE) |
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops;
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
-extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops;
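/*
 * Hedged sketch, not part of this patch: with the kvec variant gone,
 * bulk callers pick one of the two remaining kiov ops vectors, e.g. a
 * client write might prep its descriptor as
 *
 *	desc = ptlrpc_prep_bulk_imp(req, page_count, 1,
 *				    PTLRPC_BULK_GET_SOURCE,
 *				    OST_BULK_PORTAL,
 *				    &ptlrpc_bulk_kiov_pin_ops);
 */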
/*
* Definition of bulk descriptor.
* Another user is readpage for MDT.
*/
struct ptlrpc_bulk_desc {
+ unsigned int bd_refs; /* number of MDs assigned, including zero-sends */
/** completed with failure */
unsigned long bd_failure:1;
/** client side */
int bd_max_iov; /* allocated size of bd_iov */
int bd_nob; /* # bytes covered */
int bd_nob_transferred; /* # bytes GOT/PUT */
+ unsigned int bd_nob_last; /* # bytes in last MD */
__u64 bd_last_mbits;
lnet_nid_t bd_sender; /* stash event::sender */
int bd_md_count; /* # valid entries in bd_mds */
int bd_md_max_brw; /* max entries in bd_mds */
+
+ /** array of offsets for each MD */
+ unsigned int bd_mds_off[PTLRPC_BULK_OPS_COUNT];
/** array of associated MDs */
struct lnet_handle_md bd_mds[PTLRPC_BULK_OPS_COUNT];
- union {
- struct {
- /*
- * encrypt iov, size is either 0 or bd_iov_count.
- */
- lnet_kiov_t *bd_enc_vec;
- lnet_kiov_t *bd_vec;
- } bd_kiov;
-
- struct {
- struct kvec *bd_enc_kvec;
- struct kvec *bd_kvec;
- } bd_kvec;
- } bd_u;
-
+ /* encrypted iov, size is either 0 or bd_iov_count. */
+ struct bio_vec *bd_enc_vec;
+ struct bio_vec *bd_vec;
};
-#define GET_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_vec)
-#define BD_GET_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_vec[i])
-#define GET_ENC_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_enc_vec)
-#define BD_GET_ENC_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_enc_vec[i])
-#define GET_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_kvec)
-#define BD_GET_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_kvec[i])
-#define GET_ENC_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_enc_kvec)
-#define BD_GET_ENC_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_enc_kvec[i])
-
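/*
 * Hedged sketch, not part of this patch: with the accessor macros and
 * the kiov/kvec union gone, fragment walks become plain bio_vec loops
 * (bd_iov_count as in the full struct definition).
 */
static inline unsigned int example_desc_nob(struct ptlrpc_bulk_desc *desc)
{
	unsigned int nob = 0;
	int i;

	for (i = 0; i < desc->bd_iov_count; i++)
		nob += desc->bd_vec[i].bv_len;
	return nob;
}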
enum {
SVC_INIT = 0,
- SVC_STOPPED = 1 << 0,
- SVC_STOPPING = 1 << 1,
- SVC_STARTING = 1 << 2,
- SVC_RUNNING = 1 << 3,
- SVC_EVENT = 1 << 4,
+ SVC_STOPPED = BIT(0),
+ SVC_STOPPING = BIT(1),
+ SVC_STARTING = BIT(2),
+ SVC_RUNNING = BIT(3),
};
#define PTLRPC_THR_NAME_LEN 32
*/
struct ptlrpc_thread {
/**
- * List of active threads in svc->srv_threads
+ * List of active threads in svcpt->scp_threads
*/
struct list_head t_link;
/**
return !!(thread->t_flags & SVC_RUNNING);
}
-static inline int thread_is_event(struct ptlrpc_thread *thread)
-{
- return !!(thread->t_flags & SVC_EVENT);
-}
-
static inline void thread_clear_flags(struct ptlrpc_thread *thread, __u32 flags)
{
thread->t_flags &= ~flags;
char *srv_name;
/** only statically allocated strings here; we don't clean them */
char *srv_thread_name;
- /** service thread list */
- struct list_head srv_threads;
/** threads # should be created for each partition on initializing */
int srv_nthrs_cpt_init;
/** limit of threads number for each partition */
int scp_thr_nextid;
/** # of starting threads */
int scp_nthrs_starting;
- /** # of stopping threads, reserved for shrinking threads */
- int scp_nthrs_stopping;
/** # running threads */
int scp_nthrs_running;
/** service threads list */
/* Bits for pc_flags */
enum ptlrpcd_ctl_flags {
- /**
- * Ptlrpc thread start flag.
- */
- LIOD_START = 1 << 0,
- /**
- * Ptlrpc thread stop flag.
- */
- LIOD_STOP = 1 << 1,
- /**
- * Ptlrpc thread force flag (only stop force so far).
- * This will cause aborting any inflight rpcs handled
- * by thread if LIOD_STOP is specified.
- */
- LIOD_FORCE = 1 << 2,
- /**
- * This is a recovery ptlrpc thread.
- */
- LIOD_RECOVERY = 1 << 3,
+ /**
+ * Ptlrpc thread start flag.
+ */
+ LIOD_START = BIT(0),
+ /**
+ * Ptlrpc thread stop flag.
+ */
+ LIOD_STOP = BIT(1),
+ /**
+ * Ptlrpc thread force flag (only stop force so far).
+ * This will cause aborting any inflight rpcs handled
+ * by thread if LIOD_STOP is specified.
+ */
+ LIOD_FORCE = BIT(2),
+ /**
+ * This is a recovery ptlrpc thread.
+ */
+ LIOD_RECOVERY = BIT(3),
};
/**
/** @} nrs */
/* ptlrpc/events.c */
-extern struct lnet_handle_eq ptlrpc_eq_h;
extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
struct lnet_process_id *peer, lnet_nid_t *self);
/**
struct ptlrpc_connection *ptlrpc_connection_get(struct lnet_process_id peer,
lnet_nid_t self,
struct obd_uuid *uuid);
-int ptlrpc_connection_put(struct ptlrpc_connection *c);
+
+static inline void ptlrpc_connection_put(struct ptlrpc_connection *conn)
+{
+ if (!conn)
+ return;
+
+ LASSERT(atomic_read(&conn->c_refcount) > 0);
+
+ /*
+ * We do not remove the connection from the hashtable and do not
+ * free it even when the last caller releases its reference, as we
+ * want to keep it cached in case it is needed again.
+ *
+ * Deallocating it and later creating a new connection again would
+ * be wasteful. This way we also avoid expensive locking to protect
+ * against a get/put race, in which a cached connection that has
+ * just been looked up could be freed by ptlrpc_connection_put().
+ *
+ * Connections are freed at module unload time, when the
+ * ptlrpc_connection_fini()->lh_exit->conn_exit() path is called.
+ */
+ atomic_dec(&conn->c_refcount);
+
+ CDEBUG(D_INFO, "PUT conn=%p refcount %d to %s\n",
+ conn, atomic_read(&conn->c_refcount),
+ libcfs_nid2str(conn->c_peer.nid));
+}
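/*
 * Hedged usage sketch, not part of this patch: lookup and release
 * still pair as before; only the final free has moved to module
 * unload time.
 *
 *	conn = ptlrpc_connection_get(peer, self, uuid);
 *	if (conn) {
 *		... send over conn ...
 *		ptlrpc_connection_put(conn);
 *	}
 */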
+
struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
int ptlrpc_connection_init(void);
void ptlrpc_connection_fini(void);
LASSERT(desc != NULL);
spin_lock(&desc->bd_lock);
- rc = desc->bd_md_count;
+ rc = desc->bd_refs;
spin_unlock(&desc->bd_lock);
return rc;
}
spin_lock(&desc->bd_lock);
- rc = desc->bd_md_count;
+ rc = desc->bd_refs;
spin_unlock(&desc->bd_lock);
return rc;
}
void *arg);
int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *);
-void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
void ptlrpc_set_destroy(struct ptlrpc_request_set *);
void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
+#define PTLRPCD_SET ((struct ptlrpc_request_set *)1)
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
const struct ptlrpc_bulk_frag_ops
*ops);
-int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
- void *frag, int len);
void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
struct page *page, int pageoffset, int len,
int pin);
__u64 ptlrpc_next_xid(void);
__u64 ptlrpc_sample_next_xid(void);
__u64 ptlrpc_req_xid(struct ptlrpc_request *request);
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req);
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req);
/* Set of routines to run a function in ptlrpcd context */
void *ptlrpcd_alloc_work(struct obd_import *imp,
int ptlrpc_hr_init(void);
void ptlrpc_hr_fini(void);
+void ptlrpc_watchdog_init(struct delayed_work *work, timeout_t timeout);
+void ptlrpc_watchdog_disable(struct delayed_work *work);
+void ptlrpc_watchdog_touch(struct delayed_work *work, timeout_t timeout);
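/*
 * Hedged usage sketch, not part of this patch (t_watchdog assumed to
 * be the service thread's delayed_work): a thread arms its watchdog
 * once, touches it per request, and disables it on exit.
 *
 *	ptlrpc_watchdog_init(&thread->t_watchdog, timeout);
 *	while (...handle one request...)
 *		ptlrpc_watchdog_touch(&thread->t_watchdog, timeout);
 *	ptlrpc_watchdog_disable(&thread->t_watchdog);
 */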
+
/** @} */
/* ptlrpc/import.c */
* @{
*/
int ptlrpc_connect_import(struct obd_import *imp);
+int ptlrpc_connect_import_locked(struct obd_import *imp);
int ptlrpc_init_import(struct obd_import *imp);
int ptlrpc_disconnect_import(struct obd_import *imp, int noclose);
int ptlrpc_disconnect_and_idle_import(struct obd_import *imp);
int lustre_msg_get_status(struct lustre_msg *msg);
__u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg);
__u32 lustre_msg_get_magic(struct lustre_msg *msg);
-__u32 lustre_msg_get_timeout(struct lustre_msg *msg);
-__u32 lustre_msg_get_service_time(struct lustre_msg *msg);
+timeout_t lustre_msg_get_timeout(struct lustre_msg *msg);
+timeout_t lustre_msg_get_service_timeout(struct lustre_msg *msg);
char *lustre_msg_get_jobid(struct lustre_msg *msg);
__u32 lustre_msg_get_cksum(struct lustre_msg *msg);
__u64 lustre_msg_get_mbits(struct lustre_msg *msg);
-__u32 lustre_msg_calc_cksum(struct lustre_msg *msg);
+__u32 lustre_msg_calc_cksum(struct lustre_msg *msg, __u32 buf);
void lustre_msg_set_handle(struct lustre_msg *msg,struct lustre_handle *handle);
void lustre_msg_set_type(struct lustre_msg *msg, __u32 type);
void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, __u32 *sizes);
void ptlrpc_request_set_replen(struct ptlrpc_request *req);
-void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
-void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
+void lustre_msg_set_timeout(struct lustre_msg *msg, timeout_t timeout);
+void lustre_msg_set_service_timeout(struct lustre_msg *msg,
+ timeout_t service_timeout);
void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid);
void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
void lustre_msg_set_mbits(struct lustre_msg *msg, __u64 mbits);
max_t(int, at, obd_timeout);
}
+/**
+ * Calculate the amount of time for lock prolongation.
+ *
+ * This is a helper function that returns the extra time to extend
+ * the timeout with.
+ *
+ * @req		current request
+ *
+ * Return:	amount of time to extend the timeout with
+ */
+static inline timeout_t prolong_timeout(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+ timeout_t req_timeout = 0;
+
+ if (AT_OFF)
+ return obd_timeout / 2;
+
+ if (req->rq_deadline > req->rq_arrival_time.tv_sec)
+ req_timeout = req->rq_deadline - req->rq_arrival_time.tv_sec;
+
+ return max(req_timeout,
+ at_est2timeout(at_get(&svcpt->scp_at_estimate)));
+}
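/*
 * Worked example, editorial: with adaptive timeouts disabled (AT_OFF)
 * this returns obd_timeout / 2, i.e. 50s for the default obd_timeout
 * of 100s; otherwise it returns at least the current service-time
 * estimate expanded by at_est2timeout().
 */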
+
static inline struct ptlrpc_service *
ptlrpc_req2svc(struct ptlrpc_request *req)
{
* Target client logic
* @{
*/
-int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg);
-int client_obd_cleanup(struct obd_device *obddev);
+int client_obd_setup(struct obd_device *obd, struct lustre_cfg *lcfg);
+int client_obd_cleanup(struct obd_device *obd);
int client_connect_import(const struct lu_env *env,
struct obd_export **exp, struct obd_device *obd,
struct obd_uuid *cluuid, struct obd_connect_data *,
int client_disconnect_export(struct obd_export *exp);
int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
int priority);
+int client_import_dyn_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
+ lnet_nid_t prim_nid, int priority);
+int client_import_add_nids_to_conn(struct obd_import *imp, lnet_nid_t *nids,
+ int nid_count, struct obd_uuid *uuid);
int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid);
int client_import_find_conn(struct obd_import *imp, lnet_nid_t peer,
struct obd_uuid *uuid);
typedef int (*timeout_cb_t)(struct timeout_item *, void *);
int ptlrpc_pinger_add_import(struct obd_import *imp);
int ptlrpc_pinger_del_import(struct obd_import *imp);
-int ptlrpc_add_timeout_client(time64_t time, enum timeout_event event,
- timeout_cb_t cb, void *data,
- struct list_head *obd_list);
-int ptlrpc_del_timeout_client(struct list_head *obd_list,
- enum timeout_event event);
struct ptlrpc_request * ptlrpc_prep_ping(struct obd_import *imp);
int ptlrpc_obd_ping(struct obd_device *obd);
void ping_evictor_start(void);