struct ptlrpc_request_set;
typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);
-/**
+/*
* Definition of request set structure.
* Request set is a list of requests (not necessary to the same target) that
* once populated with RPCs could be sent in parallel.
* returned.
*/
struct ptlrpc_request_set {
- atomic_t set_refcount;
- /** number of in queue requests */
+ struct kref set_refcount;
+ /* number of in queue requests */
atomic_t set_new_count;
- /** number of uncompleted requests */
+ /* number of uncompleted requests. */
atomic_t set_remaining;
- /** wait queue to wait on for request events */
+ /* wait queue to wait on for request events */
wait_queue_head_t set_waitq;
- /** List of requests in the set */
+ /* List of requests in the set */
struct list_head set_requests;
- /**
+ /*
* Lock for \a set_new_requests manipulations
* locked so that any old caller can communicate requests to
* the set holder who can then fold them into the lock-free set
*/
spinlock_t set_new_req_lock;
- /** List of new yet unsent requests. Only used with ptlrpcd now. */
+ /* List of new yet unsent requests. Only used with ptlrpcd now. */
struct list_head set_new_requests;
- /** rq_status of requests that have been freed already */
+ /* rq_status of requests that have been freed already */
int set_rc;
- /** Additional fields used by the flow control extension */
- /** Maximum number of RPCs in flight */
+ /* Additional fields used by the flow control extension */
+ /* Maximum number of RPCs in flight */
int set_max_inflight;
- /** Callback function used to generate RPCs */
+ /* Callback function used to generate RPCs */
set_producer_func set_producer;
- /** opaq argument passed to the producer callback */
+ /* opaq argument passed to the producer callback */
void *set_producer_arg;
unsigned int set_allow_intr:1;
};
OBD_CPT_ALLOC(set, cfs_cpt_tab, cpt, sizeof(*set));
if (!set)
RETURN(NULL);
- atomic_set(&set->set_refcount, 1);
+ kref_init(&set->set_refcount);
INIT_LIST_HEAD(&set->set_requests);
init_waitqueue_head(&set->set_waitq);
atomic_set(&set->set_new_count, 0);
LASSERT(atomic_read(&set->set_remaining) == 0);
- ptlrpc_reqset_put(set);
+ kref_put(&set->set_refcount, ptlrpc_reqset_free);
EXIT;
}
EXPORT_SYMBOL(ptlrpc_set_destroy);
return 0;
}
EXPORT_SYMBOL(ptlrpcd_queue_work);
+
+/**
+ * ptlrpc_reqset_free() - Release memory allocated for ptlrpc_request_set
+ * @kref: kref when dropped below 1
+ *
+ * Used as a kref release callback, when the last user of ptlrpc_request_set
+ * is released.
+ */
+void ptlrpc_reqset_free(struct kref *kref)
+{
+ struct ptlrpc_request_set *set = container_of(kref,
+ struct ptlrpc_request_set,
+ set_refcount);
+ OBD_FREE_PTR(set);
+}
void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req);
__u64 ptlrpc_known_replied_xid(struct obd_import *imp);
void ptlrpc_add_unreplied(struct ptlrpc_request *req);
+void ptlrpc_reqset_free(struct kref *kerf);
/* events.c */
int ptlrpc_init_portals(void);
void nodemap_mod_exit(void);
#endif /* HAVE_SERVER_SUPPORT */
-static inline void ptlrpc_reqset_put(struct ptlrpc_request_set *set)
-{
- if (atomic_dec_and_test(&set->set_refcount))
- OBD_FREE_PTR(set);
-}
-
/** initialise ptlrpc common fields */
static inline void ptlrpc_req_comm_init(struct ptlrpc_request *req)
{
}
EXPORT_SYMBOL(ptlrpcd_add_req);
-static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
-{
- atomic_inc(&set->set_refcount);
-}
-
-/**
+/*
* Check if there is more work to do on ptlrpcd set.
* Returns 1 if yes.
*/
continue;
}
- ptlrpc_reqset_get(ps);
+ kref_get(&ps->set_refcount);
spin_unlock(&partner->pc_lock);
if (atomic_read(&ps->set_new_count)) {
rc, partner->pc_index,
pc->pc_index);
}
- ptlrpc_reqset_put(ps);
+ kref_put(&ps->set_refcount, ptlrpc_reqset_free);
} while (rc == 0 && pc->pc_cursor != first);
}
}