From aba940fa10195579eac0701846922547c6ad0dae Mon Sep 17 00:00:00 2001 From: Mr NeilBrown Date: Fri, 22 Jan 2021 13:42:33 +1100 Subject: [PATCH] LU-14357 fid: simplify locking for fid updates 'struct lu_client_seq' contains a mutex (lcs_mutex) and a second open-coded mutex (lcs_waitq, lcs_update). Both of these are using in gettign a new fid, possibly from the server. lcs_mutex is the main mutex which protects the local variables. If an RPC to the server is required, the extra mutex is held during that RPC. This was apparently intended to avoid some deadlock, presumably with seq_client_flush(). However as seq_client_flush() now takes both mutexes as well, it is still prone to any such deadlock, but does not appear to actually suffer from one. See: Commit 23e2a370c8a3 ("b=24255 move seq_client_alloc_seq out of lcs_sem") Commit d1feb5c774d4 ("LU-662 fix conflict between seq_client_flush and seq_client_alloc_fid") for some of the history. The extra open-coded mutex appears to provide no value, so let's remove it. As part of this, seq_fid_alloc_fini() is open-coded into the two call sites, which adds further simplification. Signed-off-by: Mr NeilBrown Change-Id: Ia39eca7d925c9d49fbf942923de8af79dba4f6bf Reviewed-on: https://review.whamcloud.com/41299 Tested-by: jenkins Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Alex Zhuravlev Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- lustre/fid/fid_request.c | 140 +++++++++++--------------------------------- lustre/include/lustre_fid.h | 42 ++++++------- 2 files changed, 53 insertions(+), 129 deletions(-) diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index add56f6..2fa8590 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -250,59 +250,6 @@ static int seq_client_alloc_seq(const struct lu_env *env, RETURN(rc); } -static int seq_fid_alloc_prep(struct lu_client_seq *seq, - wait_queue_entry_t *link) -{ - if (seq->lcs_update) { - add_wait_queue(&seq->lcs_waitq, link); - set_current_state(TASK_UNINTERRUPTIBLE); - mutex_unlock(&seq->lcs_mutex); - - schedule(); - - mutex_lock(&seq->lcs_mutex); - remove_wait_queue(&seq->lcs_waitq, link); - set_current_state(TASK_RUNNING); - return -EAGAIN; - } - - ++seq->lcs_update; - mutex_unlock(&seq->lcs_mutex); - - return 0; -} - -static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr, - bool whole) -{ - LASSERT(seq->lcs_update == 1); - - mutex_lock(&seq->lcs_mutex); - if (seqnr != 0) { - CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n", - seq->lcs_name, seqnr); - - seq->lcs_fid.f_seq = seqnr; - if (whole) { - /* - * Since the caller require the whole seq, - * so marked this seq to be used - */ - if (seq->lcs_type == LUSTRE_SEQ_METADATA) - seq->lcs_fid.f_oid = - LUSTRE_METADATA_SEQ_MAX_WIDTH; - else - seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; - } else { - seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; - } - seq->lcs_fid.f_ver = 0; - } - - --seq->lcs_update; - wake_up(&seq->lcs_waitq); -} - /** * Allocate the whole non-used seq to the caller. * @@ -316,23 +263,31 @@ static void seq_fid_alloc_fini(struct lu_client_seq *seq, __u64 seqnr, int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq, u64 *seqnr) { - wait_queue_entry_t link; int rc; LASSERT(seqnr != NULL); mutex_lock(&seq->lcs_mutex); - init_wait(&link); - - /* To guarantee that we can get a whole non-used sequence. */ - while (seq_fid_alloc_prep(seq, &link) != 0) - ; /* do nothing */ rc = seq_client_alloc_seq(env, seq, seqnr); - seq_fid_alloc_fini(seq, rc ? 0 : *seqnr, true); - if (rc) + if (rc) { CERROR("%s: Can't allocate new sequence: rc = %d\n", seq->lcs_name, rc); + } else { + CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n", + seq->lcs_name, *seqnr); + seq->lcs_fid.f_seq = *seqnr; + seq->lcs_fid.f_ver = 0; + /* + * The caller require the whole seq, + * so marked this seq to be used + */ + if (seq->lcs_type == LUSTRE_SEQ_METADATA) + seq->lcs_fid.f_oid = + LUSTRE_METADATA_SEQ_MAX_WIDTH; + else + seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; + } mutex_unlock(&seq->lcs_mutex); return rc; @@ -354,59 +309,48 @@ EXPORT_SYMBOL(seq_client_get_seq); int seq_client_alloc_fid(const struct lu_env *env, struct lu_client_seq *seq, struct lu_fid *fid) { - wait_queue_entry_t link; int rc; ENTRY; LASSERT(seq != NULL); LASSERT(fid != NULL); - init_wait(&link); mutex_lock(&seq->lcs_mutex); if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) seq->lcs_fid.f_oid = seq->lcs_width; - while (1) { + if (unlikely(!fid_is_zero(&seq->lcs_fid) && + fid_oid(&seq->lcs_fid) < seq->lcs_width)) { + /* Just bump last allocated fid and return to caller. */ + seq->lcs_fid.f_oid++; + rc = 0; + } else { u64 seqnr; - if (unlikely(!fid_is_zero(&seq->lcs_fid) && - fid_oid(&seq->lcs_fid) < seq->lcs_width)) { - /* Just bump last allocated fid and return to caller. */ - seq->lcs_fid.f_oid++; - rc = 0; - break; - } - - /* - * Release seq::lcs_mutex via seq_fid_alloc_prep() to avoid - * deadlock during seq_client_alloc_seq(). - */ - rc = seq_fid_alloc_prep(seq, &link); - if (rc) - continue; - rc = seq_client_alloc_seq(env, seq, &seqnr); - /* Re-take seq::lcs_mutex via seq_fid_alloc_fini(). */ - seq_fid_alloc_fini(seq, rc ? 0 : seqnr, false); if (rc) { if (rc != -EINPROGRESS) CERROR("%s: Can't allocate new sequence: rc = %d\n", seq->lcs_name, rc); - mutex_unlock(&seq->lcs_mutex); + } else { + CDEBUG(D_INFO, "%s: New sequence [0x%16.16llx]\n", + seq->lcs_name, seqnr); - RETURN(rc); + seq->lcs_fid.f_seq = seqnr; + seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; + seq->lcs_fid.f_ver = 0; + rc = 1; } - - rc = 1; - break; } - *fid = seq->lcs_fid; + if (rc >= 0) { + *fid = seq->lcs_fid; + CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, + PFID(fid)); + } mutex_unlock(&seq->lcs_mutex); - CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); - RETURN(rc); } EXPORT_SYMBOL(seq_client_alloc_fid); @@ -417,24 +361,9 @@ EXPORT_SYMBOL(seq_client_alloc_fid); */ void seq_client_flush(struct lu_client_seq *seq) { - wait_queue_entry_t link; - LASSERT(seq != NULL); - init_wait(&link); mutex_lock(&seq->lcs_mutex); - while (seq->lcs_update) { - add_wait_queue(&seq->lcs_waitq, &link); - set_current_state(TASK_UNINTERRUPTIBLE); - mutex_unlock(&seq->lcs_mutex); - - schedule(); - - mutex_lock(&seq->lcs_mutex); - remove_wait_queue(&seq->lcs_waitq, &link); - set_current_state(TASK_RUNNING); - } - fid_zero(&seq->lcs_fid); /** * this id shld not be used for seq range allocation. @@ -497,7 +426,6 @@ void seq_client_init(struct lu_client_seq *seq, else seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; - init_waitqueue_head(&seq->lcs_waitq); /* Make sure that things are clear before work is started. */ seq_client_flush(seq); diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index d7388f0..fccb5ff 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -409,25 +409,25 @@ struct lu_server_seq; /* Client sequence manager interface. */ struct lu_client_seq { - /* Sequence-controller export. */ - struct obd_export *lcs_exp; + /* Sequence-controller export. */ + struct obd_export *lcs_exp; struct mutex lcs_mutex; - /* - * Range of allowed for allocation sequeces. When using lu_client_seq on - * clients, this contains meta-sequence range. And for servers this - * contains super-sequence range. - */ - struct lu_seq_range lcs_space; + /* + * Range of allowed for allocation sequeces. When using lu_client_seq on + * clients, this contains meta-sequence range. And for servers this + * contains super-sequence range. + */ + struct lu_seq_range lcs_space; /* Seq related debugfs */ struct dentry *lcs_debugfs_entry; - /* This holds last allocated fid in last obtained seq */ - struct lu_fid lcs_fid; + /* This holds last allocated fid in last obtained seq */ + struct lu_fid lcs_fid; - /* LUSTRE_SEQ_METADATA or LUSTRE_SEQ_DATA */ - enum lu_cli_type lcs_type; + /* LUSTRE_SEQ_METADATA or LUSTRE_SEQ_DATA */ + enum lu_cli_type lcs_type; /* * Service uuid, passed from MDT + seq name to form unique seq name to @@ -435,18 +435,14 @@ struct lu_client_seq { */ char lcs_name[LUSTRE_MDT_MAXNAMELEN]; - /* - * Sequence width, that is how many objects may be allocated in one - * sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH. - */ - __u64 lcs_width; - - /* Seq-server for direct talking */ - struct lu_server_seq *lcs_srv; + /* + * Sequence width, that is how many objects may be allocated in one + * sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH. + */ + __u64 lcs_width; - /* wait queue for fid allocation and update indicator */ - wait_queue_head_t lcs_waitq; - int lcs_update; + /* Seq-server for direct talking */ + struct lu_server_seq *lcs_srv; }; /* server sequence manager interface */ -- 1.8.3.1