Whamcloud - gitweb
- fixes and cleanup in seq-mgr about setting controller client. Return -ENODEV if...
authoryury <yury>
Tue, 10 Oct 2006 13:58:49 +0000 (13:58 +0000)
committeryury <yury>
Tue, 10 Oct 2006 13:58:49 +0000 (13:58 +0000)
- in fld_rrb_scan() added detailed dump of targets for case no target found by seq-built hash;
- in fld_client_add_target() detailed info about added target;

- in target_handle_connect() do not allow to connect if obd configuratoin is not yet finished. MGS obd is always counted as finished configuration and may accept connections from MGC;

- fixed leak of och in ll_close_inode_openhandle(). och should be marked DEAD also for cases !(och->och_flags & FMODE_WRITE);

- in lmv_clear_open_replay_data() handle possible error if no target found by fid;
- detailed debug in mdc_realloc_openmsg() about req realloc activities and attributes (sizes);
- in mdc_enqueue() do not realloc failed open reqs;

- in mdc_enqueue() set open replay data only in cases open req is not errorneous and will be replayed (req->rq_replay == 1);

- in mdc_enqueue() try to realloc open req for EA not only for reg files. Directories also may have stripe info (default stripe info for children files). Cleanups;

- in mdc_set_open_replay_data() add ref to open request saved to open replay data to prevent its freeing underneath what may confuse caller (mdc_enqueue() or llite);. Balance refc in mdc_clear_open_replay_data() which is called when open req is not longer needed to be preserved for replay (close is running or open req is errorneous);

- fixed typo in message in mdc_fid_init();
- in mdt fixes in comments, small cleanups;
- in test-framework.sh, find "evict_client" in mdd lprocfs (not imlemented yet).

13 files changed:
lustre/fid/fid_handler.c
lustre/fld/fld_request.c
lustre/ldlm/ldlm_lib.c
lustre/llite/file.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_handler.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mgs/mgs_handler.c
lustre/tests/test-framework.sh

index a0068d2..703c47b 100644 (file)
@@ -64,23 +64,23 @@ int seq_server_set_cli(struct lu_server_seq *seq,
                 RETURN(0);
         }
 
-        if (seq->lss_cli) {
+        /*
+         * Ask client for new range, assign that range to ->seq_space and write
+         * seq state to backing store should be atomic.
+         */
+        down(&seq->lss_sem);
+
+        if (seq->lss_cli != NULL) {
                 CERROR("%s: Sequence-controller is already "
                        "assigned\n", seq->lss_name);
-                RETURN(-EINVAL);
+                GOTO(out_up, rc = -EINVAL);
         }
 
         CDEBUG(D_INFO|D_WARNING, "%s: Attached "
                "sequence client %s\n", seq->lss_name,
                cli->lcs_name);
 
-        /*
-         * Asking client for new range, assign that range to ->seq_space and
-         * write seq state to backing store should be atomic.
-         */
-        down(&seq->lss_sem);
-
-        /* Assign controller */
+        /* Assign controller. */
         seq->lss_cli = cli;
 
         /*
@@ -90,10 +90,9 @@ int seq_server_set_cli(struct lu_server_seq *seq,
         if (range_is_zero(&seq->lss_space)) {
                 rc = seq_client_alloc_super(cli, env);
                 if (rc) {
-                        up(&seq->lss_sem);
                         CERROR("%s: Can't allocate super-sequence, "
                                "rc %d\n", seq->lss_name, rc);
-                        RETURN(rc);
+                        GOTO(out_up, rc);
                 }
 
                 /* take super-seq from client seq mgr */
@@ -109,8 +108,10 @@ int seq_server_set_cli(struct lu_server_seq *seq,
                 }
         }
 
+        EXIT;
+out_up:
         up(&seq->lss_sem);
-        RETURN(rc);
+        return rc;
 }
 EXPORT_SYMBOL(seq_server_set_cli);
 
@@ -235,8 +236,8 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                 if (range_is_exhausted(space)) {
                         if (!seq->lss_cli) {
                                 CERROR("%s: No sequence controller client "
-                                       "is setup\n", seq->lss_name);
-                                RETURN(-EOPNOTSUPP);
+                                       "is setup.\n", seq->lss_name);
+                                RETURN(-ENODEV);
                         }
 
                         rc = seq_client_alloc_super(seq->lss_cli, env);
