From d1235afa64f6b8c7891f1a15d3a7e15edcf92ced Mon Sep 17 00:00:00 2001 From: rread Date: Wed, 8 Oct 2003 06:33:13 +0000 Subject: [PATCH] merge devel to lcfg --- lnet/klnds/gmlnd/gmlnd_comm.c | 45 ++++++++++------------ lnet/klnds/gmlnd/gmlnd_utils.c | 17 ++++++-- lustre/mds/mds_unlink_open.c | 66 +++++++++++++++++--------------- lustre/portals/knals/gmnal/gmnal_comm.c | 45 ++++++++++------------ lustre/portals/knals/gmnal/gmnal_utils.c | 17 ++++++-- 5 files changed, 106 insertions(+), 84 deletions(-) diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c index 9e32145..772b9cc 100644 --- a/lnet/klnds/gmlnd/gmlnd_comm.c +++ b/lnet/klnds/gmlnd/gmlnd_comm.c @@ -38,6 +38,7 @@ gmnal_ct_thread(void *arg) { gmnal_data_t *nal_data; gm_recv_event_t *rxevent = NULL; + gm_recv_t *recv = NULL; if (!arg) { CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); @@ -55,17 +56,18 @@ gmnal_ct_thread(void *arg) while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) { CDEBUG(D_NET, "waiting\n"); rxevent = gm_blocking_receive_no_spin(nal_data->gm_port); - CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { CDEBUG(D_INFO, "time to exit\n"); break; } + CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); switch (GM_RECV_EVENT_TYPE(rxevent)) { case(GM_RECV_EVENT): CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); + recv = (gm_recv_t*)&rxevent->recv; GMNAL_GM_UNLOCK(nal_data); - gmnal_add_rxtwe(nal_data, rxevent); + gmnal_add_rxtwe(nal_data, recv); GMNAL_GM_LOCK(nal_data); CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); break; @@ -109,8 +111,6 @@ gmnal_ct_thread(void *arg) int gmnal_rx_thread(void *arg) { gmnal_data_t *nal_data; - gm_recv_event_t *rxevent = NULL; - gm_recv_t *recv = NULL; void *buffer; gmnal_rxtwe_t *we = NULL; @@ -142,29 +142,26 @@ int gmnal_rx_thread(void *arg) CDEBUG(D_INFO, "Receive thread time to exit\n"); break; } - rxevent = we->rx; - CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent)); - recv = (gm_recv_t*)&(rxevent->recv); - buffer = gm_ntohp(recv->buffer); - PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); + buffer = we->buffer; switch(((gmnal_msghdr_t*)buffer)->type) { case(GMNAL_SMALL_MESSAGE): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE); break; case(GMNAL_LARGE_MESSAGE_INIT): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_INIT); break; case(GMNAL_LARGE_MESSAGE_ACK): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_ACK); break; default: CDEBUG(D_ERROR, "Unsupported message type\n"); - gmnal_rx_bad(nal_data, recv, NULL); + gmnal_rx_bad(nal_data, we, NULL); } + PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); } spin_lock(&nal_data->rxthread_flag_lock); @@ -185,7 +182,7 @@ int gmnal_rx_thread(void *arg) * Deal with all endian stuff here. */ int -gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) +gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) { gmnal_srxd_t *srxd = NULL; void *buffer = NULL; @@ -193,15 +190,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) gmnal_msghdr_t *gmnal_msghdr; ptl_hdr_t *portals_hdr; - CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n", - nal_data, recv, gmnal_type); + CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", + nal_data, we, gmnal_type); - buffer = gm_ntohp(recv->buffer);; - snode = (int)gm_ntoh_u16(recv->sender_node_id); - sport = (int)gm_ntoh_u8(recv->sender_port_id); - type = (int)gm_ntoh_u8(recv->type); - buffer = gm_ntohp(recv->buffer); - length = (int) gm_ntohl(recv->length); + buffer = we->buffer; + snode = we->snode; + sport = we->sport; + type = we->type; + buffer = we->buffer; + length = we->length; gmnal_msghdr = (gmnal_msghdr_t*)buffer; portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE); @@ -281,13 +278,13 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) * A bad message is one we don't expect or can't interpret */ int -gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd) +gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd) { CDEBUG(D_TRACE, "Can't handle message\n"); if (!srxd) srxd = gmnal_rxbuffer_to_srxd(nal_data, - gm_ntohp(recv->buffer)); + we->buffer); if (srxd) { gmnal_rx_requeue_buffer(nal_data, srxd); } else { diff --git a/lnet/klnds/gmlnd/gmlnd_utils.c b/lnet/klnds/gmlnd/gmlnd_utils.c index 84fc3a0..d40a943 100644 --- a/lnet/klnds/gmlnd/gmlnd_utils.c +++ b/lnet/klnds/gmlnd/gmlnd_utils.c @@ -192,7 +192,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block) { gmnal_stxd_t *txd = NULL; - pid_t pid = current->pid; + pid_t pid = current->pid; CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", @@ -825,8 +825,15 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, } +/* + * extract info from the receive event. + * Have to do this before the next call to gm_receive + * Deal with all endian stuff here. + * Then stick work entry on list where rxthreads + * can get it to complete the receive + */ int -gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) +gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) { gmnal_rxtwe_t *we = NULL; @@ -837,7 +844,11 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) CDEBUG(D_ERROR, "failed to malloc\n"); return(GMNAL_STATUS_FAIL); } - we->rx = rxevent; + we->buffer = gm_ntohp(recv->buffer); + we->snode = (int)gm_ntoh_u16(recv->sender_node_id); + we->sport = (int)gm_ntoh_u8(recv->sender_port_id); + we->type = (int)gm_ntoh_u8(recv->type); + we->length = (int)gm_ntohl(recv->length); spin_lock(&nal_data->rxtwe_lock); if (nal_data->rxtwe_tail) { diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c index 82cc150..351969f 100644 --- a/lustre/mds/mds_unlink_open.c +++ b/lustre/mds/mds_unlink_open.c @@ -107,7 +107,7 @@ int mds_cleanup_orphans(struct obd_device *obd) struct mds_obd *mds = &obd->u.mds; struct obd_run_ctxt saved; struct file *file; - struct dentry *dchild; + struct dentry *dchild; struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode; struct l_linux_dirent *dirent, *ptr; unsigned int count = pending_dir->i_size; @@ -120,59 +120,64 @@ int mds_cleanup_orphans(struct obd_device *obd) struct mds_body *body; int lengths[3] = {sizeof(struct mds_body), mds->mds_max_mdsize, - mds->mds_max_cookiesize}; + mds->mds_max_cookiesize}; int rc = 0, rc2 = 0, item = 0; ENTRY; - + + RETURN(0); + push_ctxt(&saved, &obd->obd_ctxt, NULL); dget(mds->mds_pending_dir); mntget(mds->mds_vfsmnt); - file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt, + file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt, O_RDONLY | O_LARGEFILE); - if (IS_ERR(file)) + if (IS_ERR(file)) GOTO(err_open, rc2 = PTR_ERR(file)); - + OBD_ALLOC(dirent, count); if (dirent == NULL) GOTO(err_alloc_dirent, rc2 = -ENOMEM); - rc = l_readdir(file, dirent, count); + rc = l_readdir(file, dirent, count); filp_close(file, 0); if (rc < 0) GOTO(err_out, rc2 = rc); - for (ptr = dirent; (char *)ptr < (char *)dirent + rc; + for (ptr = dirent; (char *)ptr < (char *)dirent + rc; (char *)ptr += ptr->d_reclen) { - int namlen = strlen(ptr->d_name); + int namlen = strlen(ptr->d_name); if (((namlen == 1) && !strcmp(ptr->d_name, ".")) || - ((namlen == 2) && !strcmp(ptr->d_name, ".."))) + ((namlen == 2) && !strcmp(ptr->d_name, ".."))) continue; down(&pending_dir->i_sem); - dchild = lookup_one_len(ptr->d_name, mds->mds_pending_dir, namlen); + dchild = lookup_one_len(ptr->d_name, mds->mds_pending_dir, + namlen); if (IS_ERR(dchild)) { up(&pending_dir->i_sem); GOTO(err_out, rc2 = PTR_ERR(dchild)); } if (!dchild->d_inode) { - CDEBUG(D_ERROR, "orphan %s has been deleted\n", ptr->d_name); + CDEBUG(D_ERROR, "orphan %s has been deleted\n", + ptr->d_name); GOTO(next, rc2 = 0); } child_inode = dchild->d_inode; - if (mds_inode_is_orphan(child_inode) && + if (mds_inode_is_orphan(child_inode) && mds_open_orphan_count(child_inode)) { - CDEBUG(D_ERROR, "orphan %s was re-opened during recovery\n", - ptr->d_name); + CDEBUG(D_ERROR, "orphan %s was re-opened during " + "recovery\n", ptr->d_name); GOTO(next, rc2 = 0); } - CDEBUG(D_ERROR, "cleanup orphan %s start on mds and ost:\n", ptr->d_name); + CDEBUG(D_ERROR, "cleanup orphan %s start on mds and ost:\n", + ptr->d_name); LASSERT(mds->mds_osc_obd != NULL); - OBD_ALLOC(req, sizeof(*req)); + OBD_ALLOC(req, sizeof(*req)); if (!req) { CERROR("request allocation out of memory\n"); GOTO(err_lov_conn, rc2 = -ENOMEM); @@ -192,18 +197,18 @@ int mds_cleanup_orphans(struct obd_device *obd) lmm = lustre_msg_buf(req->rq_repmsg, 1, 0); #ifdef ENABLE_ORPHANS - if (mds_log_op_unlink(obd, child_inode, - req->rq_repmsg, 1) > 0) + if (mds_log_op_unlink(obd, child_inode, req->rq_repmsg, 1) > 0) oa->o_valid |= OBD_MD_FLCOOKIE; #endif - rc2 = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize); + rc2 = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, + body->eadatasize); if (rc2 < 0) { CERROR("Error unpack md %p\n", lmm); GOTO(out_free_req, rc2 = 0); } else { LASSERT(rc2 >= sizeof(*lsm)); - rc2 = 0; + rc2 = 0; } oa = obdo_alloc(); @@ -220,24 +225,24 @@ int mds_cleanup_orphans(struct obd_device *obd) lustre_msg_buf(req->rq_repmsg, 2, sizeof(struct llog_cookie) * lsm->lsm_stripe_count); - if (oti.oti_logcookies == NULL) + if (oti.oti_logcookies == NULL) oa->o_valid &= ~OBD_MD_FLCOOKIE; } #endif rc2 = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti); - obdo_free(oa); + obdo_free(oa); if (rc2) { - CERROR("destroy orphan objid 0x"LPX64" on ost error %d\n", - lsm->lsm_object_id, rc2); + CERROR("destroy orphan objid 0x"LPX64" on ost error " + "%d\n", lsm->lsm_object_id, rc2); GOTO(out_free_memmd, rc2 = 0); } item ++; - CDEBUG(D_ERROR, "removed orphan %s object from ost successlly!\n", + CDEBUG(D_ERROR, "removed orphan %s object from ost\n", ptr->d_name); - handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL); + handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL); if (IS_ERR(handle)) { rc2 = PTR_ERR(handle); CERROR("error fsfilt_start: %d\n", rc2); @@ -247,18 +252,19 @@ int mds_cleanup_orphans(struct obd_device *obd) rc2 = vfs_unlink(pending_dir, dchild); if (rc2) { CERROR("error unlinking orphan from PENDING directory"); - CERROR("%s: rc %d\n", ptr->d_name, rc2); + CERROR("%s: rc %d\n", ptr->d_name, rc2); } if (handle) { int err = fsfilt_commit(obd, pending_dir, handle, 0); if (err) { - CERROR("error committing orphan unlink: %d\n", err); + CERROR("error committing orphan unlink: %d\n", + err); rc2 = err; GOTO(err_alloc_oa, rc2); } } - CDEBUG(D_ERROR, "removed orphan %s from mds successfully!\n", + CDEBUG(D_ERROR, "removed orphan %s from mds successfully!\n", ptr->d_name); out_free_memmd: diff --git a/lustre/portals/knals/gmnal/gmnal_comm.c b/lustre/portals/knals/gmnal/gmnal_comm.c index 9e32145..772b9cc 100644 --- a/lustre/portals/knals/gmnal/gmnal_comm.c +++ b/lustre/portals/knals/gmnal/gmnal_comm.c @@ -38,6 +38,7 @@ gmnal_ct_thread(void *arg) { gmnal_data_t *nal_data; gm_recv_event_t *rxevent = NULL; + gm_recv_t *recv = NULL; if (!arg) { CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); @@ -55,17 +56,18 @@ gmnal_ct_thread(void *arg) while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) { CDEBUG(D_NET, "waiting\n"); rxevent = gm_blocking_receive_no_spin(nal_data->gm_port); - CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { CDEBUG(D_INFO, "time to exit\n"); break; } + CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); switch (GM_RECV_EVENT_TYPE(rxevent)) { case(GM_RECV_EVENT): CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); + recv = (gm_recv_t*)&rxevent->recv; GMNAL_GM_UNLOCK(nal_data); - gmnal_add_rxtwe(nal_data, rxevent); + gmnal_add_rxtwe(nal_data, recv); GMNAL_GM_LOCK(nal_data); CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); break; @@ -109,8 +111,6 @@ gmnal_ct_thread(void *arg) int gmnal_rx_thread(void *arg) { gmnal_data_t *nal_data; - gm_recv_event_t *rxevent = NULL; - gm_recv_t *recv = NULL; void *buffer; gmnal_rxtwe_t *we = NULL; @@ -142,29 +142,26 @@ int gmnal_rx_thread(void *arg) CDEBUG(D_INFO, "Receive thread time to exit\n"); break; } - rxevent = we->rx; - CDEBUG(D_INFO, "thread got [%s]\n", gmnal_rxevent(rxevent)); - recv = (gm_recv_t*)&(rxevent->recv); - buffer = gm_ntohp(recv->buffer); - PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); + buffer = we->buffer; switch(((gmnal_msghdr_t*)buffer)->type) { case(GMNAL_SMALL_MESSAGE): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE); break; case(GMNAL_LARGE_MESSAGE_INIT): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_INIT); break; case(GMNAL_LARGE_MESSAGE_ACK): - gmnal_pre_receive(nal_data, recv, + gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_ACK); break; default: CDEBUG(D_ERROR, "Unsupported message type\n"); - gmnal_rx_bad(nal_data, recv, NULL); + gmnal_rx_bad(nal_data, we, NULL); } + PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); } spin_lock(&nal_data->rxthread_flag_lock); @@ -185,7 +182,7 @@ int gmnal_rx_thread(void *arg) * Deal with all endian stuff here. */ int -gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) +gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) { gmnal_srxd_t *srxd = NULL; void *buffer = NULL; @@ -193,15 +190,15 @@ gmnal_pre_receive(gmnal_data_t *nal_data, gm_recv_t *recv, int gmnal_type) gmnal_msghdr_t *gmnal_msghdr; ptl_hdr_t *portals_hdr; - CDEBUG(D_INFO, "nal_data [%p], recv [%p] type [%d]\n", - nal_data, recv, gmnal_type); + CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", + nal_data, we, gmnal_type); - buffer = gm_ntohp(recv->buffer);; - snode = (int)gm_ntoh_u16(recv->sender_node_id); - sport = (int)gm_ntoh_u8(recv->sender_port_id); - type = (int)gm_ntoh_u8(recv->type); - buffer = gm_ntohp(recv->buffer); - length = (int) gm_ntohl(recv->length); + buffer = we->buffer; + snode = we->snode; + sport = we->sport; + type = we->type; + buffer = we->buffer; + length = we->length; gmnal_msghdr = (gmnal_msghdr_t*)buffer; portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE); @@ -281,13 +278,13 @@ gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) * A bad message is one we don't expect or can't interpret */ int -gmnal_rx_bad(gmnal_data_t *nal_data, gm_recv_t *recv, gmnal_srxd_t *srxd) +gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd) { CDEBUG(D_TRACE, "Can't handle message\n"); if (!srxd) srxd = gmnal_rxbuffer_to_srxd(nal_data, - gm_ntohp(recv->buffer)); + we->buffer); if (srxd) { gmnal_rx_requeue_buffer(nal_data, srxd); } else { diff --git a/lustre/portals/knals/gmnal/gmnal_utils.c b/lustre/portals/knals/gmnal/gmnal_utils.c index 84fc3a0..d40a943 100644 --- a/lustre/portals/knals/gmnal/gmnal_utils.c +++ b/lustre/portals/knals/gmnal/gmnal_utils.c @@ -192,7 +192,7 @@ gmnal_get_stxd(gmnal_data_t *nal_data, int block) { gmnal_stxd_t *txd = NULL; - pid_t pid = current->pid; + pid_t pid = current->pid; CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", @@ -825,8 +825,15 @@ gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, } +/* + * extract info from the receive event. + * Have to do this before the next call to gm_receive + * Deal with all endian stuff here. + * Then stick work entry on list where rxthreads + * can get it to complete the receive + */ int -gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) +gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv) { gmnal_rxtwe_t *we = NULL; @@ -837,7 +844,11 @@ gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_event_t *rxevent) CDEBUG(D_ERROR, "failed to malloc\n"); return(GMNAL_STATUS_FAIL); } - we->rx = rxevent; + we->buffer = gm_ntohp(recv->buffer); + we->snode = (int)gm_ntoh_u16(recv->sender_node_id); + we->sport = (int)gm_ntoh_u8(recv->sender_port_id); + we->type = (int)gm_ntoh_u8(recv->type); + we->length = (int)gm_ntohl(recv->length); spin_lock(&nal_data->rxtwe_lock); if (nal_data->rxtwe_tail) { -- 1.8.3.1