struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_MSG_* */
struct list_head rq_list;
+ struct list_head rq_timed_list; /* server-side early replies */
struct list_head rq_history_list; /* server-side history */
__u64 rq_history_seq; /* history sequence # */
int rq_status;
spinlock_t rq_lock;
- /* client-side flags */
+ /* client-side flags are serialized by rq_lock */
unsigned long rq_intr:1, rq_replied:1, rq_err:1,
rq_timedout:1, rq_resend:1, rq_restart:1,
/*
/* this is the last request in the sequence. */
rq_sequence:1,
rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1,
- rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1;
+ rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1,
+ rq_early:1, rq_must_unlink:1,
+ /* server-side flags */
+ rq_packed_final:1, /* packed final reply */
+ rq_sent_final:1; /* stop sending early replies */
+
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
- atomic_t rq_refcount; /* client-side refcount for SENT race */
+ atomic_t rq_refcount; /* client-side refcount for SENT race,
+ server-side refcounf for multiple replies */
struct ptlrpc_thread *rq_svc_thread; /* initial thread servicing req */
int rq_reqlen;
struct lustre_msg *rq_reqmsg;
- int rq_timeout; /* time to wait for reply (seconds) */
int rq_replen;
struct lustre_msg *rq_repmsg;
__u64 rq_transno;
/* (server side), pointed directly into req buffer */
struct ptlrpc_user_desc *rq_user_desc;
+ /* early replies go to offset 0, regular replies go after that */
+ unsigned int rq_reply_off;
+
/* various buffer pointers */
struct lustre_msg *rq_reqbuf; /* req wrapper */
int rq_reqbuf_len; /* req wrapper buf len */
int rq_reqdata_len; /* req wrapper msg len */
- struct lustre_msg *rq_repbuf; /* rep wrapper */
- int rq_repbuf_len; /* rep wrapper buf len */
+ char *rq_repbuf; /* rep buffer */
+ int rq_repbuf_len; /* rep buffer len */
+ struct lustre_msg *rq_repdata; /* rep wrapper msg */
int rq_repdata_len; /* rep wrapper msg len */
struct lustre_msg *rq_clrbuf; /* only in priv mode */
int rq_clrbuf_len; /* only in priv mode */
int rq_import_generation;
enum lustre_imp_state rq_send_state;
+ int rq_early_count; /* how many early replies (for stats) */
+
/* client+server request */
lnet_handle_md_t rq_req_md_h;
struct ptlrpc_cb_id rq_req_cbid;
void (*rq_commit_cb)(struct ptlrpc_request *);
void *rq_cb_data;
- struct ptlrpc_bulk_desc *rq_bulk; /* client side bulk */
- time_t rq_sent; /* when request sent, seconds,
- * or time when request should
- * be sent */
+ struct ptlrpc_bulk_desc *rq_bulk;/* client side bulk */
+
+ /* client outgoing req */
+ time_t rq_sent; /* when request/reply sent (secs), or
+ * time when request should be sent */
+
+ volatile time_t rq_deadline; /* when request must finish. volatile
+ so that servers' early reply updates to the deadline aren't
+ kept in per-cpu cache */
+ int rq_timeout; /* service time estimate (secs) */
+
/* Multi-rpc bits */
struct list_head rq_set_chain;
struct ptlrpc_request_set *rq_set;
int srv_n_difficult_replies; /* # 'difficult' replies */
int srv_n_active_reqs; /* # reqs being served */
cfs_duration_t srv_rqbd_timeout; /* timeout before re-posting reqs, in tick */
- int srv_watchdog_timeout; /* soft watchdog timeout, in ms */
+ int srv_watchdog_factor; /* soft watchdog timeout mutiplier */
unsigned srv_cpu_affinity:1; /* bind threads to CPUs */
+ unsigned srv_at_check:1; /* check early replies */
+ cfs_time_t srv_at_checktime; /* debug */
__u32 srv_req_portal;
__u32 srv_rep_portal;
- int srv_n_queued_reqs; /* # reqs waiting to be served */
+ /* AT stuff */
+ struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */
+ spinlock_t srv_at_lock;
+ struct list_head srv_at_list; /* reqs waiting for replies */
+ cfs_timer_t srv_at_timer; /* early reply timer */
+
+ int srv_n_queued_reqs; /* # reqs in either of the queues below */
+ struct list_head srv_req_in_queue; /* incoming reqs */
struct list_head srv_request_queue; /* reqs waiting for service */
struct list_head srv_request_history; /* request history */
return (rc);
}
-int ptlrpc_send_reply(struct ptlrpc_request *req, int);
+#define PTLRPC_REPLY_MAYBE_DIFFICULT 0x01
+#define PTLRPC_REPLY_EARLY 0x02
+int ptlrpc_send_reply(struct ptlrpc_request *req, int flags);
int ptlrpc_reply(struct ptlrpc_request *req);
int ptlrpc_send_error(struct ptlrpc_request *req, int difficult);
int ptlrpc_error(struct ptlrpc_request *req);
void ptlrpc_resend_req(struct ptlrpc_request *request);
+int ptlrpc_at_get_net_latency(struct ptlrpc_request *req);
int ptl_send_rpc(struct ptlrpc_request *request, int noreply);
int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
static inline int
-ptlrpc_client_receiving_reply (struct ptlrpc_request *req)
+ptlrpc_client_recv_or_unlink (struct ptlrpc_request *req)
{
int rc;
spin_lock(&req->rq_lock);
- rc = req->rq_receiving_reply;
- spin_unlock(&req->rq_lock);
- return (rc);
-}
-
-static inline int
-ptlrpc_client_replied (struct ptlrpc_request *req)
-{
- int rc;
-
- spin_lock(&req->rq_lock);
- rc = req->rq_replied;
+ rc = req->rq_receiving_reply || req->rq_must_unlink;
spin_unlock(&req->rq_lock);
return (rc);
}
void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int, int,
void (*populate_pool)(struct ptlrpc_request_pool *, int));
+void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req);
struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
const struct req_format *format);
struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
int psc_max_reply_size;
int psc_req_portal;
int psc_rep_portal;
- int psc_watchdog_timeout; /* in ms */
+ int psc_watchdog_factor;
int psc_min_threads;
int psc_max_threads;
__u32 psc_ctx_tags;
struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
int max_reply_size,
int req_portal, int rep_portal,
- int watchdog_timeout, /* in ms */
+ int watchdog_factor,
svc_handler_t, char *name,
cfs_proc_dir_entry_t *proc_entry,
svcreq_printfn_t,
int lustre_pack_reply(struct ptlrpc_request *, int count, int *lens,
char **bufs);
int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
- int *lens, char **bufs);
+ int *lens, char **bufs, int flags);
+#define LPRFL_EARLY_REPLY 1
+int lustre_pack_reply_flags(struct ptlrpc_request *, int count, int *lens,
+ char **bufs, int flags);
int lustre_shrink_msg(struct lustre_msg *msg, int segment,
unsigned int newlen, int move_data);
void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
int lustre_msg_size(__u32 magic, int count, int *lengths);
int lustre_msg_size_v2(int count, int *lengths);
int lustre_packed_msg_size(struct lustre_msg *msg);
+int lustre_msg_early_size(void);
int lustre_unpack_msg(struct lustre_msg *m, int len);
void *lustre_msg_buf_v2(struct lustre_msg_v2 *m, int n, int min_size);
void *lustre_msg_buf(struct lustre_msg *m, int n, int minlen);
void *swabber);
void *lustre_swab_repbuf(struct ptlrpc_request *req, int n, int minlen,
void *swabber);
+__u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
+void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags);
__u32 lustre_msg_get_flags(struct lustre_msg *msg);
void lustre_msg_add_flags(struct lustre_msg *msg, int flags);
void lustre_msg_set_flags(struct lustre_msg *msg, int flags);
void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit);
int lustre_msg_get_status(struct lustre_msg *msg);
__u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg);
+int lustre_msg_is_v1(struct lustre_msg *msg);
__u32 lustre_msg_get_magic(struct lustre_msg *msg);
+__u32 lustre_msg_get_timeout(struct lustre_msg *msg);
+__u32 lustre_msg_get_service_time(struct lustre_msg *msg);
+__u32 lustre_msg_get_cksum(struct lustre_msg *msg);
+__u32 lustre_msg_calc_cksum(struct lustre_msg *msg);
void lustre_msg_set_handle(struct lustre_msg *msg,struct lustre_handle *handle);
void lustre_msg_set_type(struct lustre_msg *msg, __u32 type);
void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, int *sizes);
void ptlrpc_request_set_replen(struct ptlrpc_request *req);
+void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
+void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
+void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
static inline void
lustre_shrink_reply(struct ptlrpc_request *req, int segment,
lustre_free_reply_state(rs);
}
+/* Should only be called once per req */
+static inline void ptlrpc_req_drop_rs(struct ptlrpc_request *req)
+{
+ if (req->rq_reply_state == NULL)
+ return; /* shouldn't occur */
+ ptlrpc_rs_decref(req->rq_reply_state);
+ req->rq_reply_state = NULL;
+ req->rq_repmsg = NULL;
+}
+
static inline __u32 lustre_request_magic(struct ptlrpc_request *req)
{
return lustre_msg_get_magic(req->rq_reqmsg);