index 2cfb56d..7d21c3d 100644 (file)
@@ -74,6 +74,21 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
                         RETURN(target);
         }
 
+        CERROR("%s: Can't find target by hash %d (seq "LPX64"). "
+               "Targets (%d):\n", fld->lcf_name, hash, seq,
+               fld->lcf_count);
+
+        list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
+                const char *srv_name = target->ft_srv ?
+                        target->ft_srv->lsf_name : "<null>";
+                const char *exp_name = target->ft_exp ?
+                        target->ft_exp->exp_obd->obd_uuid->uuid : "<null>";
+                
+                CERROR("  exp: 0x%p (%s), srv: 0x%p (%s), idx: %d\n",
+                       target->ft_exp, exp_name, target->ft_srv,
+                       srv_name, target->ft_idx);
+        }
+        
         /*
          * If target is not found, there is logical error anyway, so here is
          * LBUG() to catch this situation.
@@ -143,8 +158,8 @@ int fld_client_add_target(struct lu_client_fld *fld,
         LASSERT(name != NULL);
         LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
 
-        CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s\n",
-              fld->lcf_name, name);
+        CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s (idx %d)\n",
+              fld->lcf_name, name, tar->ft_idx);
 
         OBD_ALLOC_PTR(target);
         if (target == NULL)
index 74911f3..5ef80a2 100644 (file)
@@ -568,11 +568,15 @@ int target_handle_connect(struct ptlrpc_request *req)
         target = class_uuid2obd(&tgtuuid);
         if (!target)
                 target = class_name2obd(str);
-        
-        if (!target || target->obd_stopping || !target->obd_set_up) {
+
+        /*
+         * Do not accept connections if obd is not set up, or not configured
+         * yet.
+         */
+        if (!target || target->obd_stopping || !target->obd_set_up ||
+            !target->obd_configured) {
                 DEBUG_REQ(D_ERROR, req, "UUID '%s' is not available "
-                          " for connect (%s)", str,
-                          !target ? "no target" :
+                          " for connect (%s)", str, !target ? "no target" :
                           (target->obd_stopping ? "stopping" : "not set up"));
                 GOTO(out, rc = -ENODEV);
         }
index 8536d9e..fdcfa1d 100644 (file)
@@ -156,7 +156,7 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
         EXIT;
 out:
         md_clear_open_replay_data(md_exp, och);
-        if (epoch_close)
+        if (epoch_close || !(och->och_flags & FMODE_WRITE))
                 och->och_fh.cookie = DEAD_HANDLE_MAGIC;
         return rc;
 }
index 3ea4b49..31f1bf7 100644 (file)
@@ -2476,6 +2476,8 @@ int lmv_clear_open_replay_data(struct obd_export *exp,
         ENTRY;
 
         tgt_exp = lmv_get_export(lmv, och->och_fid);
+        if (IS_ERR(tgt_exp))
+                RETURN(PTR_ERR(tgt_exp));
 
         RETURN(md_clear_open_replay_data(tgt_exp, och));
 }
index 30edcfe..b6658da 100644 (file)
@@ -247,8 +247,12 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
         if (new_msg != NULL) {
                 struct lustre_msg *old_msg = req->rq_reqmsg;
 
-                DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
+                DEBUG_REQ(D_INFO, req, "Replace reqmsg for larger EA %u\n",
                           body->eadatasize);
+
+                CDEBUG(D_INFO, "Copy old msg of size %d to new allocated msg "
+                       "of size %d\n", old_size, new_size);
+                
                 memcpy(new_msg, old_msg, old_size);
                 lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 4,
                                       body->eadatasize);
@@ -260,6 +264,8 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 
                 OBD_FREE(old_msg, old_size);
         } else {
+                CERROR("Can't allocate new msg, size %d\n",
+                       new_size);
                 body->valid &= ~OBD_MD_FLEASIZE;
                 body->eadatasize = 0;
         }
