Whamcloud - gitweb
add the ability to resend a request if its reply does not fit in the reply buffer.
authorshadow <shadow>
Sat, 19 Sep 2009 11:13:21 +0000 (11:13 +0000)
committershadow <shadow>
Sat, 19 Sep 2009 11:13:21 +0000 (11:13 +0000)
init llog too early.

Branch b1_8
b=19526
i=nathan
i=e.mei
i=tappro

15 files changed:
lustre/ChangeLog
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/liblustre/super.c
lustre/llite/llite_lib.c
lustre/lov/lov_log.c
lustre/lov/lov_obd.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/mds/mds_lov.c
lustre/mds/mds_reint.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c

index 8afeb8a..3ebd312 100644 (file)
@@ -15,6 +15,13 @@ tbd Sun Microsystems, Inc.
          of Lustre filesystem with 4K stack may cause a stack overflow. For
          more information, please refer to bugzilla 17630.
 
+Severity   : normal
+Bugzilla   : 19526
+Description: can't stat file in some situations.
+Details    : improve initialization of osc data when a target is added to mds,
+             and add the ability to resend a getattr request whose reply is too
+             big when the client doesn't have info about the ost.
+
 Severity   : enhancement
 Bugzilla   : 20539
 Description: Add OEL5 support.
index 8cc7483..d7ff6c9 100644 (file)
@@ -324,6 +324,7 @@ struct ptlrpc_request {
                 rq_sent_final:1,    /* stop sending early replies */
                 rq_hp:1,            /* high priority RPC */
                 rq_at_linked:1,     /* link into service's srv_at_array */
+                rq_reply_truncate:1, /* reply is truncated */
                 rq_fake:1,          /* fake request - just for timeout only */
                 /* a copy of the request is queued to replay during recovery */
                 rq_copy_queued:1,
index 0ddc520..3229771 100644 (file)
@@ -698,6 +698,7 @@ struct lov_qos {
 struct lov_tgt_desc {
         struct list_head    ltd_kill;
         struct obd_uuid     ltd_uuid;
+        struct obd_device  *ltd_obd;
         struct obd_export  *ltd_exp;
         struct ltd_qos      ltd_qos;     /* qos info per target */
         __u32               ltd_gen;
@@ -885,6 +886,8 @@ enum llog_ctxt_id {
  * Events signalled through obd_notify() upcall-chain.
  */
 enum obd_notify_event {
+        /* device target is created */
+        OBD_NOTIFY_CREATE,
         /* DEVICE connect start */
         OBD_NOTIFY_CONNECT,
         /* Device activated */
index a8b94c4..7a5b29d 100644 (file)
@@ -2075,6 +2075,10 @@ llu_fsswop_mount(const char *source,
         obd->obd_upcall.onu_owner = &sbi->ll_lco;
         obd->obd_upcall.onu_upcall = ll_ocd_update;
 
+        /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
+         * targets */
+        obd_notify(obd, NULL, OBD_NOTIFY_CREATE, NULL);
+
         obd_register_lock_cancel_cb(obd, llu_extent_lock_cancel_cb);
 
         ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL |
index 6ce1b05..926aff7 100644 (file)
@@ -415,6 +415,10 @@ static int client_common_fill_super(struct super_block *sb,
         obd->obd_upcall.onu_upcall = ll_ocd_update;
         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
 
+        /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
+         * targets */
+        obd_notify(obd, NULL, OBD_NOTIFY_CREATE, NULL);
+
         obd_register_lock_cancel_cb(obd, ll_extent_lock_cancel_cb);
         obd_register_page_removal_cb(obd, ll_page_removal_cb, ll_pin_extent_cb);
 
index 0728203..368654f 100644 (file)
@@ -240,8 +240,7 @@ int lov_llog_init(struct obd_device *obd, struct obd_device *disk_obd,
                         continue;
 
                 CDEBUG(D_CONFIG, "init %s\n", lov->lov_tgts[i]->ltd_uuid.uuid);
-                child = class_find_client_obd(&lov->lov_tgts[i]->ltd_uuid,
-                                              LUSTRE_OSC_NAME, &obd->obd_uuid);
+                child = lov->lov_tgts[i]->ltd_obd;
                 if (!child) {
                         CERROR("Can't find osc\n");
                         continue;
index 69d6b6e..9d1040a 100644 (file)
@@ -231,31 +231,29 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                 /* NULL watched means all osc's in the lov (only for syncs) */
                 /* sync event should be send lov idx as data */
                 struct lov_obd *lov = &obd->u.lov;
-                struct obd_device *tgt_obd;
-                int i;
+                int i, is_sync;
 
-                if ((ev == OBD_NOTIFY_SYNC) ||
-                    (ev == OBD_NOTIFY_SYNC_NONBLOCK))
-                        data = &i;
+                data = &i;
+                is_sync = (ev == OBD_NOTIFY_SYNC) ||
+                          (ev == OBD_NOTIFY_SYNC_NONBLOCK);
 
                 obd_getref(obd);
                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
 
+                        if (!lov->lov_tgts[i])
+                                continue;
                         /* don't send sync event if target not
                          * connected/activated */
-                        if (!lov->lov_tgts[i] ||
-                            !lov->lov_tgts[i]->ltd_active)
+                        if (is_sync &&  !lov->lov_tgts[i]->ltd_active)
                                 continue;
 
-                        tgt_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
-
-                        rc = obd_notify_observer(obd, tgt_obd, ev, data);
+                        rc = obd_notify_observer(obd, lov->lov_tgts[i]->ltd_obd,
+                                                 ev, data);
                         if (rc) {
                                 CERROR("%s: notify %s of %s failed %d\n",
                                        obd->obd_name,
                                        obd->obd_observer->obd_name,
-                                       tgt_obd->obd_name, rc);
-                                break;
+                                       lov->lov_tgts[i]->ltd_obd->obd_name, rc);
                         }
                 }
                 obd_putref(obd);
@@ -283,15 +281,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
         if (!lov->lov_tgts[index])
                 RETURN(-EINVAL);
 
-        tgt_uuid = lov->lov_tgts[index]->ltd_uuid;
-        tgt_obd = class_find_client_obd(&tgt_uuid, LUSTRE_OSC_NAME,
-                                        &obd->obd_uuid);
-
-        if (!tgt_obd) {
-                CERROR("Target %s not attached\n", obd_uuid2str(&tgt_uuid));
-                RETURN(-EINVAL);
-        }
-
+        tgt_obd = lov->lov_tgts[index]->ltd_obd;
         if (!tgt_obd->obd_set_up) {
                 CERROR("Target %s not set up\n", obd_uuid2str(&tgt_uuid));
                 RETURN(-EINVAL);
@@ -597,6 +587,7 @@ static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                           __u32 index, int gen, int active)
 {
+        struct obd_device *tgt_obd;
         struct lov_obd *lov = &obd->u.lov;
         struct lov_tgt_desc *tgt;
         int rc;
@@ -611,14 +602,21 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 RETURN(-EINVAL);
         }
 
+
+        tgt_obd = class_find_client_obd(uuidp, LUSTRE_OSC_NAME,
+                                        &obd->obd_uuid);
+        if (!tgt_obd) {
+                CERROR("Target %s not attached\n", obd_uuid2str(uuidp));
+                RETURN(-EINVAL);
+        }
+
         mutex_down(&lov->lov_lock);
 
         if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) {
                 tgt = lov->lov_tgts[index];
                 CERROR("UUID %s already assigned at LOV target index %d\n",
                        obd_uuid2str(&tgt->ltd_uuid), index);
-                mutex_up(&lov->lov_lock);
-                RETURN(-EEXIST);
+                GOTO(err_unlock, rc = -EEXIST);
         }
 
         if (index >= lov->lov_tgt_size) {
@@ -630,10 +628,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 while (newsize < index + 1)
                         newsize = newsize << 1;
                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
-                if (newtgts == NULL) {
-                        mutex_up(&lov->lov_lock);
-                        RETURN(-ENOMEM);
-                }
+                if (newtgts == NULL)
+                        GOTO(err_unlock, rc = -ENOMEM);
 
                 if (lov->lov_tgt_size) {
                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
@@ -655,24 +651,21 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
         }
 
         OBD_ALLOC_PTR(tgt);
-        if (!tgt) {
-                mutex_up(&lov->lov_lock);
-                RETURN(-ENOMEM);
-        }
+        if (!tgt)
+                GOTO(err_unlock, rc = -ENOMEM);
 
         rc = lov_ost_pool_add(&lov->lov_packed, index, lov->lov_tgt_size);
-        if (rc) {
-                mutex_up(&lov->lov_lock);
-                OBD_FREE_PTR(tgt);
-                RETURN(rc);
-        }
+        if (rc)
+                GOTO(err_free_tgt, rc = -EEXIST);
 
         memset(tgt, 0, sizeof(*tgt));
+        tgt->ltd_obd = tgt_obd;
         tgt->ltd_uuid = *uuidp;
         /* XXX - add a sanity check on the generation number. */
         tgt->ltd_gen = gen;
         tgt->ltd_index = index;
         tgt->ltd_activate = active;
+
         lov->lov_tgts[index] = tgt;
         if (index >= lov->desc.ld_tgt_count)
                 lov->desc.ld_tgt_count = index + 1;
@@ -682,6 +675,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
                 index, tgt->ltd_gen, lov->desc.ld_tgt_count);
 
+        rc = obd_notify(obd, tgt_obd, OBD_NOTIFY_CREATE, &index);
+
         if (lov->lov_connects == 0) {
                 /* lov_connect hasn't been called yet. We'll do the
                    lov_connect_obd on this target when that fn first runs,
@@ -711,6 +706,12 @@ out:
         }
         obd_putref(obd);
         RETURN(rc);
+
+err_free_tgt:
+        OBD_FREE_PTR(tgt);
+err_unlock:
+        mutex_up(&lov->lov_lock);
+        RETURN(rc);
 }
 
 /* Schedule a target for deletion */
index 9104238..7d39a04 100644 (file)
@@ -137,5 +137,19 @@ static inline int mdc_exp_is_2_0_server(struct obd_export *exp) {
 
 static inline int mdc_req_is_2_0_server(struct ptlrpc_request *req) {
        LASSERT(req);
-        return mdc_exp_is_2_0_server(req->rq_export);
+       return mdc_exp_is_2_0_server(req->rq_export);
+}
+
+static inline void mdc_update_max_ea_from_body(struct obd_export *exp,
+                                               struct mds_body *body)
+{
+        if (body->valid & OBD_MD_FLMODEASIZE) {
+                if (exp->exp_obd->u.cli.cl_max_mds_easize < body->max_mdsize) 
+                        exp->exp_obd->u.cli.cl_max_mds_easize = 
+                                                body->max_mdsize;
+                if (exp->exp_obd->u.cli.cl_max_mds_cookiesize < 
+                                                body->max_cookiesize)
+                        exp->exp_obd->u.cli.cl_max_mds_cookiesize = 
+                                                body->max_cookiesize;
+        }
 }
index 89d7fb1..a5ad574 100644 (file)
@@ -526,6 +526,8 @@ static int mdc_finish_enqueue(struct obd_export *exp,
                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
                         void *eadata;
 
+                        mdc_update_max_ea_from_body(exp, body);
+
                         /* The eadata is opaque; just check that it is there.
                          * Eventually, obd_unpackmd() will check the contents */
                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
index bca065e..2fb34e2 100644 (file)
@@ -164,6 +164,8 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size,
         CDEBUG(D_NET, "mode: %o\n", body->mode);
 
         lustre_set_rep_swabbed(req, REPLY_REC_OFF + 1);
+        mdc_update_max_ea_from_body(exp, body);
+
         if (body->eadatasize != 0) {
                 /* reply indicates presence of eadata; check it's there... */
                 eadata = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
index 7e640c8..5c13f76 100644 (file)
@@ -220,6 +220,15 @@ static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
         return 0;
 }
 
+static int mds_lov_objinit(struct mds_obd *mds, __u32 index)
+{
+        __u32 page = index / OBJID_PER_PAGE();
+        __u32 off = index % OBJID_PER_PAGE();
+        obd_id *data =  mds->mds_lov_page_array[page];
+
+        return (data[off] > 0);
+}
+
 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
 {
         int rc = 0;
@@ -475,7 +484,7 @@ static int mds_lov_get_objid(struct obd_device * obd,
         off = idx % OBJID_PER_PAGE();
 
         data = mds->mds_lov_page_array[page];
-        if (data[off] < 2 || 
+        if (data[off] < 2 ||
             !(osc_exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN)) {
                 /* We never read this lastid; ask the osc */
                 struct obd_id_info lastid;
@@ -603,9 +612,7 @@ static int mds_lov_update_mds(struct obd_device *obd,
 
         /* Don't let anyone else mess with mds_lov_objids now */
         old_count = mds->mds_lov_desc.ld_tgt_count;
-        rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid);
-        if (rc)
-                GOTO(out, rc);
+        LASSERT(mds_lov_objinit(mds, idx));
 
         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n",
                idx, obd->obd_recovering, obd->obd_async_recov, old_count,
@@ -684,12 +691,9 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 GOTO(error_exit, rc);
         }
 
-        rc = obd_llog_init(obd, obd, NULL); 
-        if (rc) {
-                CERROR("MDS cannot register as observer of LOV %s (%d)\n",
-                       lov_name, rc);
-                GOTO(error_exit, rc);
-        }
+        /* ask lov to generate OBD_NOTIFY_CREATE events for already registered
+         * targets */
+        obd_notify(obd->u.mds.mds_osc_obd, NULL, OBD_NOTIFY_CREATE, NULL);
 
         OBD_ALLOC(data, sizeof(*data));
         if (data == NULL)
@@ -1150,6 +1154,14 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
 
         switch (ev) {
+        case OBD_NOTIFY_CREATE:
+                CWARN("MDS %s: add target %s\n",obd->obd_name,
+                      obd_uuid2str(&watched->u.cli.cl_target_uuid));
+                /* We still have to fix the lov descriptor for ost's */
+                LASSERT(data);
+                rc = mds_lov_update_desc(obd, *(__u32 *)data,
+                                          &watched->u.cli.cl_target_uuid);
+                RETURN(rc);
         /* We only handle these: */
         case OBD_NOTIFY_ACTIVE:
                 /* lov want one or more _active_ targets for work */
@@ -1181,15 +1193,8 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
                       obd->obd_name,
                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
-                /* We still have to fix the lov descriptor for ost's added
-                   after the mdt in the config log.  They didn't make it into
-                   mds_lov_connect. */
-                LASSERT(data);
-                rc = mds_lov_update_desc(obd, *(__u32 *)data,
-                                          &watched->u.cli.cl_target_uuid);
-
                 mds_allow_cli(obd, CONFIG_SYNC);
-                RETURN(rc);
+                RETURN(0);
         }
 
         rc = mds_lov_start_synchronize(obd, watched, data,
index ae52b78..d64f5e9 100644 (file)
@@ -1834,7 +1834,7 @@ void mds_shrink_body_reply(struct ptlrpc_request *req,
                            int reply_mdoff)
 {
         struct mds_body *rq_body;
-        const long have_acl = OBD_MD_FLCOOKIE | OBD_MD_FLACL;
+        const long long have_acl = OBD_MD_FLCOOKIE | OBD_MD_FLACL;
         const long have_md = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
         ENTRY;
 
index 2652070..34f585a 100644 (file)
@@ -1010,6 +1010,14 @@ static int after_reply(struct ptlrpc_request *req)
         /* NB Until this point, the whole of the incoming message,
          * including buflens, status etc is in the sender's byte order. */
 
+        if (req->rq_reply_truncate && !req->rq_no_resend) {
+                req->rq_resend = 1;
+                OBD_FREE(req->rq_repbuf, req->rq_replen);
+                req->rq_repbuf = NULL;
+                req->rq_replen = req->rq_nob_received;
+                RETURN(0);
+        }
+
         LASSERT (req->rq_nob_received <= req->rq_replen);
         rc = unpack_reply(req);
         if (rc)
@@ -2059,14 +2067,13 @@ restart:
 
                 if (req->rq_err) {
                         /* rq_status was set locally */
-                        rc = -EIO;
+                        rc = req->rq_status ? req->rq_status : -EIO;
                 }
                 else if (req->rq_intr) {
                         rc = -EINTR;
                 }
                 else if (req->rq_no_resend) {
-                        spin_unlock(&imp->imp_lock);
-                        GOTO(out, rc = -ETIMEDOUT);
+                        rc = -ETIMEDOUT;
                 }
                 else {
                         GOTO(restart, rc);
@@ -2145,6 +2152,7 @@ restart:
         if (req->rq_err) {
                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
                           rc, req->rq_status);
+                rc = rc ? rc : req->rq_status;
                 GOTO(out, rc = rc ? rc : -EIO);
         }
 
index 18d5eff..3853b09 100644 (file)
@@ -110,12 +110,23 @@ void reply_in_callback(lnet_event_t *ev)
 
         if (ev->status)
                 goto out_wake;
+
         if (ev->type == LNET_EVENT_UNLINK) {
                 LASSERT(ev->unlinked);
                 DEBUG_REQ(D_RPCTRACE, req, "unlink");
                 goto out_wake;
         }
 
+        if (ev->mlength < ev->rlength ) {
+                CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req,
+                       req->rq_replen, ev->rlength, ev->offset);
+                req->rq_reply_truncate = 1;
+                req->rq_replied = 1;
+                req->rq_status = -EOVERFLOW;
+                req->rq_nob_received = ev->rlength + ev->offset;
+                goto out_wake;
+        }
+
         if ((ev->offset == 0) &&
             (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
                 /* Early reply */
index f9599c0..2391db1 100644 (file)
@@ -566,6 +566,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         request->rq_resend = 0;
         request->rq_restart = 0;
         request->rq_rep_swab_mask = 0;
+        request->rq_reply_truncate = 0;
         spin_unlock(&request->rq_lock);
 
         if (!noreply) {
@@ -575,7 +576,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                 reply_md.threshold = LNET_MD_THRESH_INF;
                 /* Manage remote for early replies */
                 reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
-                        LNET_MD_MANAGE_REMOTE;
+                        LNET_MD_MANAGE_REMOTE |
+                        LNET_MD_TRUNCATE; /* allow to make EBIG error */
                 reply_md.user_ptr  = &request->rq_reply_cbid;
                 reply_md.eq_handle = ptlrpc_eq_h;