From: shadow Date: Sat, 19 Sep 2009 11:13:21 +0000 (+0000) Subject: add ability to resend a request if its reply doesn't fit in the reply buffer. X-Git-Tag: v1_8_2_01~1^2~88 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=3a32da6bd1daf6977a28f64471d304d43be93c40;p=fs%2Flustre-release.git add ability to resend a request if its reply doesn't fit in the reply buffer. init llog too early. Branch b1_8 b=19526 i=nathan i=e.mei i=tappro --- diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 8afeb8a..3ebd312 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -15,6 +15,13 @@ tbd Sun Microsystems, Inc. of Lustre filesystem with 4K stack may cause a stack overflow. For more information, please refer to bugzilla 17630. +Severity : normal +Bugzilla : 19526 +Description: can't stat file in some situation. +Details : improve initialize osc date when target is added to mds and + ability to resend too big getattr request is client isn't have info + about ost. + Severity : enhancement Bugzilla : 20539 Description: Add OEL5 support. 
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 8cc7483..d7ff6c9 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -324,6 +324,7 @@ struct ptlrpc_request { rq_sent_final:1, /* stop sending early replies */ rq_hp:1, /* high priority RPC */ rq_at_linked:1, /* link into service's srv_at_array */ + rq_reply_truncate:1, /* reply is truncated */ rq_fake:1, /* fake request - just for timeout only */ /* a copy of the request is queued to replay during recovery */ rq_copy_queued:1, diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 0ddc520..3229771 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -698,6 +698,7 @@ struct lov_qos { struct lov_tgt_desc { struct list_head ltd_kill; struct obd_uuid ltd_uuid; + struct obd_device *ltd_obd; struct obd_export *ltd_exp; struct ltd_qos ltd_qos; /* qos info per target */ __u32 ltd_gen; @@ -885,6 +886,8 @@ enum llog_ctxt_id { * Events signalled through obd_notify() upcall-chain. 
*/ enum obd_notify_event { + /* device target is created */ + OBD_NOTIFY_CREATE, /* DEVICE connect start */ OBD_NOTIFY_CONNECT, /* Device activated */ diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index a8b94c4..7a5b29d 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -2075,6 +2075,10 @@ llu_fsswop_mount(const char *source, obd->obd_upcall.onu_owner = &sbi->ll_lco; obd->obd_upcall.onu_upcall = ll_ocd_update; + /* ask lov to generate OBD_NOTIFY_CREATE events for already registered + * targets */ + obd_notify(obd, NULL, OBD_NOTIFY_CREATE, NULL); + obd_register_lock_cancel_cb(obd, llu_extent_lock_cancel_cb); ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL | diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 6ce1b05..926aff70 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -415,6 +415,10 @@ static int client_common_fill_super(struct super_block *sb, obd->obd_upcall.onu_upcall = ll_ocd_update; data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT; + /* ask lov to generate OBD_NOTIFY_CREATE events for already registered + * targets */ + obd_notify(obd, NULL, OBD_NOTIFY_CREATE, NULL); + obd_register_lock_cancel_cb(obd, ll_extent_lock_cancel_cb); obd_register_page_removal_cb(obd, ll_page_removal_cb, ll_pin_extent_cb); diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c index 0728203..368654f 100644 --- a/lustre/lov/lov_log.c +++ b/lustre/lov/lov_log.c @@ -240,8 +240,7 @@ int lov_llog_init(struct obd_device *obd, struct obd_device *disk_obd, continue; CDEBUG(D_CONFIG, "init %s\n", lov->lov_tgts[i]->ltd_uuid.uuid); - child = class_find_client_obd(&lov->lov_tgts[i]->ltd_uuid, - LUSTRE_OSC_NAME, &obd->obd_uuid); + child = lov->lov_tgts[i]->ltd_obd; if (!child) { CERROR("Can't find osc\n"); continue; diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 69d6b6e..9d1040a 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -231,31 +231,29 @@ 
static int lov_notify(struct obd_device *obd, struct obd_device *watched, /* NULL watched means all osc's in the lov (only for syncs) */ /* sync event should be send lov idx as data */ struct lov_obd *lov = &obd->u.lov; - struct obd_device *tgt_obd; - int i; + int i, is_sync; - if ((ev == OBD_NOTIFY_SYNC) || - (ev == OBD_NOTIFY_SYNC_NONBLOCK)) - data = &i; + data = &i; + is_sync = (ev == OBD_NOTIFY_SYNC) || + (ev == OBD_NOTIFY_SYNC_NONBLOCK); obd_getref(obd); for (i = 0; i < lov->desc.ld_tgt_count; i++) { + if (!lov->lov_tgts[i]) + continue; /* don't send sync event if target not * connected/activated */ - if (!lov->lov_tgts[i] || - !lov->lov_tgts[i]->ltd_active) + if (is_sync && !lov->lov_tgts[i]->ltd_active) continue; - tgt_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp); - - rc = obd_notify_observer(obd, tgt_obd, ev, data); + rc = obd_notify_observer(obd, lov->lov_tgts[i]->ltd_obd, + ev, data); if (rc) { CERROR("%s: notify %s of %s failed %d\n", obd->obd_name, obd->obd_observer->obd_name, - tgt_obd->obd_name, rc); - break; + lov->lov_tgts[i]->ltd_obd->obd_name, rc); } } obd_putref(obd); @@ -283,15 +281,7 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate, if (!lov->lov_tgts[index]) RETURN(-EINVAL); - tgt_uuid = lov->lov_tgts[index]->ltd_uuid; - tgt_obd = class_find_client_obd(&tgt_uuid, LUSTRE_OSC_NAME, - &obd->obd_uuid); - - if (!tgt_obd) { - CERROR("Target %s not attached\n", obd_uuid2str(&tgt_uuid)); - RETURN(-EINVAL); - } - + tgt_obd = lov->lov_tgts[index]->ltd_obd; if (!tgt_obd->obd_set_up) { CERROR("Target %s not set up\n", obd_uuid2str(&tgt_uuid)); RETURN(-EINVAL); @@ -597,6 +587,7 @@ static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid, static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, __u32 index, int gen, int active) { + struct obd_device *tgt_obd; struct lov_obd *lov = &obd->u.lov; struct lov_tgt_desc *tgt; int rc; @@ -611,14 +602,21 @@ static int lov_add_target(struct 
obd_device *obd, struct obd_uuid *uuidp, RETURN(-EINVAL); } + + tgt_obd = class_find_client_obd(uuidp, LUSTRE_OSC_NAME, + &obd->obd_uuid); + if (!tgt_obd) { + CERROR("Target %s not attached\n", obd_uuid2str(uuidp)); + RETURN(-EINVAL); + } + mutex_down(&lov->lov_lock); if ((index < lov->lov_tgt_size) && (lov->lov_tgts[index] != NULL)) { tgt = lov->lov_tgts[index]; CERROR("UUID %s already assigned at LOV target index %d\n", obd_uuid2str(&tgt->ltd_uuid), index); - mutex_up(&lov->lov_lock); - RETURN(-EEXIST); + GOTO(err_unlock, rc = -EEXIST); } if (index >= lov->lov_tgt_size) { @@ -630,10 +628,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, while (newsize < index + 1) newsize = newsize << 1; OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize); - if (newtgts == NULL) { - mutex_up(&lov->lov_lock); - RETURN(-ENOMEM); - } + if (newtgts == NULL) + GOTO(err_unlock, rc = -ENOMEM); if (lov->lov_tgt_size) { memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) * @@ -655,24 +651,21 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, } OBD_ALLOC_PTR(tgt); - if (!tgt) { - mutex_up(&lov->lov_lock); - RETURN(-ENOMEM); - } + if (!tgt) + GOTO(err_unlock, rc = -ENOMEM); rc = lov_ost_pool_add(&lov->lov_packed, index, lov->lov_tgt_size); - if (rc) { - mutex_up(&lov->lov_lock); - OBD_FREE_PTR(tgt); - RETURN(rc); - } + if (rc) + GOTO(err_free_tgt, rc = -EEXIST); memset(tgt, 0, sizeof(*tgt)); + tgt->ltd_obd = tgt_obd; tgt->ltd_uuid = *uuidp; /* XXX - add a sanity check on the generation number. 
*/ tgt->ltd_gen = gen; tgt->ltd_index = index; tgt->ltd_activate = active; + lov->lov_tgts[index] = tgt; if (index >= lov->desc.ld_tgt_count) lov->desc.ld_tgt_count = index + 1; @@ -682,6 +675,8 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp, CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n", index, tgt->ltd_gen, lov->desc.ld_tgt_count); + rc = obd_notify(obd, tgt_obd, OBD_NOTIFY_CREATE, &index); + if (lov->lov_connects == 0) { /* lov_connect hasn't been called yet. We'll do the lov_connect_obd on this target when that fn first runs, @@ -711,6 +706,12 @@ out: } obd_putref(obd); RETURN(rc); + +err_free_tgt: + OBD_FREE_PTR(tgt); +err_unlock: + mutex_up(&lov->lov_lock); + RETURN(rc); } /* Schedule a target for deletion */ diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index 9104238..7d39a04 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -137,5 +137,19 @@ static inline int mdc_exp_is_2_0_server(struct obd_export *exp) { static inline int mdc_req_is_2_0_server(struct ptlrpc_request *req) { LASSERT(req); - return mdc_exp_is_2_0_server(req->rq_export); + return mdc_exp_is_2_0_server(req->rq_export); +} + +static inline void mdc_update_max_ea_from_body(struct obd_export *exp, + struct mds_body *body) +{ + if (body->valid & OBD_MD_FLMODEASIZE) { + if (exp->exp_obd->u.cli.cl_max_mds_easize < body->max_mdsize) + exp->exp_obd->u.cli.cl_max_mds_easize = + body->max_mdsize; + if (exp->exp_obd->u.cli.cl_max_mds_cookiesize < + body->max_cookiesize) + exp->exp_obd->u.cli.cl_max_mds_cookiesize = + body->max_cookiesize; + } } diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 89d7fb1..a5ad574 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -526,6 +526,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, if ((body->valid & OBD_MD_FLEASIZE) != 0) { void *eadata; + mdc_update_max_ea_from_body(exp, body); + /* The eadata is opaque; just check that it is there. 
* Eventually, obd_unpackmd() will check the contents */ eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1, diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index bca065e..2fb34e2 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -164,6 +164,8 @@ int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, CDEBUG(D_NET, "mode: %o\n", body->mode); lustre_set_rep_swabbed(req, REPLY_REC_OFF + 1); + mdc_update_max_ea_from_body(exp, body); + if (body->eadatasize != 0) { /* reply indicates presence of eadata; check it's there... */ eadata = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 7e640c8..5c13f76 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -220,6 +220,15 @@ static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index) return 0; } +static int mds_lov_objinit(struct mds_obd *mds, __u32 index) +{ + __u32 page = index / OBJID_PER_PAGE(); + __u32 off = index % OBJID_PER_PAGE(); + obd_id *data = mds->mds_lov_page_array[page]; + + return (data[off] > 0); +} + int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm) { int rc = 0; @@ -475,7 +484,7 @@ static int mds_lov_get_objid(struct obd_device * obd, off = idx % OBJID_PER_PAGE(); data = mds->mds_lov_page_array[page]; - if (data[off] < 2 || + if (data[off] < 2 || !(osc_exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN)) { /* We never read this lastid; ask the osc */ struct obd_id_info lastid; @@ -603,9 +612,7 @@ static int mds_lov_update_mds(struct obd_device *obd, /* Don't let anyone else mess with mds_lov_objids now */ old_count = mds->mds_lov_desc.ld_tgt_count; - rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid); - if (rc) - GOTO(out, rc); + LASSERT(mds_lov_objinit(mds, idx)); CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n", idx, obd->obd_recovering, obd->obd_async_recov, old_count, @@ -684,12 +691,9 @@ int mds_lov_connect(struct obd_device 
*obd, char * lov_name) GOTO(error_exit, rc); } - rc = obd_llog_init(obd, obd, NULL); - if (rc) { - CERROR("MDS cannot register as observer of LOV %s (%d)\n", - lov_name, rc); - GOTO(error_exit, rc); - } + /* ask lov to generate OBD_NOTIFY_CREATE events for already registered + * targets */ + obd_notify(obd->u.mds.mds_osc_obd, NULL, OBD_NOTIFY_CREATE, NULL); OBD_ALLOC(data, sizeof(*data)); if (data == NULL) @@ -1150,6 +1154,14 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev); switch (ev) { + case OBD_NOTIFY_CREATE: + CWARN("MDS %s: add target %s\n",obd->obd_name, + obd_uuid2str(&watched->u.cli.cl_target_uuid)); + /* We still have to fix the lov descriptor for ost's */ + LASSERT(data); + rc = mds_lov_update_desc(obd, *(__u32 *)data, + &watched->u.cli.cl_target_uuid); + RETURN(rc); /* We only handle these: */ case OBD_NOTIFY_ACTIVE: /* lov want one or more _active_ targets for work */ @@ -1181,15 +1193,8 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, CWARN("MDS %s: in recovery, not resetting orphans on %s\n", obd->obd_name, obd_uuid2str(&watched->u.cli.cl_target_uuid)); - /* We still have to fix the lov descriptor for ost's added - after the mdt in the config log. They didn't make it into - mds_lov_connect. 
*/ - LASSERT(data); - rc = mds_lov_update_desc(obd, *(__u32 *)data, - &watched->u.cli.cl_target_uuid); - mds_allow_cli(obd, CONFIG_SYNC); - RETURN(rc); + RETURN(0); } rc = mds_lov_start_synchronize(obd, watched, data, diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index ae52b78..d64f5e9 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -1834,7 +1834,7 @@ void mds_shrink_body_reply(struct ptlrpc_request *req, int reply_mdoff) { struct mds_body *rq_body; - const long have_acl = OBD_MD_FLCOOKIE | OBD_MD_FLACL; + const long long have_acl = OBD_MD_FLCOOKIE | OBD_MD_FLACL; const long have_md = OBD_MD_FLEASIZE | OBD_MD_FLDIREA; ENTRY; diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 2652070..34f585a 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -1010,6 +1010,14 @@ static int after_reply(struct ptlrpc_request *req) /* NB Until this point, the whole of the incoming message, * including buflens, status etc is in the sender's byte order. */ + if (req->rq_reply_truncate && !req->rq_no_resend) { + req->rq_resend = 1; + OBD_FREE(req->rq_repbuf, req->rq_replen); + req->rq_repbuf = NULL; + req->rq_replen = req->rq_nob_received; + RETURN(0); + } + LASSERT (req->rq_nob_received <= req->rq_replen); rc = unpack_reply(req); if (rc) @@ -2059,14 +2067,13 @@ restart: if (req->rq_err) { /* rq_status was set locally */ - rc = -EIO; + rc = req->rq_status ? req->rq_status : -EIO; } else if (req->rq_intr) { rc = -EINTR; } else if (req->rq_no_resend) { - spin_unlock(&imp->imp_lock); - GOTO(out, rc = -ETIMEDOUT); + rc = -ETIMEDOUT; } else { GOTO(restart, rc); @@ -2145,6 +2152,7 @@ restart: if (req->rq_err) { DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d", rc, req->rq_status); + rc = rc ? rc : req->rq_status; GOTO(out, rc = rc ? 
rc : -EIO); } diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 18d5eff..3853b09 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -110,12 +110,23 @@ void reply_in_callback(lnet_event_t *ev) if (ev->status) goto out_wake; + if (ev->type == LNET_EVENT_UNLINK) { LASSERT(ev->unlinked); DEBUG_REQ(D_RPCTRACE, req, "unlink"); goto out_wake; } + if (ev->mlength < ev->rlength ) { + CDEBUG(D_RPCTRACE, "truncate req %p rpc %d - %d+%d\n", req, + req->rq_replen, ev->rlength, ev->offset); + req->rq_reply_truncate = 1; + req->rq_replied = 1; + req->rq_status = -EOVERFLOW; + req->rq_nob_received = ev->rlength + ev->offset; + goto out_wake; + } + if ((ev->offset == 0) && (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) { /* Early reply */ diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index f9599c0..2391db1 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -566,6 +566,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) request->rq_resend = 0; request->rq_restart = 0; request->rq_rep_swab_mask = 0; + request->rq_reply_truncate = 0; spin_unlock(&request->rq_lock); if (!noreply) { @@ -575,7 +576,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) reply_md.threshold = LNET_MD_THRESH_INF; /* Manage remote for early replies */ reply_md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | - LNET_MD_MANAGE_REMOTE; + LNET_MD_MANAGE_REMOTE | + LNET_MD_TRUNCATE; /* allow to make EBIG error */ reply_md.user_ptr = &request->rq_reply_cbid; reply_md.eq_handle = ptlrpc_eq_h;