@@ -299,7 +305,6 @@ int mdc_enqueue(struct obd_export *exp,
                                                    cl_max_mds_easize };
         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         int repbufcnt = 4, ea_off, rc;
-        void *eadata;
         ENTRY;
 
         LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
@@ -511,8 +516,8 @@ int mdc_enqueue(struct obd_export *exp,
         /* We know what to expect, so we do any byte flipping required here */
         LASSERT(repbufcnt == 7 || repbufcnt == 6 || repbufcnt == 2);
         if (repbufcnt >= 6) {
-                struct mdt_body *body;
                 int reply_off = DLM_REPLY_REC_OFF;
+                struct mdt_body *body;
 
                 body = lustre_swab_repbuf(req, reply_off++, sizeof(*body),
                                          lustre_swab_mdt_body);
@@ -521,54 +526,57 @@ int mdc_enqueue(struct obd_export *exp,
                         RETURN (-EPROTO);
                 }
 
-                /* If this is a successful OPEN request, we need to set
-                   replay handler and data early, so that if replay happens
-                   immediately after swabbing below, new reply is swabbed
-                   by that handler correctly */
-                if (it_disposition(it, DISP_OPEN_OPEN) &&
-                    !it_open_error(DISP_OPEN_OPEN, it))
+                if (req->rq_replay && it_disposition(it, DISP_OPEN_OPEN) &&
+                    !it_open_error(DISP_OPEN_OPEN, it)) {
+                        /*
+                         * If this is a successful OPEN request, we need to set
+                         * replay handler and data early, so that if replay
+                         * happens immediately after swabbing below, new reply
+                         * is swabbed by that handler correctly.
+                         */
                         mdc_set_open_replay_data(NULL, NULL, req);
-
-                if ((body->valid & OBD_MD_FLDIREA) != 0) {
-                        if (body->eadatasize) {
-                                eadata = lustre_swab_repbuf(req, reply_off++,
-                                                            body->eadatasize, NULL);
-                                if (eadata == NULL) {
-                                        CERROR ("Missing/short eadata\n");
-                                        RETURN (-EPROTO);
-                                }
-                        }
                 }
-                if ((body->valid & OBD_MD_FLEASIZE)) {
-                        /* The eadata is opaque; just check that it is there.
-                         * Eventually, obd_unpackmd() will check the contents */
+
+                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
+                        void *eadata;
+                        
+                        /*
+                         * The eadata is opaque; just check that it is there.
+                         * Eventually, obd_unpackmd() will check the contents.
+                         */
                         eadata = lustre_swab_repbuf(req, reply_off++,
                                                     body->eadatasize, NULL);
                         if (eadata == NULL) {
-                                CERROR ("Missing/short eadata\n");
-                                RETURN (-EPROTO);
+                                CERROR("Missing/short eadata\n");
+                                RETURN(-EPROTO);
                         }
                         if (body->valid & OBD_MD_FLMODEASIZE) {
                                 if (obddev->u.cli.cl_max_mds_easize < 
-                                                        body->max_mdsize) {
+                                    body->max_mdsize) {
                                         obddev->u.cli.cl_max_mds_easize = 
                                                 body->max_mdsize;
                                         CDEBUG(D_INFO, "maxeasize become %d\n",
                                                body->max_mdsize);
                                 }
                                 if (obddev->u.cli.cl_max_mds_cookiesize <
-                                                        body->max_cookiesize) {
+                                    body->max_cookiesize) {
                                         obddev->u.cli.cl_max_mds_cookiesize =
                                                 body->max_cookiesize;
                                         CDEBUG(D_INFO, "cookiesize become %d\n",
                                                body->max_cookiesize);
                                 }
                         }
-                        /* We save the reply LOV EA in case we have to replay
-                         * a create for recovery.  If we didn't allocate a
-                         * large enough request buffer above we need to
-                         * reallocate it here to hold the actual LOV EA. */
-                        if (it->it_op & IT_OPEN) {
+                        
+                        /*
+                         * We save the reply LOV EA in case we have to replay a
+                         * create for recovery.  If we didn't allocate a large
+                         * enough request buffer above we need to reallocate it
+                         * here to hold the actual LOV EA.
+                         *
+                         * To not save LOV EA if request is not going to replay
+                         * (for example error one).
+                         */
+                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
                                 if (lustre_msg_buflen(req->rq_reqmsg,
                                                       DLM_INTENT_REC_OFF + 4) <
                                     body->eadatasize)
index b01a7ec..8209f23 100644 (file)
@@ -675,8 +675,7 @@ static void mdc_replay_open(struct ptlrpc_request *req)
                 if (och != NULL)
                         LASSERT(!memcmp(&old, &epoch->handle, sizeof(old)));
                 DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
-                memcpy(&epoch->handle, &body->handle,
-                       sizeof(epoch->handle));
+                memcpy(&epoch->handle, &body->handle, sizeof(epoch->handle));
         }
 
         EXIT;
@@ -695,36 +694,40 @@ int mdc_set_open_replay_data(struct obd_export *exp,
                                                sizeof(*body));
         ENTRY;
 
-        /* incoming message in my byte order (it's been swabbed) */
         LASSERT(rec != NULL);
+
+        /* Incoming message in my byte order (it's been swabbed). */
         LASSERT_REPSWABBED(open_req, DLM_REPLY_REC_OFF);
-        /* outgoing messages always in my byte order */
+
+        /* Outgoing messages always in my byte order. */
         LASSERT(body != NULL);
 
         if (och) {
                 OBD_ALLOC(mod, sizeof(*mod));
                 if (mod == NULL) {
-                        DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data");
+                        DEBUG_REQ(D_ERROR, open_req,
+                                  "Can't allocate mdc_open_data");
                         RETURN(0);
                 }
 
                 och->och_mod = mod;
                 mod->mod_och = och;
-                mod->mod_open_req = open_req;
                 open_req->rq_cb_data = mod;
                 open_req->rq_commit_cb = mdc_commit_open;
+                mod->mod_open_req = ptlrpc_request_addref(open_req);
+                
         }
 
         rec->cr_fid2 = body->fid1;
         rec->cr_ioepoch = body->ioepoch;
         open_req->rq_replay_cb = mdc_replay_open;
         if (!fid_is_sane(&body->fid1)) {
-                DEBUG_REQ(D_ERROR, open_req, "saving replay request with "
+                DEBUG_REQ(D_ERROR, open_req, "Saving replay request with "
                           "insane fid");
                 LBUG();
         }
 
-        DEBUG_REQ(D_HA, open_req, "set up replay data");
+        DEBUG_REQ(D_HA, open_req, "Set up open replay data");
         RETURN(0);
 }
 
@@ -734,13 +737,18 @@ int mdc_clear_open_replay_data(struct obd_export *exp,
         struct mdc_open_data *mod = och->och_mod;
         ENTRY;
 
-        /* Don't free the structure now (it happens in mdc_commit_open, after
+        /*
+         * Don't free the structure now (it happens in mdc_commit_open(), after
          * we're sure we won't need to fix up the close request in the future),
          * but make sure that replay doesn't poke at the och, which is about to
-         * be freed. */
+         * be freed.
+         */
         LASSERT(mod != LP_POISON);
         if (mod != NULL)
+                if (mod->mod_open_req != NULL)
+                        ptlrpc_req_finished(mod->mod_open_req);
                 mod->mod_och = NULL;
+
         och->och_mod = NULL;
         RETURN(0);
 }
@@ -768,8 +776,10 @@ static void mdc_commit_close(struct ptlrpc_request *req)
         LASSERT(open_req->rq_transno != 0);
         LASSERT(open_req->rq_import == imp);
 
-        /* We no longer want to preserve this for transno-unconditional
-         * replay. */
+        /*
+         * We no longer want to preserve this for transno-unconditional
+         * replay. Decref open req here as well.
+         */
         spin_lock(&open_req->rq_lock);
         open_req->rq_replay = 0;
         spin_unlock(&open_req->rq_lock);
@@ -1369,7 +1379,7 @@ static int mdc_fid_init(struct obd_export *exp)
         /* pre-allocate meta-sequence */
         rc = seq_client_alloc_meta(cli->cl_seq, NULL);
         if (rc) {
-                CERROR("can't allocate new mata-sequence, "
+                CERROR("Can't allocate new meta-sequence, "
                        "rc %d\n", rc);
                 GOTO(out_free_seq, rc);
         }
index 5ebcadf..a3a4576 100644 (file)
@@ -3011,7 +3011,7 @@ static int mdd_open(const struct lu_env *env, struct md_object *obj,
 
         rc = mdd_open_sanity_check(env, mdd_obj, flags);
         if (rc == 0)
-                mdd_obj->mod_count ++;
+                mdd_obj->mod_count++;
 
         mdd_write_unlock(env, mdd_obj);
         return rc;
index 8fff090..ad54ac0 100644 (file)
@@ -563,11 +563,11 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 RETURN(err_serious(-EFAULT));
 
         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
-                        PFID(mdt_object_fid(parent)), name, ldlm_rep);
+               PFID(mdt_object_fid(parent)), name, ldlm_rep);
 
         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
         if (strlen(name) == 0) {
-                /* only getattr on the child. parent is on another node. */
+                /* Only getattr on the child. Parent is on another node. */
                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
                 child = parent;
                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
index e289fb7..6aab3b3 100644 (file)
@@ -399,9 +399,11 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         }
 
         if (isreg && !(ma->ma_valid & MA_LOV)) {
-                /*No EA, check whether it is will set regEA and dirEA
-                 *since in above attr get, these size might be zero,
-                 *so reset it, to retrieve the MD after create obj*/
+                /*
+                 * No EA, check whether it is will set regEA and dirEA since in
+                 * above attr get, these size might be zero, so reset it, to
+                 * retrieve the MD after create obj.
+                 */
                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
                                                        &RMF_MDT_MD,
                                                        RCL_SERVER);
@@ -412,10 +414,10 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
         }
 
         CDEBUG(D_INODE, "after open, ma_valid bit = "LPX64" lmm_size = %d\n",
-                        ma->ma_valid, ma->ma_lmm_size);
+               ma->ma_valid, ma->ma_lmm_size);
 
         if (ma->ma_valid & MA_LOV) {
-                LASSERT(ma->ma_lmm_size);
+                LASSERT(ma->ma_lmm_size != 0);
                 repbody->eadatasize = ma->ma_lmm_size;
                 if (isdir)
                         repbody->valid |= OBD_MD_FLDIREA;
@@ -512,8 +514,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         repbody->aclsize = 0;
 
         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
-                                               &RMF_MDT_MD,
+        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill, &RMF_MDT_MD,
                                                RCL_SERVER);
         ma->ma_need = MA_INODE | MA_LOV;
 
@@ -683,8 +684,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         repbody->aclsize = 0;
 
         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
-                                               &RMF_MDT_MD,
+        ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill, &RMF_MDT_MD,
                                                RCL_SERVER);
         ma->ma_need = MA_INODE | MA_LOV;
 
index 6fcee2d..1cb6f7e 100644 (file)
@@ -497,7 +497,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
                       mdt_object_child(ms), rr->rr_name, ma);
 
         EXIT;
-out_unlock_target:
+
         mdt_object_unlock_put(info, mp, lhp, rc);
 out_unlock_source:
         mdt_object_unlock_put(info, ms, lhs, rc);
index d5ffb2e..9af4807 100644 (file)
@@ -146,6 +146,9 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 GOTO(err_ops, rc = -ENOMEM);
         }
 
+        /* MGS is always configured. */
+        obd->obd_configured = 1;
+
         /* ldlm setup */
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "mgs_ldlm_client", &obd->obd_ldlm_client);
index 21ba352..e5a5b1a 100644 (file)
@@ -502,8 +502,8 @@ replay_barrier_nodf() {
 }
 
 mds_evict_client() {
-    UUID=`cat /proc/fs/lustre/mdc/${mds_svc}-mdc-*/uuid`
-    do_facet mds "echo $UUID > /proc/fs/lustre/mds/${mds_svc}/evict_client"
+    UUID=`cat /proc/fs/lustre/mdc/${mds1_svc}-mdc-*/uuid`
+    do_facet mds1 "echo $UUID > /proc/fs/lustre/mdd/${mds1_svc}/evict_client"
 }
 
 ost_evict_client() {