super->pccs_generation = 1;
super->pccs_async_threshold = PCC_DEFAULT_ASYNC_THRESHOLD;
super->pccs_mode = S_IRUSR;
+ atomic_set(&super->pccs_attaches_queued, 0);
+ super->pccs_maximum_queued_attaches = PCCS_DEFAULT_ATTACH_QUEUE_DEPTH;
return 0;
}
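+/* Allocate an attach context, pinning the file with get_file() and
+ * charging it against the async attach queue depth.
+ */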
static struct pcc_attach_context *
pcc_attach_context_alloc(struct file *file, struct inode *inode, __u32 id)
{
struct pcc_attach_context *pccx;
+ struct pcc_super *super = ll_i2pccs(inode);
OBD_ALLOC_PTR(pccx);
if (!pccx)
return NULL;

pccx->pccx_file = get_file(file);
pccx->pccx_inode = inode;
pccx->pccx_attach_id = id;
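+ /* Count this pending attach against the queue depth limit */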
+ atomic_inc(&super->pccs_attaches_queued);
return pccx;
}
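+/* Free an attach context, releasing its queue slot and file reference */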
static inline void pcc_attach_context_free(struct pcc_attach_context *pccx)
{
+ struct pcc_super *super = ll_i2pccs(pccx->pccx_inode);
+
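+ /* This attach is no longer queued; release its slot */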
+ atomic_dec(&super->pccs_attaches_queued);
LASSERT(pccx->pccx_file != NULL);
fput(pccx->pccx_file);
OBD_FREE_PTR(pccx);
}
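+/* Decide whether a readonly attach runs asynchronously in a kthread or
+ * synchronously in the caller's context.
+ */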
static inline int pcc_do_readonly_attach(struct file *file,
struct inode *inode, __u32 roid)
{
+ struct pcc_super *super = ll_i2pccs(inode);
+ bool async = true;
int rc;
- if (max_t(__u64, ll_i2info(inode)->lli_lazysize, i_size_read(inode)) >=
- ll_i2pccs(inode)->pccs_async_threshold) {
+ /* Force sync attach if we are over the queueing limit, so this thread
+ * cannot add to the queue any further. This lets us exceed the queue
+ * depth by up to one entry per attaching thread, which is fine - deep
+ * queues are OK, the main thing is to avoid an unbounded number of
+ * kthreads.
+ */
+ if (atomic_read(&super->pccs_attaches_queued) >=
+ super->pccs_maximum_queued_attaches)
+ async = false;
+ /* If the file size is below the async threshold, attach synchronously */
+ if (max_t(__u64, ll_i2info(inode)->lli_lazysize, i_size_read(inode)) <
+ super->pccs_async_threshold)
+ async = false;
+
+ if (async) {
rc = pcc_readonly_attach_async(file, inode, roid);
if (!rc || rc == -EINPROGRESS)
return rc;
#define PCC_DEFAULT_ASYNC_THRESHOLD (256 << 20)
+/* After this many attaches are queued, fall back to sync attach. Each
+ * async attach creates a kthread, so we must not allow too many at once,
+ * but sync attach is very costly for applications, so be generous.
+ */
+#define PCCS_DEFAULT_ATTACH_QUEUE_DEPTH 1024
struct pcc_super {
/* Protect pccs_datasets */
struct rw_semaphore pccs_rw_sem;
__u64 pccs_async_threshold;
bool pccs_async_affinity;
umode_t pccs_mode;
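+ /* Number of async attaches currently queued */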
+ atomic_t pccs_attaches_queued;
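+ /* Queue depth above which new attaches fall back to sync */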
+ int pccs_maximum_queued_attaches;
};
struct pcc_inode {