RETURN(rc);
}
-static int seq_fid_alloc_prep(struct lu_client_seq *seq,
- wait_queue_entry_t *link)
-{
- if (seq->lcs_update) {
- add_wait_queue(&seq->lcs_waitq, link);
- set_current_state(TASK_UNINTERRUPTIBLE);
- mutex_unlock(&seq->lcs_mutex);
-
- schedule();
-
- mutex_lock(&seq->lcs_mutex);
- remove_wait_queue(&seq->lcs_waitq, link);
- set_current_state(TASK_RUNNING);
- return -EAGAIN;
- }
-
- ++seq->lcs_update;
- mutex_unlock(&seq->lcs_mutex);
-
- return 0;
-}
-
-static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr,
- bool whole)
-{
- LASSERT(seq->lcs_update == 1);
-
- mutex_lock(&seq->lcs_mutex);
- if (seqnr != 0) {
- CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n",
- seq->lcs_name, seqnr);
-
- seq->lcs_fid.f_seq = seqnr;
- if (whole) {
- /*
- * Since the caller require the whole seq,
- * so marked this seq to be used
- */
- if (seq->lcs_type == LUSTRE_SEQ_METADATA)
- seq->lcs_fid.f_oid =
- LUSTRE_METADATA_SEQ_MAX_WIDTH;
- else
- seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
- } else {
- seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
- }
- seq->lcs_fid.f_ver = 0;
- }
-
- --seq->lcs_update;
- wake_up(&seq->lcs_waitq);
-}
-
/**
* Allocate the whole non-used seq to the caller.
*
int seq_client_get_seq(const struct lu_env *env,
struct lu_client_seq *seq, u64 *seqnr)
{
- wait_queue_entry_t link;
int rc;
LASSERT(seqnr != NULL);
mutex_lock(&seq->lcs_mutex);
- init_wait(&link);
-
- /* To guarantee that we can get a whole non-used sequence. */
- while (seq_fid_alloc_prep(seq, &link) != 0)
- ; /* do nothing */
rc = seq_client_alloc_seq(env, seq, seqnr);
- seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true);
- if (rc)
+ if (rc) {
CERROR("%s: Can't allocate new sequence: rc = %d\n",
seq->lcs_name, rc);
+ } else {
+ CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n",
+ seq->lcs_name, *seqnr);
+ seq->lcs_fid.f_seq = *seqnr;
+ seq->lcs_fid.f_ver = 0;
+ /*
+ * The caller require the whole seq,
+ * so marked this seq to be used
+ */
+ if (seq->lcs_type == LUSTRE_SEQ_METADATA)
+ seq->lcs_fid.f_oid =
+ LUSTRE_METADATA_SEQ_MAX_WIDTH;
+ else
+ seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH;
+ }
mutex_unlock(&seq->lcs_mutex);
return rc;
int seq_client_alloc_fid(const struct lu_env *env,
struct lu_client_seq *seq, struct lu_fid *fid)
{
- wait_queue_entry_t link;
int rc;
ENTRY;
LASSERT(seq != NULL);
LASSERT(fid != NULL);
- init_wait(&link);
mutex_lock(&seq->lcs_mutex);
if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
seq->lcs_fid.f_oid = seq->lcs_width;
- while (1) {
+ if (unlikely(!fid_is_zero(&seq->lcs_fid) &&
+ fid_oid(&seq->lcs_fid) < seq->lcs_width)) {
+ /* Just bump last allocated fid and return to caller. */
+ seq->lcs_fid.f_oid++;
+ rc = 0;
+ } else {
u64 seqnr;
- if (unlikely(!fid_is_zero(&seq->lcs_fid) &&
- fid_oid(&seq->lcs_fid) < seq->lcs_width)) {
- /* Just bump last allocated fid and return to caller. */
- seq->lcs_fid.f_oid++;
- rc = 0;
- break;
- }
-
- /*
- * Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid
- * deadlock during seq_client_alloc_seq().
- */
- rc = seq_fid_alloc_prep(seq, &link);
- if (rc)
- continue;
-
rc = seq_client_alloc_seq(env, seq, &seqnr);
- /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */
- seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false);
if (rc) {
if (rc != -EINPROGRESS)
CERROR("%s: Can't allocate new sequence: rc = %d\n",
seq->lcs_name, rc);
- mutex_unlock(&seq->lcs_mutex);
+ } else {
+ CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n",
+ seq->lcs_name, seqnr);
- RETURN(rc);
+ seq->lcs_fid.f_seq = seqnr;
+ seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
+ seq->lcs_fid.f_ver = 0;
+ rc = 1;
}
-
- rc = 1;
- break;
}
- *fid = seq->lcs_fid;
+ if (rc >= 0) {
+ *fid = seq->lcs_fid;
+ CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,
+ PFID(fid));
+ }
mutex_unlock(&seq->lcs_mutex);
- CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
-
RETURN(rc);
}
EXPORT_SYMBOL(seq_client_alloc_fid);
*/
void seq_client_flush(struct lu_client_seq *seq)
{
- wait_queue_entry_t link;
-
LASSERT(seq != NULL);
- init_wait(&link);
mutex_lock(&seq->lcs_mutex);
- while (seq->lcs_update) {
- add_wait_queue(&seq->lcs_waitq, &link);
- set_current_state(TASK_UNINTERRUPTIBLE);
- mutex_unlock(&seq->lcs_mutex);
-
- schedule();
-
- mutex_lock(&seq->lcs_mutex);
- remove_wait_queue(&seq->lcs_waitq, &link);
- set_current_state(TASK_RUNNING);
- }
-
fid_zero(&seq->lcs_fid);
/**
* this id shld not be used for seq range allocation.
else
seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
- init_waitqueue_head(&seq->lcs_waitq);
/* Make sure that things are clear before work is started. */
seq_client_flush(seq);
/* Client sequence manager interface. */
struct lu_client_seq {
- /* Sequence-controller export. */
- struct obd_export *lcs_exp;
+ /* Sequence-controller export. */
+ struct obd_export *lcs_exp;
struct mutex lcs_mutex;
- /*
- * Range of allowed for allocation sequeces. When using lu_client_seq on
- * clients, this contains meta-sequence range. And for servers this
- * contains super-sequence range.
- */
- struct lu_seq_range lcs_space;
+ /*
+ * Range of allowed for allocation sequeces. When using lu_client_seq on
+ * clients, this contains meta-sequence range. And for servers this
+ * contains super-sequence range.
+ */
+ struct lu_seq_range lcs_space;
/* Seq related debugfs */
struct dentry *lcs_debugfs_entry;
- /* This holds last allocated fid in last obtained seq */
- struct lu_fid lcs_fid;
+ /* This holds last allocated fid in last obtained seq */
+ struct lu_fid lcs_fid;
- /* LUSTRE_SEQ_METADATA or LUSTRE_SEQ_DATA */
- enum lu_cli_type lcs_type;
+ /* LUSTRE_SEQ_METADATA or LUSTRE_SEQ_DATA */
+ enum lu_cli_type lcs_type;
/*
* Service uuid, passed from MDT + seq name to form unique seq name to
*/
char lcs_name[LUSTRE_MDT_MAXNAMELEN];
- /*
- * Sequence width, that is how many objects may be allocated in one
- * sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH.
- */
- __u64 lcs_width;
-
- /* Seq-server for direct talking */
- struct lu_server_seq *lcs_srv;
+ /*
+ * Sequence width, that is how many objects may be allocated in one
+ * sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH.
+ */
+ __u64 lcs_width;
- /* wait queue for fid allocation and update indicator */
- wait_queue_head_t lcs_waitq;
- int lcs_update;
+ /* Seq-server for direct talking */
+ struct lu_server_seq *lcs_srv;
};
/* server sequence manager interface */