#include <linux/lustre_import.h>
#include <linux/lprocfs_status.h>
+/* Size over which to OBD_VMALLOC() rather than OBD_ALLOC() service request
+ * buffers */
+#define SVC_BUF_VMALLOC_THRESHOLD (2*PAGE_SIZE)
+
/* The following constants determine how much memory is devoted to
* buffering in the lustre services.
*
* total memory = ?_NBUFS * ?_BUFSIZE
*
* ?_MAXREQSIZE # maximum request service will receive
- * larger messages will get dropped.
+ * messages larger than ?_MAXREQSIZE are dropped.
* request buffers are auto-unlinked when less than ?_MAXREQSIZE
* is left in them.
*/
#define LDLM_NUM_THREADS min(smp_num_cpus * smp_num_cpus * 8, 64)
-#define LDLM_NEVENT_MAX 8192UL
-#define LDLM_NEVENTS min_t(unsigned long, num_physpages / 64, \
- LDLM_NEVENT_MAX)
#define LDLM_NBUF_MAX 256UL
-#define LDLM_NBUFS min(LDLM_NEVENTS / 16, LDLM_NBUF_MAX)
#define LDLM_BUFSIZE (8 * 1024)
#define LDLM_MAXREQSIZE (5 * 1024)
+#define LDLM_MAXMEM (num_physpages*(PAGE_SIZE/1024))
+#define LDLM_NBUFS min(LDLM_MAXMEM/LDLM_BUFSIZE, LDLM_NBUF_MAX)
#define MDT_MAX_THREADS 32UL
#define MDT_NUM_THREADS max(min_t(unsigned long, num_physpages / 8192, \
MDT_MAX_THREADS), 2UL)
-#define MDS_NEVENT_MAX 8192UL
-#define MDS_NEVENTS min_t(unsigned long, num_physpages / 64, \
- MDS_NEVENT_MAX)
#define MDS_NBUF_MAX 512UL
-#define MDS_NBUFS min(MDS_NEVENTS / 16, MDS_NBUF_MAX)
#define MDS_BUFSIZE (8 * 1024)
/* Assume file name length = FNAME_MAX = 256 (true for extN).
* path name length = PATH_MAX = 4096
* except in the open case where there are a large number of OSTs in a LOV.
*/
#define MDS_MAXREQSIZE (5 * 1024)
+#define MDS_MAXMEM (num_physpages*(PAGE_SIZE/512))
+#define MDS_NBUFS min(MDS_MAXMEM/MDS_BUFSIZE, MDS_NBUF_MAX)
#define OST_MAX_THREADS 36UL
#define OST_NUM_THREADS max(min_t(unsigned long, num_physpages / 8192, \
OST_MAX_THREADS), 2UL)
-#define OST_NEVENT_MAX 16384UL
-#define OST_NEVENTS min_t(unsigned long, num_physpages / 16, \
- OST_NEVENT_MAX)
#define OST_NBUF_MAX 5000UL
-#define OST_NBUFS min(OST_NEVENTS / 2, OST_NBUF_MAX)
#define OST_BUFSIZE (8 * 1024)
/* OST_MAXREQSIZE ~= 1640 bytes =
* lustre_msg + obdo + 16 * obd_ioobj + 64 * niobuf_remote
* - OST_MAXREQSIZE must be at least 1 page of cookies plus some spillover
*/
#define OST_MAXREQSIZE (5 * 1024)
+#define OST_MAXMEM (num_physpages*(PAGE_SIZE/512))
+#define OST_NBUFS min(OST_MAXMEM/OST_BUFSIZE, OST_NBUF_MAX)
#define PTLBD_NUM_THREADS 4
-#define PTLBD_NEVENTS 1024
#define PTLBD_NBUFS 20
#define PTLBD_BUFSIZE (32 * 1024)
#define PTLBD_MAXREQSIZE 1024
struct ptlrpc_bulk_desc;
+/*
+ * ptlrpc callback & work item stuff
+ */
+struct ptlrpc_cb_id {
+ void (*cbid_fn)(ptl_event_t *ev); /* specific callback fn */
+ void *cbid_arg; /* additional arg */
+};
+
+#define RS_MAX_LOCKS 4
+#define RS_DEBUG 1
+
+struct ptlrpc_reply_state {
+ struct ptlrpc_cb_id rs_cb_id;
+ struct list_head rs_list;
+ struct list_head rs_exp_list;
+ struct list_head rs_obd_list;
+#if RS_DEBUG
+ struct list_head rs_debug_list;
+#endif
+ /* updates to following flag serialised by srv_request_lock */
+ unsigned int rs_difficult:1; /* ACK/commit stuff */
+ unsigned int rs_scheduled:1; /* being handled? */
+ unsigned int rs_scheduled_ever:1; /* any schedule attempts? */
+ unsigned int rs_handled:1; /* been handled yet? */
+ unsigned int rs_on_net:1; /* reply_out_callback pending? */
+
+ int rs_size;
+ __u64 rs_transno;
+ __u64 rs_xid;
+ struct obd_export *rs_export;
+ struct ptlrpc_srv_ni *rs_srv_ni;
+ ptl_handle_md_t rs_md_h;
+
+ /* locks awaiting client reply ACK */
+ int rs_nlocks;
+ struct lustre_handle rs_locks[RS_MAX_LOCKS];
+ ldlm_mode_t rs_modes[RS_MAX_LOCKS];
+ /* last member: variable sized reply message */
+ struct lustre_msg rs_msg;
+};
+
struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_MSG_* */
struct list_head rq_list;
int rq_status;
spinlock_t rq_lock;
- unsigned int rq_intr:1, rq_replied:1, rq_want_ack:1, rq_err:1,
+ /* client-side flags */
+ unsigned int rq_intr:1, rq_replied:1, rq_err:1,
rq_timedout:1, rq_resend:1, rq_restart:1, rq_replay:1,
- rq_no_resend:1, rq_resent:1, rq_waiting:1, rq_receiving_reply:1;
+ rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1;
int rq_phase;
-
+ /* client-side refcount for SENT race */
atomic_t rq_refcount;
int rq_request_portal; /* XXX FIXME bug 249 */
int rq_reply_portal; /* XXX FIXME bug 249 */
+ /* client-side # reply bytes actually received */
+ int rq_nob_received;
+
int rq_reqlen;
struct lustre_msg *rq_reqmsg;
int rq_import_generation;
enum lustre_imp_state rq_send_state;
- wait_queue_head_t rq_reply_waitq; /* XXX also _for_ack */
- /* incoming reply */
- ptl_md_t rq_reply_md;
- ptl_handle_md_t rq_reply_md_h;
-
- /* outgoing req/rep */
- ptl_md_t rq_req_md;
+ /* client+server request */
+ ptl_handle_md_t rq_req_md_h;
+ struct ptlrpc_cb_id rq_req_cbid;
+ /* server-side... */
+ struct timeval rq_arrival_time; /* request arrival time */
+ struct ptlrpc_reply_state *rq_reply_state; /* separated reply state */
+ struct ptlrpc_request_buffer_desc *rq_rqbd; /* incoming request buffer */
+
+ /* client-only incoming reply */
+ ptl_handle_md_t rq_reply_md_h;
+ wait_queue_head_t rq_reply_waitq;
+ struct ptlrpc_cb_id rq_reply_cbid;
+
struct ptlrpc_peer rq_peer; /* XXX see service.c can this be factored away? */
struct obd_export *rq_export;
struct obd_import *rq_import;
- struct ptlrpc_service *rq_svc;
-
+
void (*rq_replay_cb)(struct ptlrpc_request *);
void (*rq_commit_cb)(struct ptlrpc_request *);
void *rq_cb_data;
struct ptlrpc_request_set *rq_set;
void *rq_interpret_reply; /* Async completion handler */
union ptlrpc_async_args rq_async_args; /* Async completion context */
-
- /* Only used on the server side for tracking acks. */
- struct ptlrpc_req_ack_lock {
- struct lustre_handle lock;
- __u32 mode;
- } rq_ack_locks[REQ_MAX_ACK_LOCKS];
};
#define RQ_PHASE_NEW 0xebc0de00
-#define RQ_PHASE_RPC 0xebc0de01
+#define RQ_PHASE_RPC 0xebc0de01
#define RQ_PHASE_BULK 0xebc0de02
#define RQ_PHASE_INTERPRET 0xebc0de03
#define RQ_PHASE_COMPLETE 0xebc0de04
#define PTLRPC_REQUEST_COMPLETE(req) ((req)->rq_phase > RQ_PHASE_RPC)
-#define DEBUG_REQ_FLAGS(req) \
- ((req->rq_phase == RQ_PHASE_NEW) ? "New" : \
- (req->rq_phase == RQ_PHASE_RPC) ? "RPC" : \
- (req->rq_phase == RQ_PHASE_INTERPRET) ? "Interpret" : \
- (req->rq_phase == RQ_PHASE_COMPLETE) ? "Complete" : \
- (req->rq_phase == RQ_PHASE_BULK) ? "Bulk" : "?phase?"), \
- FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"), \
- FLAG(req->rq_want_ack, "A"), FLAG(req->rq_err, "E"), \
- FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"), \
- FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \
- FLAG(req->rq_no_resend, "N"), FLAG(req->rq_resent, "s"), \
+#define DEBUG_REQ_FLAGS(req) \
+ ((req->rq_phase == RQ_PHASE_NEW) ? "New" : \
+ (req->rq_phase == RQ_PHASE_RPC) ? "Rpc" : \
+ (req->rq_phase == RQ_PHASE_INTERPRET) ? "Interpret" : \
+ (req->rq_phase == RQ_PHASE_COMPLETE) ? "Complete" : "?phase?"), \
+ FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"), \
+ FLAG(req->rq_err, "E"), \
+ FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"), \
+ FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"), \
+ FLAG(req->rq_no_resend, "N"), \
FLAG(req->rq_waiting, "W")
-#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s"
+#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s"
#define DEBUG_REQ(level, req, fmt, args...) \
do { \
} while (0)
struct ptlrpc_bulk_page {
- struct ptlrpc_bulk_desc *bp_desc;
struct list_head bp_link;
int bp_buflen;
int bp_pageoffset; /* offset within a page */
struct page *bp_page;
};
-#define BULK_GET_SOURCE 0
+#define BULK_GET_SOURCE 0
#define BULK_PUT_SINK 1
#define BULK_GET_SINK 2
#define BULK_PUT_SOURCE 3
struct ptlrpc_bulk_desc {
- unsigned int bd_complete:1;
+ unsigned int bd_success:1; /* completed successfully */
unsigned int bd_network_rw:1; /* accessible to the network */
unsigned int bd_type:2; /* {put,get}{source,sink} */
unsigned int bd_registered:1; /* client side */
struct obd_import *bd_import;
__u32 bd_portal;
struct ptlrpc_request *bd_req; /* associated request */
- wait_queue_head_t bd_waitq; /* server side only WQ */
- struct list_head bd_page_list;
- __u32 bd_page_count;
- __u32 bd_last_xid;
-
- ptl_md_t bd_md;
- ptl_handle_md_t bd_md_h;
- ptl_handle_me_t bd_me_h;
+ wait_queue_head_t bd_waitq; /* server side only WQ */
+ int bd_page_count; /* # pages (== entries in bd_iov) */
+ int bd_max_pages; /* allocated size of bd_iov */
+ int bd_nob; /* # bytes covered */
+ int bd_nob_transferred; /* # bytes GOT/PUT */
- int bd_callback_count; /* server side callbacks */
+ __u64 bd_last_xid;
+ struct ptlrpc_cb_id bd_cbid; /* network callback info */
+ ptl_handle_md_t bd_md_h; /* associated MD */
+
#ifdef __KERNEL__
ptl_kiov_t bd_iov[PTL_MD_MAX_IOV];
#else
struct ptlrpc_request_buffer_desc {
struct list_head rqbd_list;
struct ptlrpc_srv_ni *rqbd_srv_ni;
- ptl_handle_me_t rqbd_me_h;
- atomic_t rqbd_refcount;
+ ptl_handle_md_t rqbd_md_h;
+ int rqbd_refcount;
+ int rqbd_eventcount;
char *rqbd_buffer;
+ struct ptlrpc_cb_id rqbd_cbid;
+ struct ptlrpc_request rqbd_req;
};
/* event queues are per-ni, because one day we may get a hardware
char *pni_name;
int pni_number;
ptl_handle_ni_t pni_ni_h;
- ptl_handle_eq_t pni_request_out_eq_h;
- ptl_handle_eq_t pni_reply_in_eq_h;
- ptl_handle_eq_t pni_reply_out_eq_h;
- ptl_handle_eq_t pni_bulk_put_source_eq_h;
- ptl_handle_eq_t pni_bulk_put_sink_eq_h;
- ptl_handle_eq_t pni_bulk_get_source_eq_h;
- ptl_handle_eq_t pni_bulk_get_sink_eq_h;
+ ptl_handle_eq_t pni_eq_h;
};
struct ptlrpc_srv_ni {
/* Interface-specific service state */
struct ptlrpc_service *sni_service; /* owning service */
struct ptlrpc_ni *sni_ni; /* network interface */
- ptl_handle_eq_t sni_eq_h; /* event queue handle */
- struct list_head sni_rqbds; /* all the request buffer descriptors */
- __u32 sni_nrqbds; /* # request buffers */
- atomic_t sni_nrqbds_receiving; /* # request buffers posted */
+ struct list_head sni_rqbds; /* all the request buffers */
+ struct list_head sni_active_replies; /* all the active replies */
+ int sni_nrqbd_receiving; /* # posted request buffers */
};
-struct ptlrpc_service {
- time_t srv_time;
- time_t srv_timeout;
-
- struct list_head srv_ni_list; /* list of interfaces */
- __u32 srv_max_req_size; /* biggest request to receive */
- __u32 srv_buf_size; /* # bytes in a request buffer */
+typedef int (*svc_handler_t)(struct ptlrpc_request *req);
+struct ptlrpc_service {
+ struct list_head srv_list; /* chain thru all services */
+ int srv_max_req_size; /* biggest request to receive */
+ int srv_buf_size; /* size of individual buffers */
+ int srv_nbufs; /* total # req buffer descs allocated */
+ int srv_nthreads; /* # running threads */
+ int srv_n_difficult_replies; /* # 'difficult' replies */
+ int srv_n_active_reqs; /* # reqs being served */
+
__u32 srv_req_portal;
__u32 srv_rep_portal;
- __u32 srv_xid;
+ int srv_n_queued_reqs; /* # reqs waiting to be served */
+ struct list_head srv_request_queue; /* reqs waiting for service */
+
+ atomic_t srv_outstanding_replies;
+ struct list_head srv_reply_queue; /* replies waiting for service */
wait_queue_head_t srv_waitq; /* all threads sleep on this */
- spinlock_t srv_lock;
- struct list_head srv_threads;
- int (*srv_handler)(struct ptlrpc_request *req);
+ struct list_head srv_threads;
+ struct obd_device *srv_obddev;
+ svc_handler_t srv_handler;
+
char *srv_name; /* only statically allocated strings here; we don't clean them */
- struct proc_dir_entry *srv_procroot;
- struct lprocfs_stats *srv_stats;
- int srv_interface_rover;
+ spinlock_t srv_lock;
+
+ struct proc_dir_entry *srv_procroot;
+ struct lprocfs_stats *srv_stats;
+
struct ptlrpc_srv_ni srv_interfaces[0];
};
-typedef int (*svc_handler_t)(struct ptlrpc_request *req);
-
/* ptlrpc/events.c */
extern struct ptlrpc_ni ptlrpc_interfaces[];
extern int ptlrpc_ninterfaces;
extern int ptlrpc_uuid_to_peer(struct obd_uuid *uuid, struct ptlrpc_peer *peer);
+extern void request_out_callback (ptl_event_t *ev);
+extern void reply_in_callback(ptl_event_t *ev);
+extern void client_bulk_callback (ptl_event_t *ev);
+extern void request_in_callback(ptl_event_t *ev);
+extern void reply_out_callback(ptl_event_t *ev);
+extern void server_bulk_callback (ptl_event_t *ev);
/* ptlrpc/connection.c */
void ptlrpc_dump_connections(void);
void ptlrpc_cleanup_connection(void);
/* ptlrpc/niobuf.c */
-int ptlrpc_bulk_put(struct ptlrpc_bulk_desc *);
-int ptlrpc_bulk_get(struct ptlrpc_bulk_desc *);
-void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk);
+int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
+void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
int ptlrpc_register_bulk(struct ptlrpc_request *req);
void ptlrpc_unregister_bulk (struct ptlrpc_request *req);
-static inline int ptlrpc_bulk_complete (struct ptlrpc_bulk_desc *desc)
+static inline int ptlrpc_bulk_active (struct ptlrpc_bulk_desc *desc)
{
unsigned long flags;
int rc;
spin_lock_irqsave (&desc->bd_lock, flags);
- rc = desc->bd_complete;
+ rc = desc->bd_network_rw;
spin_unlock_irqrestore (&desc->bd_lock, flags);
return (rc);
}
+int ptlrpc_send_reply(struct ptlrpc_request *req, int);
int ptlrpc_reply(struct ptlrpc_request *req);
int ptlrpc_error(struct ptlrpc_request *req);
void ptlrpc_resend_req(struct ptlrpc_request *request);
int ptl_send_rpc(struct ptlrpc_request *request);
-void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd);
+void ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
/* ptlrpc/client.c */
void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
void ptlrpc_cleanup_client(struct obd_import *imp);
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
+static inline int
+ptlrpc_client_receiving_reply (struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ int rc;
+
+ spin_lock_irqsave(&req->rq_lock, flags);
+ rc = req->rq_receiving_reply;
+ spin_unlock_irqrestore(&req->rq_lock, flags);
+ return (rc);
+}
+
+static inline int
+ptlrpc_client_replied (struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ int rc;
+
+ spin_lock_irqsave(&req->rq_lock, flags);
+ rc = req->rq_replied;
+ spin_unlock_irqrestore(&req->rq_lock, flags);
+ return (rc);
+}
+
+static inline void
+ptlrpc_wake_client_req (struct ptlrpc_request *req)
+{
+ if (req->rq_set == NULL)
+ wake_up(&req->rq_reply_waitq);
+ else
+ wake_up(&req->rq_set->set_waitq);
+}
+
int ptlrpc_queue_wait(struct ptlrpc_request *req);
int ptlrpc_replay_req(struct ptlrpc_request *req);
void ptlrpc_unregister_reply(struct ptlrpc_request *req);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
- int type, int portal);
+ int npages, int type, int portal);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
- int type, int portal);
+ int npages, int type, int portal);
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
-int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
- struct page *page, int pageoffset, int len);
-void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page);
+void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+ struct page *page, int pageoffset, int len);
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
struct obd_import *imp);
__u64 ptlrpc_next_xid(void);
/* ptlrpc/service.c */
-struct ptlrpc_service *
-ptlrpc_init_svc(__u32 nevents, __u32 nbufs, __u32 bufsize, __u32 max_req_size,
- int req_portal, int rep_portal, svc_handler_t, char *name,
- struct proc_dir_entry *proc_entry);
+void ptlrpc_save_lock (struct ptlrpc_request *req,
+ struct lustre_handle *lock, int mode);
+void ptlrpc_commit_replies (struct obd_device *obd);
+void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
+struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
+ int req_portal, int rep_portal,
+ svc_handler_t, char *name,
+ struct proc_dir_entry *proc_entry);
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
int ptlrpc_start_n_threads(struct obd_device *dev, struct ptlrpc_service *svc,
int cnt, char *base_name);
int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
char *name);
int ptlrpc_unregister_service(struct ptlrpc_service *service);
+int liblustre_check_services (void *arg);
struct ptlrpc_svc_data {
char *name;
char **bufs);
int lustre_pack_reply(struct ptlrpc_request *, int count, int *lens,
char **bufs);
+void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
int lustre_msg_size(int count, int *lengths);
int lustre_unpack_msg(struct lustre_msg *m, int len);
void *lustre_msg_buf(struct lustre_msg *m, int n, int minlen);
#endif
/* ptlrpc/llog_server.c */
-struct llog_obd_ctxt;
int llog_origin_handle_create(struct ptlrpc_request *req);
int llog_origin_handle_next_block(struct ptlrpc_request *req);
int llog_origin_handle_read_header(struct ptlrpc_request *req);