-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
/*
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
# endif
#endif /* __KERNEL__ */
-/* Size over which to OBD_VMALLOC() rather than OBD_ALLOC() service request
- * buffers */
-#define SVC_BUF_VMALLOC_THRESHOLD (2 * CFS_PAGE_SIZE)
+#define PTLRPC_NTHRS_MIN 2
/**
* The following constants determine how memory is used to buffer incoming
#define LDLM_MAXREQSIZE (5 * 1024)
#define LDLM_MAXREPSIZE (1024)
-#define MDT_MIN_THREADS 2UL
-#define MDT_MAX_THREADS 512UL
-#define MDT_NUM_THREADS max(min_t(unsigned long, MDT_MAX_THREADS, \
- cfs_num_physpages >> (25 - CFS_PAGE_SHIFT)), \
- 2UL)
-
/** Absolute limits */
-#define MDS_THREADS_MIN 2
-#define MDS_THREADS_MAX 512
-#define MDS_THREADS_MIN_READPAGE 2
+#ifndef MDT_MAX_THREADS
+#define MDT_MIN_THREADS PTLRPC_NTHRS_MIN
+#define MDT_MAX_THREADS 512UL
+#endif
#define MDS_NBUFS (64 * cfs_num_online_cpus())
-#define MDS_BUFSIZE (8 * 1024)
/**
* Assume file name length = FNAME_MAX = 256 (true for ext3).
* path name length = PATH_MAX = 4096
- * LOV MD size max = EA_MAX = 4000
+ * LOV MD size max = EA_MAX = 48000 (2000 stripes)
* symlink: FNAME_MAX + PATH_MAX <- largest
* link: FNAME_MAX + PATH_MAX (mds_rec_link < mds_rec_create)
* rename: FNAME_MAX + FNAME_MAX
* open: FNAME_MAX + EA_MAX
*
* MDS_MAXREQSIZE ~= 4736 bytes =
- * lustre_msg + ldlm_request + mds_body + mds_rec_create + FNAME_MAX + PATH_MAX
+ * lustre_msg + ldlm_request + mdt_body + mds_rec_create + FNAME_MAX + PATH_MAX
* MDS_MAXREPSIZE ~= 8300 bytes = lustre_msg + llog_header
* or, for mds_close() and mds_reint_unlink() on a many-OST filesystem:
- * = 9210 bytes = lustre_msg + mds_body + 160 * (easize + cookiesize)
+ * = 9210 bytes = lustre_msg + mdt_body + 160 * (easize + cookiesize)
*
* Realistic size is about 512 bytes (20 character name + 128 char symlink),
* except in the open case where there are a large number of OSTs in a LOV.
*/
-#define MDS_MAXREQSIZE (5 * 1024)
-#define MDS_MAXREPSIZE max(9 * 1024, 362 + LOV_MAX_STRIPE_COUNT * 56)
+#define MDS_MAXREPSIZE max(10 * 1024, 362 + LOV_MAX_STRIPE_COUNT * 56)
+#define MDS_MAXREQSIZE MDS_MAXREPSIZE
+
+/** MDS_BUFSIZE = max_reqsize + max sptlrpc payload size */
+#define MDS_BUFSIZE (MDS_MAXREQSIZE + 1024)
-/** FLD_MAXREQSIZE == lustre_msg + __u32 padding + ptlrpc_body + opc + md_fld */
+/** FLD_MAXREQSIZE == lustre_msg + __u32 padding + ptlrpc_body + opc */
#define FLD_MAXREQSIZE (160)
-/** FLD_MAXREPSIZE == lustre_msg + ptlrpc_body + md_fld */
+/** FLD_MAXREPSIZE == lustre_msg + ptlrpc_body */
#define FLD_MAXREPSIZE (152)
/**
union ptlrpc_async_args {
/**
* Scratchpad for passing args to completion interpreter. Users
- * cast to the struct of their choosing, and LASSERT that this is
+ * cast to the struct of their choosing, and CLASSERT that this is
* big enough. For _tons_ of context, OBD_ALLOC a struct and store
* a pointer to it here. The pointer_arg ensures this struct is at
* least big enough for that.
*/
void *pointer_arg[11];
- __u64 space[6];
+ __u64 space[7];
};
struct ptlrpc_request_set;
typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
+typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
/**
* Definition of request set structure.
* returned.
*/
struct ptlrpc_request_set {
- /** number of uncompleted requests */
- cfs_atomic_t set_remaining;
- /** wait queue to wait on for request events */
- cfs_waitq_t set_waitq;
- cfs_waitq_t *set_wakeup_ptr;
- /** List of requests in the set */
- cfs_list_t set_requests;
- /**
- * List of completion callbacks to be called when the set is completed
- * This is only used if \a set_interpret is NULL.
- * Links struct ptlrpc_set_cbdata.
- */
- cfs_list_t set_cblist;
- /** Completion callback, if only one. */
- set_interpreter_func set_interpret;
- /** opaq argument passed to completion \a set_interpret callback. */
- void *set_arg;
- /**
- * Lock for \a set_new_requests manipulations
- * locked so that any old caller can communicate requests to
- * the set holder who can then fold them into the lock-free set
- */
- cfs_spinlock_t set_new_req_lock;
- /** List of new yet unsent requests. Only used with ptlrpcd now. */
- cfs_list_t set_new_requests;
+ cfs_atomic_t set_refcount;
+ /** number of in queue requests */
+ cfs_atomic_t set_new_count;
+ /** number of uncompleted requests */
+ cfs_atomic_t set_remaining;
+ /** wait queue to wait on for request events */
+ cfs_waitq_t set_waitq;
+ cfs_waitq_t *set_wakeup_ptr;
+ /** List of requests in the set */
+ cfs_list_t set_requests;
+ /**
+ * List of completion callbacks to be called when the set is completed
+ * This is only used if \a set_interpret is NULL.
+ * Links struct ptlrpc_set_cbdata.
+ */
+ cfs_list_t set_cblist;
+ /** Completion callback, if only one. */
+ set_interpreter_func set_interpret;
+ /** opaq argument passed to completion \a set_interpret callback. */
+ void *set_arg;
+ /**
+ * Lock for \a set_new_requests manipulations
+ * locked so that any old caller can communicate requests to
+ * the set holder who can then fold them into the lock-free set
+ */
+ cfs_spinlock_t set_new_req_lock;
+ /** List of new yet unsent requests. Only used with ptlrpcd now. */
+ cfs_list_t set_new_requests;
+
+ /** rq_status of requests that have been freed already */
+ int set_rc;
+ /** Additional fields used by the flow control extension */
+ /** Maximum number of RPCs in flight */
+ int set_max_inflight;
+ /** Callback function used to generate RPCs */
+ set_producer_func set_producer;
+ /** opaq argument passed to the producer callback */
+ void *set_producer_arg;
};
/**
};
struct ptlrpc_bulk_desc;
+struct ptlrpc_service_part;
/**
* ptlrpc callback & work item stuff
void *cbid_arg; /* additional arg */
};
-/** Maximum number of locks to fit into reply state */
+/** Maximum number of locks to fit into reply state */
#define RS_MAX_LOCKS 8
-#define RS_DEBUG 1
+#define RS_DEBUG 0
/**
* Structure to define reply state on the server
/** xid */
__u64 rs_xid;
struct obd_export *rs_export;
- struct ptlrpc_service *rs_service;
+ struct ptlrpc_service_part *rs_svcpt;
/** Lnet metadata handle for the reply */
lnet_handle_md_t rs_md_h;
cfs_atomic_t rs_refcount;
* Check if the request is a high priority one.
*/
int (*hpreq_check)(struct ptlrpc_request *);
+ /**
+ * Called after the request has been handled.
+ */
+ void (*hpreq_fini)(struct ptlrpc_request *);
};
/**
rq_reply_truncate:1,
rq_committed:1,
/* whether the "rq_set" is a valid one */
- rq_invalid_rqset:1;
+ rq_invalid_rqset:1,
+ rq_generation_set:1;
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
/**
* security and encryption data
* @{ */
- struct ptlrpc_cli_ctx *rq_cli_ctx; /* client's half ctx */
- struct ptlrpc_svc_ctx *rq_svc_ctx; /* server's half ctx */
- cfs_list_t rq_ctx_chain; /* link to waited ctx */
+ struct ptlrpc_cli_ctx *rq_cli_ctx; /**< client's half ctx */
+ struct ptlrpc_svc_ctx *rq_svc_ctx; /**< server's half ctx */
+ cfs_list_t rq_ctx_chain; /**< link to waited ctx */
- struct sptlrpc_flavor rq_flvr; /* client & server */
+ struct sptlrpc_flavor rq_flvr; /**< for client & server */
enum lustre_sec_part rq_sp_from;
unsigned long /* client/server security flags */
rq_auth_remote:1, /* authed as remote user */
rq_auth_usr_root:1, /* authed as root */
rq_auth_usr_mdt:1, /* authed as mdt */
+ rq_auth_usr_ost:1, /* authed as ost */
/* security tfm flags */
rq_pack_udesc:1,
rq_pack_bulk:1,
/* (server side), pointed directly into req buffer */
struct ptlrpc_user_desc *rq_user_desc;
- /** @} */
-
/** early replies go to offset 0, regular replies go after that */
unsigned int rq_reply_off;
int rq_clrbuf_len; /* only in priv mode */
int rq_clrdata_len; /* only in priv mode */
+ /** @} */
+
/** Fields that help to see if request and reply were swabbed or not */
__u32 rq_req_swab_mask;
__u32 rq_rep_swab_mask;
#define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s"
-void _debug_req(struct ptlrpc_request *req, __u32 mask,
+void _debug_req(struct ptlrpc_request *req,
struct libcfs_debug_msg_data *data, const char *fmt, ...)
- __attribute__ ((format (printf, 4, 5)));
+ __attribute__ ((format (printf, 3, 4)));
/**
* Helper that decides if we need to print request accordig to current debug
* level settings
*/
-#define debug_req(cdls, level, req, file, func, line, fmt, a...) \
+#define debug_req(msgdata, mask, cdls, req, fmt, a...) \
do { \
- CFS_CHECK_STACK(); \
+ CFS_CHECK_STACK(msgdata, mask, cdls); \
\
- if (((level) & D_CANTMASK) != 0 || \
- ((libcfs_debug & (level)) != 0 && \
- (libcfs_subsystem_debug & DEBUG_SUBSYSTEM) != 0)) { \
- static struct libcfs_debug_msg_data _req_dbg_data = \
- DEBUG_MSG_DATA_INIT(cdls, DEBUG_SUBSYSTEM, file, func, line); \
- _debug_req((req), (level), &_req_dbg_data, fmt, ##a); \
- } \
+ if (((mask) & D_CANTMASK) != 0 || \
+ ((libcfs_debug & (mask)) != 0 && \
+ (libcfs_subsystem_debug & DEBUG_SUBSYSTEM) != 0)) \
+ _debug_req((req), msgdata, fmt, ##a); \
} while(0)
/**
do { \
if ((level) & (D_ERROR | D_WARNING)) { \
static cfs_debug_limit_state_t cdls; \
- debug_req(&cdls, level, req, __FILE__, __func__, __LINE__, \
- "@@@ "fmt" ", ## args); \
- } else \
- debug_req(NULL, level, req, __FILE__, __func__, __LINE__, \
- "@@@ "fmt" ", ## args); \
+ LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, level, &cdls); \
+ debug_req(&msgdata, level, &cdls, req, "@@@ "fmt" ", ## args);\
+ } else { \
+ LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, level, NULL); \
+ debug_req(&msgdata, level, NULL, req, "@@@ "fmt" ", ## args); \
+ } \
} while (0)
/** @} */
#endif
};
+enum {
+ SVC_STOPPED = 1 << 0,
+ SVC_STOPPING = 1 << 1,
+ SVC_STARTING = 1 << 2,
+ SVC_RUNNING = 1 << 3,
+ SVC_EVENT = 1 << 4,
+ SVC_SIGNAL = 1 << 5,
+};
+
+#define PTLRPC_THR_NAME_LEN 32
/**
* Definition of server service thread structure
*/
/**
* the svc this thread belonged to b=18582
*/
- struct ptlrpc_service *t_svc;
- cfs_waitq_t t_ctl_waitq;
- struct lu_env *t_env;
+ struct ptlrpc_service_part *t_svcpt;
+ cfs_waitq_t t_ctl_waitq;
+ struct lu_env *t_env;
+ char t_name[PTLRPC_THR_NAME_LEN];
};
+static inline int thread_is_init(struct ptlrpc_thread *thread)
+{
+ return thread->t_flags == 0;
+}
+
+static inline int thread_is_stopped(struct ptlrpc_thread *thread)
+{
+ return !!(thread->t_flags & SVC_STOPPED);
+}
+
+static inline int thread_is_stopping(struct ptlrpc_thread *thread)
+{
+ return !!(thread->t_flags & SVC_STOPPING);
+}
+
+static inline int thread_is_starting(struct ptlrpc_thread *thread)
+{
+ return !!(thread->t_flags & SVC_STARTING);
+}
+
+static inline int thread_is_running(struct ptlrpc_thread *thread)
+{
+ return !!(thread->t_flags & SVC_RUNNING);
+}
+
+static inline int thread_is_event(struct ptlrpc_thread *thread)
+{
+ return !!(thread->t_flags & SVC_EVENT);
+}
+
+static inline int thread_is_signal(struct ptlrpc_thread *thread)
+{
+ return !!(thread->t_flags & SVC_SIGNAL);
+}
+
+static inline void thread_clear_flags(struct ptlrpc_thread *thread, __u32 flags)
+{
+ thread->t_flags &= ~flags;
+}
+
+static inline void thread_set_flags(struct ptlrpc_thread *thread, __u32 flags)
+{
+ thread->t_flags = flags;
+}
+
+static inline void thread_add_flags(struct ptlrpc_thread *thread, __u32 flags)
+{
+ thread->t_flags |= flags;
+}
+
+static inline int thread_test_and_clear_flags(struct ptlrpc_thread *thread,
+ __u32 flags)
+{
+ if (thread->t_flags & flags) {
+ thread->t_flags &= ~flags;
+ return 1;
+ }
+ return 0;
+}
+
/**
* Request buffer descriptor structure.
* This is a structure that contains one posted request buffer for service.
/** History of requests for this buffer */
cfs_list_t rqbd_reqs;
/** Back pointer to service for which this buffer is registered */
- struct ptlrpc_service *rqbd_service;
+ struct ptlrpc_service_part *rqbd_svcpt;
/** LNet descriptor */
lnet_handle_md_t rqbd_md_h;
int rqbd_refcount;
struct ptlrpc_request rqbd_req;
};
-typedef int (*svc_handler_t)(struct ptlrpc_request *req);
-typedef void (*svcreq_printfn_t)(void *, struct ptlrpc_request *);
-typedef int (*svc_hpreq_handler_t)(struct ptlrpc_request *);
+typedef int (*svc_handler_t)(struct ptlrpc_request *req);
+
+struct ptlrpc_service_ops {
+ /**
+ * if non-NULL called during thread creation (ptlrpc_start_thread())
+ * to initialize service specific per-thread state.
+ */
+ int (*so_thr_init)(struct ptlrpc_thread *thr);
+ /**
+ * if non-NULL called during thread shutdown (ptlrpc_main()) to
+ * destruct state created by ->srv_init().
+ */
+ void (*so_thr_done)(struct ptlrpc_thread *thr);
+ /**
+ * Handler function for incoming requests for this service
+ */
+ int (*so_req_handler)(struct ptlrpc_request *req);
+ /**
+ * function to determine priority of the request, it's called
+ * on every new request
+ */
+ int (*so_hpreq_handler)(struct ptlrpc_request *);
+ /**
+ * service-specific print fn
+ */
+ void (*so_req_printer)(void *, struct ptlrpc_request *);
+};
+
+#ifndef __cfs_cacheline_aligned
+/* NB: put it here for reducing patche dependence */
+# define __cfs_cacheline_aligned
+#endif
/**
* How many high priority requests to serve before serving one normal
* or general metadata service for MDS.
*/
struct ptlrpc_service {
- cfs_list_t srv_list; /* chain thru all services */
- int srv_max_req_size; /* biggest request to receive */
- int srv_max_reply_size; /* biggest reply to send */
- int srv_buf_size; /* size of individual buffers */
- int srv_nbuf_per_group; /* # buffers to allocate in 1 group */
- int srv_nbufs; /* total # req buffer descs allocated */
- int srv_threads_min; /* threads to start at SOW */
- int srv_threads_max; /* thread upper limit */
- int srv_threads_started; /* index of last started thread */
- int srv_threads_running; /* # running threads */
- cfs_atomic_t srv_n_difficult_replies; /* # 'difficult' replies */
- int srv_n_active_reqs; /* # reqs being served */
- int srv_n_hpreq; /* # HPreqs being served */
- cfs_duration_t srv_rqbd_timeout; /* timeout before re-posting reqs, in tick */
- int srv_watchdog_factor; /* soft watchdog timeout multiplier */
- unsigned srv_cpu_affinity:1; /* bind threads to CPUs */
- unsigned srv_at_check:1; /* check early replies */
- unsigned srv_is_stopping:1; /* under unregister_service */
- cfs_time_t srv_at_checktime; /* debug */
-
- /** Local portal on which to receive requests */
- __u32 srv_req_portal;
- /** Portal on the client to send replies to */
- __u32 srv_rep_portal;
-
- /** AT stuff */
- /** @{ */
- struct adaptive_timeout srv_at_estimate;/* estimated rpc service time */
- cfs_spinlock_t srv_at_lock;
- struct ptlrpc_at_array srv_at_array; /* reqs waiting for replies */
- cfs_timer_t srv_at_timer; /* early reply timer */
- /** @} */
-
- int srv_n_queued_reqs; /* # reqs in either of the queues below */
- int srv_hpreq_count; /* # hp requests handled */
- int srv_hpreq_ratio; /* # hp per lp reqs to handle */
- cfs_list_t srv_req_in_queue; /* incoming reqs */
- cfs_list_t srv_request_queue; /* reqs waiting for service */
- cfs_list_t srv_request_hpq; /* high priority queue */
-
- cfs_list_t srv_request_history; /* request history */
- __u64 srv_request_seq; /* next request sequence # */
- __u64 srv_request_max_cull_seq; /* highest seq culled from history */
- svcreq_printfn_t srv_request_history_print_fn; /* service-specific print fn */
-
- cfs_list_t srv_idle_rqbds; /* request buffers to be reposted */
- cfs_list_t srv_active_rqbds; /* req buffers receiving */
- cfs_list_t srv_history_rqbds; /* request buffer history */
- int srv_nrqbd_receiving; /* # posted request buffers */
- int srv_n_history_rqbds; /* # request buffers in history */
- int srv_max_history_rqbds;/* max # request buffers in history */
-
- cfs_atomic_t srv_outstanding_replies;
- cfs_list_t srv_active_replies; /* all the active replies */
-#ifndef __KERNEL__
- cfs_list_t srv_reply_queue; /* replies waiting for service */
-#endif
- cfs_waitq_t srv_waitq; /* all threads sleep on this. This
- * wait-queue is signalled when new
- * incoming request arrives and when
- * difficult reply has to be handled. */
-
- cfs_list_t srv_threads; /* service thread list */
- /** Handler function for incoming requests for this service */
- svc_handler_t srv_handler;
- svc_hpreq_handler_t srv_hpreq_handler; /* hp request handler */
-
- char *srv_name; /* only statically allocated strings here; we don't clean them */
- char *srv_thread_name; /* only statically allocated strings here; we don't clean them */
-
- cfs_spinlock_t srv_lock;
-
+ /** serialize /proc operations */
+ cfs_spinlock_t srv_lock;
+ /** most often accessed fields */
+ /** chain thru all services */
+ cfs_list_t srv_list;
+ /** service operations table */
+ struct ptlrpc_service_ops srv_ops;
+ /** only statically allocated strings here; we don't clean them */
+ char *srv_name;
+ /** only statically allocated strings here; we don't clean them */
+ char *srv_thread_name;
+ /** service thread list */
+ cfs_list_t srv_threads;
+ /** threads to start at beginning of service */
+ int srv_threads_min;
+ /** thread upper limit */
+ int srv_threads_max;
/** Root of /proc dir tree for this service */
- cfs_proc_dir_entry_t *srv_procroot;
+ cfs_proc_dir_entry_t *srv_procroot;
/** Pointer to statistic data for this service */
- struct lprocfs_stats *srv_stats;
-
- /** List of free reply_states */
- cfs_list_t srv_free_rs_list;
- /** waitq to run, when adding stuff to srv_free_rs_list */
- cfs_waitq_t srv_free_rs_waitq;
-
+ struct lprocfs_stats *srv_stats;
+ /** # hp per lp reqs to handle */
+ int srv_hpreq_ratio;
+ /** biggest request to receive */
+ int srv_max_req_size;
+ /** biggest reply to send */
+ int srv_max_reply_size;
+ /** size of individual buffers */
+ int srv_buf_size;
+ /** # buffers to allocate in 1 group */
+ int srv_nbuf_per_group;
+ /** Local portal on which to receive requests */
+ __u32 srv_req_portal;
+ /** Portal on the client to send replies to */
+ __u32 srv_rep_portal;
/**
* Tags for lu_context associated with this thread, see struct
* lu_context.
*/
- __u32 srv_ctx_tags;
- /**
- * if non-NULL called during thread creation (ptlrpc_start_thread())
- * to initialize service specific per-thread state.
- */
- int (*srv_init)(struct ptlrpc_thread *thread);
- /**
- * if non-NULL called during thread shutdown (ptlrpc_main()) to
- * destruct state created by ->srv_init().
- */
- void (*srv_done)(struct ptlrpc_thread *thread);
+ __u32 srv_ctx_tags;
+ /** soft watchdog timeout multiplier */
+ int srv_watchdog_factor;
+ /** bind threads to CPUs */
+ unsigned srv_cpu_affinity:1;
+ /** under unregister_service */
+ unsigned srv_is_stopping:1;
+
+ /**
+ * max # request buffers in history, it needs to be convert into
+ * per-partition value when we have multiple partitions
+ */
+ int srv_max_history_rqbds;
+ /**
+ * partition data for ptlrpc service, only one instance so far,
+ * instance per CPT will come soon
+ */
+ struct ptlrpc_service_part *srv_part;
+};
- //struct ptlrpc_srv_ni srv_interfaces[0];
+/**
+ * Definition of PortalRPC service partition data.
+ * Although a service only has one instance of it right now, but we
+ * will have multiple instances very soon (instance per CPT).
+ *
+ * it has four locks:
+ * \a scp_lock
+ * serialize operations on rqbd and requests waiting for preprocess
+ * \a scp_req_lock
+ * serialize operations active requests sent to this portal
+ * \a scp_at_lock
+ * serialize adaptive timeout stuff
+ * \a scp_rep_lock
+ * serialize operations on RS list (reply states)
+ *
+ * We don't have any use-case to take two or more locks at the same time
+ * for now, so there is no lock order issue.
+ */
+struct ptlrpc_service_part {
+ /** back reference to owner */
+ struct ptlrpc_service *scp_service __cfs_cacheline_aligned;
+ /* CPT id, reserved */
+ int scp_cpt;
+ /** always increasing number */
+ int scp_thr_nextid;
+ /** # of starting threads */
+ int scp_nthrs_starting;
+ /** # of stopping threads, reserved for shrinking threads */
+ int scp_nthrs_stopping;
+ /** # running threads */
+ int scp_nthrs_running;
+ /** service threads list */
+ cfs_list_t scp_threads;
+
+ /**
+ * serialize the following fields, used for protecting
+ * rqbd list and incoming requests waiting for preprocess,
+ * threads starting & stopping are also protected by this lock.
+ */
+ cfs_spinlock_t scp_lock __cfs_cacheline_aligned;
+ /** total # req buffer descs allocated */
+ int scp_nrqbds_total;
+ /** # posted request buffers for receiving */
+ int scp_nrqbds_posted;
+ /** # incoming reqs */
+ int scp_nreqs_incoming;
+ /** request buffers to be reposted */
+ cfs_list_t scp_rqbd_idle;
+ /** req buffers receiving */
+ cfs_list_t scp_rqbd_posted;
+ /** incoming reqs */
+ cfs_list_t scp_req_incoming;
+ /** timeout before re-posting reqs, in tick */
+ cfs_duration_t scp_rqbd_timeout;
+ /**
+ * all threads sleep on this. This wait-queue is signalled when new
+ * incoming request arrives and when difficult reply has to be handled.
+ */
+ cfs_waitq_t scp_waitq;
+
+ /** request history */
+ cfs_list_t scp_hist_reqs;
+ /** request buffer history */
+ cfs_list_t scp_hist_rqbds;
+ /** # request buffers in history */
+ int scp_hist_nrqbds;
+ /** sequence number for request */
+ __u64 scp_hist_seq;
+ /** highest seq culled from history */
+ __u64 scp_hist_seq_culled;
+
+ /**
+ * serialize the following fields, used for processing requests
+ * sent to this portal
+ */
+ cfs_spinlock_t scp_req_lock __cfs_cacheline_aligned;
+ /** # reqs in either of the queues below */
+ /** reqs waiting for service */
+ cfs_list_t scp_req_pending;
+ /** high priority queue */
+ cfs_list_t scp_hreq_pending;
+ /** # reqs being served */
+ int scp_nreqs_active;
+ /** # HPreqs being served */
+ int scp_nhreqs_active;
+ /** # hp requests handled */
+ int scp_hreq_count;
+
+ /** AT stuff */
+ /** @{ */
+ /**
+ * serialize the following fields, used for changes on
+ * adaptive timeout
+ */
+ cfs_spinlock_t scp_at_lock __cfs_cacheline_aligned;
+ /** estimated rpc service time */
+ struct adaptive_timeout scp_at_estimate;
+ /** reqs waiting for replies */
+ struct ptlrpc_at_array scp_at_array;
+ /** early reply timer */
+ cfs_timer_t scp_at_timer;
+ /** debug */
+ cfs_time_t scp_at_checktime;
+ /** check early replies */
+ unsigned scp_at_check;
+ /** @} */
+
+ /**
+ * serialize the following fields, used for processing
+ * replies for this portal
+ */
+ cfs_spinlock_t scp_rep_lock __cfs_cacheline_aligned;
+ /** all the active replies */
+ cfs_list_t scp_rep_active;
+#ifndef __KERNEL__
+ /** replies waiting for service */
+ cfs_list_t scp_rep_queue;
+#endif
+ /** List of free reply_states */
+ cfs_list_t scp_rep_idle;
+ /** waitq to run, when adding stuff to srv_free_rs_list */
+ cfs_waitq_t scp_rep_waitq;
+ /** # 'difficult' replies */
+ cfs_atomic_t scp_nreps_difficult;
};
/**
* Environment for request interpreters to run in.
*/
struct lu_env pc_env;
+ /**
+ * Index of ptlrpcd thread in the array.
+ */
+ int pc_index;
+ /**
+ * Number of the ptlrpcd's partners.
+ */
+ int pc_npartners;
+ /**
+ * Pointer to the array of partners' ptlrpcd_ctl structure.
+ */
+ struct ptlrpcd_ctl **pc_partners;
+ /**
+ * Record the partner index to be processed next.
+ */
+ int pc_cursor;
#ifndef __KERNEL__
/**
* Async rpcs flag to make sure that ptlrpcd_check() is called only
/**
* This is a recovery ptlrpc thread.
*/
- LIOD_RECOVERY = 1 << 3
+ LIOD_RECOVERY = 1 << 3,
+ /**
+ * The ptlrpcd is bound to some CPU core.
+ */
+ LIOD_BIND = 1 << 4,
};
/* ptlrpc/events.c */
* underlying buffer
* @{
*/
-extern void request_out_callback (lnet_event_t *ev);
+extern void request_out_callback(lnet_event_t *ev);
extern void reply_in_callback(lnet_event_t *ev);
-extern void client_bulk_callback (lnet_event_t *ev);
+extern void client_bulk_callback(lnet_event_t *ev);
extern void request_in_callback(lnet_event_t *ev);
extern void reply_out_callback(lnet_event_t *ev);
-extern void server_bulk_callback (lnet_event_t *ev);
+#ifdef HAVE_SERVER_SUPPORT
+extern void server_bulk_callback(lnet_event_t *ev);
+#endif
/** @} */
/* ptlrpc/connection.c */
* Actual interfacing with LNet to put/get/register/unregister stuff
* @{
*/
+#ifdef HAVE_SERVER_SUPPORT
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
+ int npages, int type, int portal);
int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc);
void ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc);
-int ptlrpc_register_bulk(struct ptlrpc_request *req);
-int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async);
static inline int ptlrpc_server_bulk_active(struct ptlrpc_bulk_desc *desc)
{
cfs_spin_unlock(&desc->bd_lock);
return rc;
}
+#endif
+
+int ptlrpc_register_bulk(struct ptlrpc_request *req);
+int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async);
static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
{
void ptlrpc_resend_req(struct ptlrpc_request *request);
int ptlrpc_at_get_net_latency(struct ptlrpc_request *req);
int ptl_send_rpc(struct ptlrpc_request *request, int noreply);
-int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd);
+int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd);
/** @} */
/* ptlrpc/client.c */
void ptlrpc_abort_set(struct ptlrpc_request_set *set);
struct ptlrpc_request_set *ptlrpc_prep_set(void);
+struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
+ void *arg);
int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
set_interpreter_func fn, void *data);
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
void ptlrpc_set_destroy(struct ptlrpc_request_set *);
void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
-int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
- struct ptlrpc_request *req);
+void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
+ struct ptlrpc_request *req);
void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
void ptlrpc_req_finished(struct ptlrpc_request *request);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
- int npages, int type, int portal);
-struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
+struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
int npages, int type, int portal);
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
__u64 ptlrpc_sample_next_xid(void);
__u64 ptlrpc_req_xid(struct ptlrpc_request *request);
+/* Set of routines to run a function in ptlrpcd context */
+void *ptlrpcd_alloc_work(struct obd_import *imp,
+ int (*cb)(const struct lu_env *, void *), void *data);
+void ptlrpcd_destroy_work(void *handler);
+int ptlrpcd_queue_work(void *handler);
+
/** @} */
+struct ptlrpc_service_buf_conf {
+ /* nbufs is how many buffers to post */
+ unsigned int bc_nbufs;
+ /* buffer size to post */
+ unsigned int bc_buf_size;
+ /* portal to listed for requests on */
+ unsigned int bc_req_portal;
+ /* portal of where to send replies to */
+ unsigned int bc_rep_portal;
+ /* maximum request size to be accepted for this service */
+ unsigned int bc_req_max_size;
+ /* maximum reply size this service can ever send */
+ unsigned int bc_rep_max_size;
+};
+
+struct ptlrpc_service_thr_conf {
+ /* threadname should be 8 characters or less - 6 will be added on */
+ char *tc_thr_name;
+ /* min number of service threads to start */
+ unsigned int tc_nthrs_min;
+ /* max number of service threads to start */
+ unsigned int tc_nthrs_max;
+ /* user specified threads number, it will be validated due to
+ * other members of this structure. */
+ unsigned int tc_nthrs_user;
+ /* set NUMA node affinity for service threads */
+ unsigned int tc_cpu_affinity;
+ /* Tags for lu_context associated with service thread */
+ __u32 tc_ctx_tags;
+};
struct ptlrpc_service_conf {
- int psc_nbufs;
- int psc_bufsize;
- int psc_max_req_size;
- int psc_max_reply_size;
- int psc_req_portal;
- int psc_rep_portal;
- int psc_watchdog_factor;
- int psc_min_threads;
- int psc_max_threads;
- __u32 psc_ctx_tags;
+ /* service name */
+ char *psc_name;
+ /* soft watchdog timeout multiplifier to print stuck service traces */
+ unsigned int psc_watchdog_factor;
+ /* buffer information */
+ struct ptlrpc_service_buf_conf psc_buf;
+ /* thread information */
+ struct ptlrpc_service_thr_conf psc_thr;
+ /* function table */
+ struct ptlrpc_service_ops psc_ops;
};
/* ptlrpc/service.c */
*
* @{
*/
-void ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode, int no_ack);
+void ptlrpc_save_lock(struct ptlrpc_request *req,
+ struct lustre_handle *lock, int mode, int no_ack);
void ptlrpc_commit_replies(struct obd_export *exp);
-void ptlrpc_dispatch_difficult_reply (struct ptlrpc_reply_state *rs);
-void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
-struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
- svc_handler_t h, char *name,
- struct proc_dir_entry *proc_entry,
- svcreq_printfn_t prntfn,
- char *threadname);
-
-struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
- int max_reply_size,
- int req_portal, int rep_portal,
- int watchdog_factor,
- svc_handler_t, char *name,
- cfs_proc_dir_entry_t *proc_entry,
- svcreq_printfn_t,
- int min_threads, int max_threads,
- char *threadname, __u32 ctx_tags,
- svc_hpreq_handler_t);
+void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs);
+void ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs);
+struct ptlrpc_service *ptlrpc_register_service(
+ struct ptlrpc_service_conf *conf,
+ struct proc_dir_entry *proc_entry);
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
-int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc);
-int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc);
+int ptlrpc_start_threads(struct ptlrpc_service *svc);
int ptlrpc_unregister_service(struct ptlrpc_service *service);
-int liblustre_check_services (void *arg);
+int liblustre_check_services(void *arg);
void ptlrpc_daemonize(char *name);
int ptlrpc_service_health_check(struct ptlrpc_service *);
void ptlrpc_hpreq_reorder(struct ptlrpc_request *req);
-void ptlrpc_server_active_request_inc(struct ptlrpc_request *req);
-void ptlrpc_server_active_request_dec(struct ptlrpc_request *req);
void ptlrpc_server_drop_request(struct ptlrpc_request *req);
#ifdef __KERNEL__
# define ptlrpc_hr_fini() do {} while(0)
#endif
-struct ptlrpc_svc_data {
- char *name;
- struct ptlrpc_service *svc;
- struct ptlrpc_thread *thread;
- struct obd_device *dev;
-};
/** @} */
/* ptlrpc/import.c */
* Import API
* @{
*/
-int ptlrpc_connect_import(struct obd_import *imp, char * new_uuid);
+int ptlrpc_connect_import(struct obd_import *imp);
int ptlrpc_init_import(struct obd_import *imp);
int ptlrpc_disconnect_import(struct obd_import *imp, int noclose);
int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
+void deuuidify(char *uuid, const char *prefix, char **uuid_start,
+ int *uuid_len);
/* ptlrpc/pack_generic.c */
int ptlrpc_reconnect_import(struct obd_import *imp);
int lustre_msg_buflen(struct lustre_msg *m, int n);
void lustre_msg_set_buflen(struct lustre_msg *m, int n, int len);
int lustre_msg_bufcount(struct lustre_msg *m);
-char *lustre_msg_string (struct lustre_msg *m, int n, int max_len);
+char *lustre_msg_string(struct lustre_msg *m, int n, int max_len);
__u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags);
__u32 lustre_msg_get_flags(struct lustre_msg *msg);
__u32 lustre_msg_get_magic(struct lustre_msg *msg);
__u32 lustre_msg_get_timeout(struct lustre_msg *msg);
__u32 lustre_msg_get_service_time(struct lustre_msg *msg);
+char *lustre_msg_get_jobid(struct lustre_msg *msg);
__u32 lustre_msg_get_cksum(struct lustre_msg *msg);
#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 9, 0, 0)
__u32 lustre_msg_calc_cksum(struct lustre_msg *msg, int compat18);
void ptlrpc_request_set_replen(struct ptlrpc_request *req);
void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
+void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid);
void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
static inline void
return req->rq_no_resend;
}
+static inline int
+ptlrpc_server_get_timeout(struct ptlrpc_service_part *svcpt)
+{
+ int at = AT_OFF ? 0 : at_get(&svcpt->scp_at_estimate);
+
+ return svcpt->scp_service->srv_watchdog_factor *
+ max_t(int, at, obd_timeout);
+}
+
+static inline struct ptlrpc_service *
+ptlrpc_req2svc(struct ptlrpc_request *req)
+{
+ LASSERT(req->rq_rqbd != NULL);
+ return req->rq_rqbd->rqbd_svcpt->scp_service;
+}
+
/* ldlm/ldlm_lib.c */
/**
* Target client logic
int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
int priority);
int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid);
+int client_import_find_conn(struct obd_import *imp, lnet_nid_t peer,
+ struct obd_uuid *uuid);
int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid);
void client_destroy_import(struct obd_import *imp);
/** @} */
+#ifdef HAVE_SERVER_SUPPORT
int server_disconnect_export(struct obd_export *exp);
+#endif
/* ptlrpc/pinger.c */
/**
int ptlrpc_check_and_wait_suspend(struct ptlrpc_request *req);
/** @} */
-/* ptlrpc/ptlrpcd.c */
+/* ptlrpc daemon bind policy */
+typedef enum {
+ /* all ptlrpcd threads are free mode */
+ PDB_POLICY_NONE = 1,
+ /* all ptlrpcd threads are bound mode */
+ PDB_POLICY_FULL = 2,
+ /* <free1 bound1> <free2 bound2> ... <freeN boundN> */
+ PDB_POLICY_PAIR = 3,
+ /* <free1 bound1> <bound1 free2> ... <freeN boundN> <boundN free1>,
+ * means each ptlrpcd[X] has two partners: thread[X-1] and thread[X+1].
+ * If kernel supports NUMA, pthrpcd threads are binded and
+ * grouped by NUMA node */
+ PDB_POLICY_NEIGHBOR = 4,
+} pdb_policy_t;
+
+/* ptlrpc daemon load policy
+ * It is caller's duty to specify how to push the async RPC into some ptlrpcd
+ * queue, but it is not enforced, affected by "ptlrpcd_bind_policy". If it is
+ * "PDB_POLICY_FULL", then the RPC will be processed by the selected ptlrpcd,
+ * Otherwise, the RPC may be processed by the selected ptlrpcd or its partner,
+ * depends on which is scheduled firstly, to accelerate the RPC processing. */
+typedef enum {
+ /* on the same CPU core as the caller */
+ PDL_POLICY_SAME = 1,
+ /* within the same CPU partition, but not the same core as the caller */
+ PDL_POLICY_LOCAL = 2,
+ /* round-robin on all CPU cores, but not the same core as the caller */
+ PDL_POLICY_ROUND = 3,
+ /* the specified CPU core is preferred, but not enforced */
+ PDL_POLICY_PREFERRED = 4,
+} pdl_policy_t;
-/**
- * Ptlrpcd scope is a set of two threads: ptlrpcd-foo and ptlrpcd-foo-rcv,
- * these threads are used to asynchronously send requests queued with
- * ptlrpcd_add_req(req, PCSOPE_FOO), and to handle completion call-backs for
- * such requests. Multiple scopes are needed to avoid dead-locks.
- */
-enum ptlrpcd_scope {
- /** Scope of bulk read-write rpcs. */
- PSCOPE_BRW,
- /** Everything else. */
- PSCOPE_OTHER,
- PSCOPE_NR
-};
-
-int ptlrpcd_start(const char *name, struct ptlrpcd_ctl *pc);
+/* ptlrpc/ptlrpcd.c */
void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
void ptlrpcd_wake(struct ptlrpc_request *req);
-int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope);
+void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx);
void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
int ptlrpcd_addref(void);
void ptlrpcd_decref(void);