X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=2d565f6394c23e0553475c15898b55321f297267;hp=85f33d51ff645fc28be44132f9fe0423f9843d37;hb=87c86d444e61e38d6454bba5700ba966dc1ac83d;hpb=5ba5b49768995332e0c94ba70ae4d65aaa2f9972 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 85f33d5..2d565f6 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -48,9 +48,10 @@ # include #endif -# include -#include +#include +#include #include +#include #include #include #include @@ -89,7 +90,9 @@ static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, if (lsm) { LASSERT(lsm->lsm_object_id); + LASSERT(lsm->lsm_object_gr); (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id); + (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr); } RETURN(lmm_size); @@ -136,7 +139,9 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, if (lmm != NULL) { /* XXX zero *lsmp? */ (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id); + (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr); LASSERT((*lsmp)->lsm_object_id); + LASSERT((*lsmp)->lsm_object_gr); } (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; @@ -180,8 +185,8 @@ static int osc_getattr_async(struct obd_export *exp, struct obdo *oa, struct osc_getattr_async_args *aa; ENTRY; - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1, - &size, NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_GETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -207,8 +212,8 @@ static int osc_getattr(struct obd_export *exp, struct obdo *oa, int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GETATTR, 1, - &size, NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_GETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -251,8 +256,10 @@ static int osc_setattr(struct obd_export *exp, struct obdo *oa, int rc, size = sizeof(*body); ENTRY; - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SETATTR, 1, &size, - NULL); + LASSERT(!(oa->o_valid & OBD_MD_FLGROUP) || oa->o_gr > 0); + + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_SETATTR, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -261,17 +268,21 @@ static int osc_setattr(struct obd_export *exp, struct obdo *oa, request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out, rc); - - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) - GOTO(out, rc = -EPROTO); + if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) { + ptlrpcd_add_req(request); + rc = 0; + } else { + rc = ptlrpc_queue_wait(request); + if (rc) + GOTO(out, rc); - memcpy(oa, &body->oa, sizeof(*oa)); + body = lustre_swab_repbuf(request, 0, sizeof(*body), + lustre_swab_ost_body); + if (body == NULL) + GOTO(out, rc = -EPROTO); + memcpy(oa, &body->oa, sizeof(*oa)); + } EXIT; out: ptlrpc_req_finished(request); @@ -281,6 +292,7 @@ out: int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { + struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; struct ptlrpc_request *request; struct ost_body *body; struct lov_stripe_md *lsm; @@ -297,8 +309,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, RETURN(rc); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_CREATE, 1, &size, - NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_CREATE, 1, &size, NULL); if (!request) GOTO(out, rc = -ENOMEM); @@ -311,6 +323,8 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, oa->o_flags == OBD_FL_DELORPHAN); DEBUG_REQ(D_HA, request, "delorphan from OST integration"); + /* Don't resend the delorphan request */ + request->rq_no_resend = request->rq_no_delay = 1; } rc = ptlrpc_queue_wait(request); @@ -324,6 +338,16 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, GOTO (out_req, rc = -EPROTO); } + if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) { + struct obd_import *imp = class_exp2cliimp(exp); + /* MDS declares last known object, OSS responses + * with next possible object -bzzz */ + spin_lock(&oscc->oscc_lock); + oscc->oscc_next_id = body->oa.o_id; + spin_unlock(&oscc->oscc_lock); + CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n", + imp->imp_target_uuid.uuid, oa->o_id); + } memcpy(oa, &body->oa, sizeof(*oa)); /* This should really be sent by the OST */ @@ -335,6 +359,7 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, * This needs to be fixed in a big way. */ lsm->lsm_object_id = oa->o_id; + lsm->lsm_object_gr = oa->o_gr; *ea = lsm; if (oti != NULL) { @@ -372,8 +397,8 @@ static int osc_punch(struct obd_export *exp, struct obdo *oa, RETURN(-EINVAL); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_PUNCH, 1, &size, - NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_PUNCH, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -407,7 +432,8 @@ static int osc_punch(struct obd_export *exp, struct obdo *oa, } static int osc_sync(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_size start, obd_size end) + struct lov_stripe_md *md, obd_size start, + obd_size end) { struct ptlrpc_request *request; struct ost_body *body; @@ -419,8 +445,8 @@ static int osc_sync(struct obd_export *exp, struct obdo *oa, RETURN(-EINVAL); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SYNC, 1, &size, - NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_SYNC, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -466,8 +492,8 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, RETURN(-EINVAL); } - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_DESTROY, 1, - &size, NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_DESTROY, 1, &size, NULL); if (!request) RETURN(-ENOMEM); @@ -482,29 +508,38 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, memcpy(&body->oa, oa, sizeof(*oa)); request->rq_replen = lustre_msg_size(1, &size); - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out, rc); + if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) { + ptlrpcd_add_req(request); + rc = 0; + } else { + rc = ptlrpc_queue_wait(request); + + if (rc == -ENOENT) + rc = 0; - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("Can't unpack body\n"); - GOTO (out, rc = -EPROTO); - } + if (rc) { + ptlrpc_req_finished(request); + RETURN(rc); + } - memcpy(oa, &body->oa, sizeof(*oa)); + body = lustre_swab_repbuf(request, 0, sizeof(*body), + lustre_swab_ost_body); + if (body == NULL) { + CERROR ("Can't unpack body\n"); + ptlrpc_req_finished(request); + RETURN(-EPROTO); + } - EXIT; - out: - ptlrpc_req_finished(request); - return rc; + memcpy(oa, &body->oa, sizeof(*oa)); + ptlrpc_req_finished(request); + } + RETURN(rc); } static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, long writing_bytes) { - obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; + obd_valid bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; LASSERT(!(oa->o_valid & bits)); @@ -531,6 +566,11 @@ static void osc_consume_write_grant(struct client_obd *cli, LASSERT(cli->cl_avail_grant >= 0); } +static unsigned long rpcs_in_flight(struct client_obd *cli) +{ + return cli->cl_r_in_flight + cli->cl_w_in_flight; +} + /* caller must hold loi_list_lock */ void osc_wake_cache_waiters(struct client_obd *cli) { @@ -547,12 +587,10 @@ void osc_wake_cache_waiters(struct client_obd *cli) /* if still dirty cache but no grant wait for pending RPCs that * may yet return us some grant before doing sync writes */ - if (cli->cl_brw_in_flight && cli->cl_avail_grant < PAGE_SIZE) { - CDEBUG(D_CACHE, "%d BRWs in flight, no grant\n", - cli->cl_brw_in_flight); - return; + if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) { + CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n", + cli->cl_w_in_flight); } - ocw = list_entry(l, struct osc_cache_waiter, ocw_entry); list_del_init(&ocw->ocw_entry); if (cli->cl_avail_grant < PAGE_SIZE) { @@ -593,7 +631,7 @@ static void handle_short_read(int nob_read, obd_count page_count, if (pga->count > nob_read) { /* EOF inside this page */ - ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK); + ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK); memset(ptr + nob_read, 0, pga->count - nob_read); kunmap(pga->pg); page_count--; @@ -608,7 +646,7 @@ static void handle_short_read(int nob_read, obd_count page_count, /* zero remaining pages */ while (page_count-- > 0) { - ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK); + ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK); memset(ptr, 0, pga->count); kunmap(pga->pg); pga++; @@ -619,7 +657,7 @@ static int check_write_rcs(struct ptlrpc_request *request, int requested_nob, int niocount, obd_count page_count, struct brw_page *pga) { - int *remote_rcs, i; + int *remote_rcs, i; /* return error if any niobuf was in error */ remote_rcs = lustre_swab_repbuf(request, 1, @@ -630,7 +668,7 @@ static int check_write_rcs(struct ptlrpc_request *request, } if (lustre_msg_swabbed(request->rq_repmsg)) for (i = 0; i < niocount; i++) - __swab32s(&remote_rcs[i]); + __swab32s((__u32 *)&remote_rcs[i]); for (i = 0; i < niocount; i++) { if (remote_rcs[i] < 0) @@ -665,7 +703,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return 0; } - return (p1->off + p1->count == p2->off); + return (p1->disk_offset + p1->count == p2->disk_offset); } #if CHECKSUM_BULK @@ -703,7 +741,6 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, struct ost_body *body; struct obd_ioobj *ioobj; struct niobuf_remote *niobuf; - unsigned long flags; int niocount; int size[3]; int i; @@ -721,7 +758,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, size[1] = sizeof(*ioobj); size[2] = niocount * sizeof(*niobuf); - req = ptlrpc_prep_req(imp, opc, 3, size, NULL); + req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, opc, 3, size, NULL); if (req == NULL) return (-ENOMEM); @@ -745,29 +782,32 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, ioobj->ioo_bufcnt = niocount; LASSERT (page_count > 0); + for (requested_nob = i = 0; i < page_count; i++, niobuf++) { struct brw_page *pg = &pga[i]; struct brw_page *pg_prev = pg - 1; LASSERT(pg->count > 0); - LASSERT((pg->off & ~PAGE_MASK) + pg->count <= PAGE_SIZE); - LASSERTF(i == 0 || pg->off > pg_prev->off, + LASSERTF((pg->page_offset & ~PAGE_MASK)+ pg->count <= PAGE_SIZE, + "i: %d pg: %p pg_off: "LPU64", count: %u\n", i, pg, + pg->page_offset, pg->count); + LASSERTF(i == 0 || pg->disk_offset > pg_prev->disk_offset, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n", i, page_count, - pg->pg, pg->pg->private, pg->pg->index, pg->off, + pg->pg, pg->pg->private, pg->pg->index, pg->disk_offset, pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index, - pg_prev->off); + pg_prev->disk_offset); - ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~PAGE_MASK, - pg->count); + ptlrpc_prep_bulk_page(desc, pg->pg, + pg->page_offset & ~PAGE_MASK, pg->count); requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { niobuf--; niobuf->len += pg->count; } else { - niobuf->offset = pg->off; + niobuf->offset = pg->disk_offset; niobuf->len = pg->count; niobuf->flags = pg->flag; } @@ -776,9 +816,6 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, LASSERT((void *)(niobuf - niocount) == lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf))); osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0); - spin_lock_irqsave(&req->rq_lock, flags); - req->rq_no_resend = 1; - spin_unlock_irqrestore(&req->rq_lock, flags); /* size[0] still sizeof (*body) */ if (opc == OST_WRITE) { @@ -865,12 +902,12 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, if (server_cksum != cksum) { CERROR("Bad checksum: server %x, client %x, server NID " LPX64" (%s)\n", server_cksum, cksum, - peer->peer_nid, str); + peer->peer_id.nid, str); cksum_counter = 0; oa->o_cksum = cksum; } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){ CWARN("Checksum %u from "LPX64" (%s) OK: %x\n", - cksum_counter, peer->peer_nid, str, cksum); + cksum_counter, peer->peer_id.nid, str, cksum); } } else { static int cksum_missed; @@ -879,7 +916,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, if ((cksum_missed & (-cksum_missed)) == cksum_missed) CERROR("Request checksum %u from "LPX64", no reply\n", cksum_missed, - req->rq_import->imp_connection->c_peer.peer_nid); + req->rq_import->imp_connection->c_peer.peer_id.nid); } #endif RETURN(0); @@ -899,8 +936,6 @@ restart_bulk: rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm, page_count, pga, &requested_nob, &niocount, &request); - /* NB ^ sets rq_no_resend */ - if (rc != 0) return (rc); @@ -929,13 +964,6 @@ static int brw_interpret(struct ptlrpc_request *request, struct brw_page *pga = aa->aa_pga; ENTRY; - /* XXX bug 937 here */ - if (rc == -ETIMEDOUT && request->rq_resend) { - DEBUG_REQ(D_HA, request, "BULK TIMEOUT"); - LBUG(); /* re-send. later. */ - //goto restart_bulk; - } - rc = osc_brw_fini_request(request, oa, requested_nob, niocount, page_count, pga, rc); RETURN (rc); @@ -955,8 +983,6 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa, rc = osc_brw_prep_request(cmd, class_exp2cliimp(exp), oa, lsm, page_count, pga, &requested_nob, &nio_count, &request); - /* NB ^ sets rq_no_resend */ - if (rc == 0) { LASSERT(sizeof(*aa) <= sizeof(request->rq_async_args)); aa = (struct osc_brw_async_args *)&request->rq_async_args; @@ -999,7 +1025,8 @@ static void sort_brw_pages(struct brw_page *array, int num) for (i = stride ; i < num ; i++) { tmp = array[i]; j = i; - while (j >= stride && array[j - stride].off > tmp.off) { + while (j >= stride && array[j - stride].disk_offset > + tmp.disk_offset) { array[j] = array[j - stride]; j -= stride; } @@ -1032,7 +1059,7 @@ static obd_count check_elan_limit(struct brw_page *pg, obd_count pages) } static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_count page_count, + struct lov_stripe_md *lsm, obd_count page_count, struct brw_page *pga, struct obd_trans_info *oti) { ENTRY; @@ -1059,7 +1086,7 @@ static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa, sort_brw_pages(pga, pages_per_brw); pages_per_brw = check_elan_limit(pga, pages_per_brw); - rc = osc_brw_internal(cmd, exp, oa, md, pages_per_brw, pga); + rc = osc_brw_internal(cmd, exp, oa, lsm, pages_per_brw, pga); if (rc != 0) RETURN(rc); @@ -1071,7 +1098,7 @@ static int osc_brw(int cmd, struct obd_export *exp, struct obdo *oa, } static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, obd_count page_count, + struct lov_stripe_md *lsm, obd_count page_count, struct brw_page *pga, struct ptlrpc_request_set *set, struct obd_trans_info *oti) { @@ -1099,7 +1126,7 @@ static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa, sort_brw_pages(pga, pages_per_brw); pages_per_brw = check_elan_limit(pga, pages_per_brw); - rc = async_internal(cmd, exp, oa, md, pages_per_brw, pga, set); + rc = async_internal(cmd, exp, oa, lsm, pages_per_brw, pga, set); if (rc != 0) RETURN(rc); @@ -1198,9 +1225,10 @@ static int brw_interpret_oap(struct ptlrpc_request *request, struct osc_async_page *oap; struct client_obd *cli; struct list_head *pos, *n; + struct timeval now; ENTRY; - + do_gettimeofday(&now); rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob, aa->aa_nio_count, aa->aa_page_count, aa->aa_pga, rc); @@ -1220,10 +1248,22 @@ static int brw_interpret_oap(struct ptlrpc_request *request, spin_lock(&cli->cl_loi_list_lock); + if (request->rq_reqmsg->opc == OST_WRITE) + lprocfs_stime_record(&cli->cl_write_stime, &now, + &request->rq_rpcd_start); + else + lprocfs_stime_record(&cli->cl_read_stime, &now, + &request->rq_rpcd_start); + + + /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters * is called so we know whether to go to sync BRWs or wait for more * RPCs to complete */ - cli->cl_brw_in_flight--; + if (request->rq_reqmsg->opc == OST_WRITE) + cli->cl_w_in_flight--; + else + cli->cl_r_in_flight--; /* the caller may re-use the oap after the completion call so * we need to clean it up a little */ @@ -1239,7 +1279,6 @@ static int brw_interpret_oap(struct ptlrpc_request *request, osc_wake_cache_waiters(cli); osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); obdo_free(aa->aa_oa); @@ -1281,7 +1320,8 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, ops = oap->oap_caller_ops; caller_data = oap->oap_caller_data; } - pga[i].off = oap->oap_obj_off + oap->oap_page_off; + pga[i].disk_offset = oap->oap_obj_off + oap->oap_page_off; + pga[i].page_offset = pga[i].disk_offset; pga[i].pg = oap->oap_page; pga[i].count = oap->oap_count; pga[i].flag = oap->oap_brw_flags; @@ -1399,8 +1439,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, /* take the page out of our book-keeping */ list_del_init(&oap->oap_pending_item); lop_update_pending(cli, lop, cmd, -1); - if (!list_empty(&oap->oap_urgent_item)) - list_del_init(&oap->oap_urgent_item); + list_del_init(&oap->oap_urgent_item); /* ask the caller for the size of the io as the rpc leaves. */ if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) @@ -1467,17 +1506,20 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, #ifdef __KERNEL__ if (cmd == OBD_BRW_READ) { lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight); + lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); } else { lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); lprocfs_oh_tally(&cli->cl_write_rpc_hist, - cli->cl_brw_in_flight); + cli->cl_w_in_flight); } #endif spin_lock(&cli->cl_loi_list_lock); - cli->cl_brw_in_flight++; + if (cmd == OBD_BRW_READ) + cli->cl_r_in_flight++; + else + cli->cl_w_in_flight++; /* queued sync pages can be torn down while the pages * were between the pending list and the rpc */ list_for_each(pos, &aa->aa_oaps) { @@ -1490,8 +1532,9 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, } } - CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %d in flight\n", request, - page_count, aa, cli->cl_brw_in_flight); + CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %dr/%dw in flight\n", + request, page_count, aa, cli->cl_r_in_flight, + cli->cl_w_in_flight); oap->oap_request = ptlrpc_request_addref(request); request->rq_interpret_reply = brw_interpret_oap; @@ -1615,9 +1658,9 @@ static void osc_check_rpcs(struct client_obd *cli) ENTRY; while ((loi = osc_next_loi(cli)) != NULL) { - LOI_DEBUG(loi, "%d in flight\n", cli->cl_brw_in_flight); - - if (cli->cl_brw_in_flight >= cli->cl_max_rpcs_in_flight) + LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); + + if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight) break; /* attempt some read/write balancing by alternating between @@ -1682,7 +1725,7 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw) int rc; ENTRY; spin_lock(&cli->cl_loi_list_lock); - rc = list_empty(&ocw->ocw_entry) || cli->cl_brw_in_flight == 0; + rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0; spin_unlock(&cli->cl_loi_list_lock); RETURN(rc); }; @@ -1694,6 +1737,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, { struct osc_cache_waiter ocw; struct l_wait_info lwi = { 0 }; + struct timeval start, stop; CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n", cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant, @@ -1713,7 +1757,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, /* Make sure that there are write rpcs in flight to wait for. This * is a little silly as this object may not have any pending but * other objects sure might. */ - if (cli->cl_brw_in_flight) { + if (cli->cl_w_in_flight) { list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); init_waitqueue_head(&ocw.ocw_waitq); ocw.ocw_oap = oap; @@ -1724,9 +1768,11 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, spin_unlock(&cli->cl_loi_list_lock); CDEBUG(0, "sleeping for cache space\n"); + do_gettimeofday(&start); l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi); - + do_gettimeofday(&stop); spin_lock(&cli->cl_loi_list_lock); + lprocfs_stime_record(&cli->cl_enter_stime, &stop, &start); if (!list_empty(&ocw.ocw_entry)) { list_del(&ocw.ocw_entry); RETURN(-EINTR); @@ -1794,18 +1840,10 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, RETURN(0); } -struct osc_async_page *oap_from_cookie(void *cookie) -{ - struct osc_async_page *oap = cookie; - if (oap->oap_magic != OAP_MAGIC) - return ERR_PTR(-EINVAL); - return oap; -}; - static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, void *cookie, int cmd, obd_off off, int count, - obd_flag brw_flags, enum async_flags async_flags) + obd_flags brw_flags, enum async_flags async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; struct osc_async_page *oap; @@ -1813,9 +1851,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm, int rc; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + oap = OAP_FROM_COOKIE(cookie); if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); @@ -1869,7 +1905,7 @@ static int osc_queue_async_io(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_set_async_flags(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, void *cookie, - obd_flag async_flags) + obd_flags async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; struct loi_oap_pages *lop; @@ -1877,9 +1913,7 @@ static int osc_set_async_flags(struct obd_export *exp, int rc = 0; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + oap = OAP_FROM_COOKIE(cookie); if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); @@ -1923,17 +1957,15 @@ static int osc_queue_group_io(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, struct obd_io_group *oig, void *cookie, int cmd, obd_off off, int count, - obd_flag brw_flags, - obd_flag async_flags) + obd_flags brw_flags, + obd_flags async_flags) { struct client_obd *cli = &exp->exp_obd->u.cli; struct osc_async_page *oap; struct loi_oap_pages *lop; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + oap = OAP_FROM_COOKIE(cookie); if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); @@ -2020,9 +2052,7 @@ static int osc_teardown_async_page(struct obd_export *exp, int rc = 0; ENTRY; - oap = oap_from_cookie(cookie); - if (IS_ERR(oap)) - RETURN(PTR_ERR(oap)); + oap = OAP_FROM_COOKIE(cookie); if (loi == NULL) loi = &lsm->lsm_oinfo[0]; @@ -2079,8 +2109,8 @@ static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa, size[1] = sizeof(struct obd_ioobj); size[2] = page_count * sizeof(*nioptr); - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_READ, 3, - size, NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_SAN_READ, 3, size, NULL); if (!request) RETURN(-ENOMEM); @@ -2096,9 +2126,10 @@ static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa, for (mapped = 0; mapped < page_count; mapped++, nioptr++) { LASSERT(PageLocked(pga[mapped].pg)); - LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off); + LASSERT(mapped == 0 || + pga[mapped].disk_offset > pga[mapped - 1].disk_offset); - nioptr->offset = pga[mapped].off; + nioptr->offset = pga[mapped].disk_offset; nioptr->len = pga[mapped].count; nioptr->flags = pga[mapped].flag; } @@ -2207,8 +2238,8 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa, size[1] = sizeof(struct obd_ioobj); size[2] = page_count * sizeof(*nioptr); - request = ptlrpc_prep_req(class_exp2cliimp(exp), OST_SAN_WRITE, - 3, size, NULL); + request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_SAN_WRITE, 3, size, NULL); if (!request) RETURN(-ENOMEM); @@ -2225,9 +2256,10 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa, /* pack request */ for (mapped = 0; mapped < page_count; mapped++, nioptr++) { LASSERT(PageLocked(pga[mapped].pg)); - LASSERT(mapped == 0 || pga[mapped].off > pga[mapped - 1].off); + LASSERT(mapped == 0 || + pga[mapped].disk_offset > pga[mapped - 1].disk_offset); - nioptr->offset = pga[mapped].off; + nioptr->offset = pga[mapped].disk_offset; nioptr->len = pga[mapped].count; nioptr->flags = pga[mapped].flag; } @@ -2340,12 +2372,18 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); - LASSERT(lock != NULL); + if (lock == NULL) { + CERROR("lockh %p, data %p - client evicted?\n", lockh, data); + return; + } + l_lock(&lock->l_resource->lr_namespace->ns_lock); #ifdef __KERNEL__ if (lock->l_ast_data && lock->l_ast_data != data) { struct inode *new_inode = data; struct inode *old_inode = lock->l_ast_data; + if (!(old_inode->i_state & I_FREEING)) + LDLM_ERROR(lock, "inconsistent l_ast_data found"); LASSERTF(old_inode->i_state & I_FREEING, "Found existing inode %p/%lu/%u state %lu in lock: " "setting data to %p/%lu/%u\n", old_inode, @@ -2362,9 +2400,11 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data) static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, ldlm_iterator_t replace, void *data) { - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; + struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obd = class_exp2obd(exp); + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; ldlm_change_cbdata(obd->obd_namespace, &res_id, replace, data); return 0; } @@ -2375,12 +2415,17 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, void *data, __u32 lvb_len, void *lvb_swabber, struct lustre_handle *lockh) { - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; struct obd_device *obd = exp->exp_obd; + struct ldlm_res_id res_id = { .name = {0} }; struct ost_lvb lvb; + struct ldlm_reply *rep; + struct ptlrpc_request *req = NULL; int rc; ENTRY; + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; + /* Filesystem lock extents are extended to page boundaries so that * dealing with the page cache is a little smoother. */ policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; @@ -2393,6 +2438,12 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, mode, lockh); if (rc == 1) { + if (ptlrpcs_check_cred(obd->u.cli.cl_import)) { + /* return immediately if no credential held */ + ldlm_lock_decref(lockh, mode); + RETURN(-EACCES); + } + osc_set_data_with_check(lockh, data); if (*flags & LDLM_FL_HAS_INTENT) { /* I would like to be able to ASSERT here that rss <= @@ -2419,6 +2470,12 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, policy, LCK_PW, lockh); if (rc == 1) { + if (ptlrpcs_check_cred(obd->u.cli.cl_import)) { + /* return immediately if no credential held */ + ldlm_lock_decref(lockh, LCK_PW); + RETURN(-EACCES); + } + /* FIXME: This is not incredibly elegant, but it might * be more elegant than adding another parameter to * lock_match. I want a second opinion. */ @@ -2428,11 +2485,81 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, RETURN(ELDLM_OK); } } + if (mode == LCK_PW) { + rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, + policy, LCK_PR, lockh); + if (rc == 1) { + rc = ldlm_cli_convert(lockh, mode, flags); + if (!rc) { + /* Update readers/writers accounting */ + ldlm_lock_addref(lockh, LCK_PW); + ldlm_lock_decref(lockh, LCK_PR); + osc_set_data_with_check(lockh, data); + RETURN(ELDLM_OK); + } + /* If the conversion failed, we need to drop refcount + on matched lock before we get new one */ + /* XXX Won't it save us some efforts if we cancel PR + lock here? We are going to take PW lock anyway and it + will invalidate PR lock */ + ldlm_lock_decref(lockh, LCK_PR); + if (rc != EDEADLOCK) { + RETURN(rc); + } + } + } + + if (mode == LCK_PW) { + rc = ldlm_lock_match(obd->obd_namespace, 0, &res_id, type, + policy, LCK_PR, lockh); + if (rc == 1) { + rc = ldlm_cli_convert(lockh, mode, flags); + if (!rc) { + /* Update readers/writers accounting */ + ldlm_lock_addref(lockh, LCK_PW); + ldlm_lock_decref(lockh, LCK_PR); + osc_set_data_with_check(lockh, data); + RETURN(ELDLM_OK); + } + /* If the conversion failed, we need to drop refcount + on matched lock before we get new one */ + /* XXX Won't it save us some efforts if we cancel PR + lock here? We are going to take PW lock anyway and it + will invalidate PR lock */ + ldlm_lock_decref(lockh, LCK_PR); + if (rc != EDEADLOCK) { + RETURN(rc); + } + } + } no_match: - rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, res_id, type, + if (*flags & LDLM_FL_HAS_INTENT) { + int size[2] = {0, sizeof(struct ldlm_request)}; + + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION, + LDLM_ENQUEUE, 2, size, NULL); + if (req == NULL) + RETURN(-ENOMEM); + + size[0] = sizeof(*rep); + size[1] = sizeof(lvb); + req->rq_replen = lustre_msg_size(2, size); + } + rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type, policy, mode, flags, bl_cb, cp_cb, gl_cb, data, &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh); + if (req != NULL) { + if (rc == ELDLM_LOCK_ABORTED) { + /* swabbed by ldlm_cli_enqueue() */ + LASSERT_REPSWABBED(req, 0); + rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep)); + LASSERT(rep != NULL); + if (rep->lock_policy_res1) + rc = rep->lock_policy_res1; + } + ptlrpc_req_finished(req); + } if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) { CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n", @@ -2448,11 +2575,14 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, __u32 type, ldlm_policy_data_t *policy, __u32 mode, int *flags, void *data, struct lustre_handle *lockh) { - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; + struct ldlm_res_id res_id = { .name = {0} }; struct obd_device *obd = exp->exp_obd; int rc; ENTRY; + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; + OBD_FAIL_RETURN(OBD_FAIL_OSC_MATCH, -EIO); /* Filesystem lock extents are extended to page boundaries so that @@ -2464,7 +2594,8 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm, rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type, policy, mode, lockh); if (rc) { - osc_set_data_with_check(lockh, data); + // if (!(*flags & LDLM_FL_TEST_LOCK)) + osc_set_data_with_check(lockh, data); RETURN(rc); } /* If we're trying to read, we also search for an existing PW lock. The @@ -2499,13 +2630,19 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md, } static int osc_cancel_unused(struct obd_export *exp, - struct lov_stripe_md *lsm, int flags, void *opaque) + struct lov_stripe_md *lsm, + int flags, void *opaque) { struct obd_device *obd = class_exp2obd(exp); - struct ldlm_res_id res_id = { .name = {lsm->lsm_object_id} }; + struct ldlm_res_id res_id = { .name = {0} }, *resp = NULL; - return ldlm_cli_cancel_unused(obd->obd_namespace, &res_id, flags, - opaque); + if (lsm != NULL) { + res_id.name[0] = lsm->lsm_object_id; + res_id.name[2] = lsm->lsm_object_gr; + resp = &res_id; + } + + return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); } static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs, @@ -2522,7 +2659,8 @@ static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs, * during mount that would help a bit). Having relative timestamps * is not so great if request processing is slow, while absolute * timestamps are not ideal because they need time synchronization. */ - request = ptlrpc_prep_req(obd->u.cli.cl_import, OST_STATFS,0,NULL,NULL); + request = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_OBD_VERSION, + OST_STATFS, 0, NULL, NULL); if (!request) RETURN(-ENOMEM); @@ -2577,12 +2715,14 @@ static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) RETURN(-ENOMEM); lumk->lmm_objects[0].l_object_id = lsm->lsm_object_id; + lumk->lmm_objects[0].l_object_gr = lsm->lsm_object_gr; } else { lum_size = sizeof(lum); lumk = &lum; } lumk->lmm_object_id = lsm->lsm_object_id; + lumk->lmm_object_gr = lsm->lsm_object_gr; lumk->lmm_stripe_count = 1; if (copy_to_user(lump, lumk, lum_size)) @@ -2633,6 +2773,11 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, GOTO(out, err = -EINVAL); } + if (data->ioc_inllen3 < sizeof(__u32)) { + OBD_FREE(buf, len); + GOTO(out, err = -EINVAL); + } + desc = (struct lov_desc *)data->ioc_inlbuf1; desc->ld_tgt_count = 1; desc->ld_active_tgt_count = 1; @@ -2641,8 +2786,8 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, desc->ld_default_stripe_offset = 0; desc->ld_pattern = 0; memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid)); - memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid)); + *((__u32 *)data->ioc_inlbuf3) = 1; err = copy_to_user((void *)uarg, buf, len); if (err) @@ -2668,6 +2813,10 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, err = ptlrpc_set_import_active(obd->u.cli.cl_import, data->ioc_offset); GOTO(out, err); + case IOC_OSC_CTL_RECOVERY: + err = ptlrpc_import_control_recovery(obd->u.cli.cl_import, + data->ioc_offset); + GOTO(out, err); default: CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", cmd, current->comm); GOTO(out, err = -ENOTTY); @@ -2681,7 +2830,7 @@ out: return err; } -static int osc_get_info(struct obd_export *exp, obd_count keylen, +static int osc_get_info(struct obd_export *exp, __u32 keylen, void *key, __u32 *vallen, void *val) { ENTRY; @@ -2699,12 +2848,12 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen, obd_id *reply; char *bufs[1] = {key}; int rc; - req = ptlrpc_prep_req(class_exp2cliimp(exp), OST_GET_INFO, 1, - &keylen, bufs); + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, + OST_GET_INFO, 1, (int *)&keylen, bufs); if (req == NULL) RETURN(-ENOMEM); - req->rq_replen = lustre_msg_size(1, vallen); + req->rq_replen = lustre_msg_size(1, (int *)vallen); rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); @@ -2720,40 +2869,18 @@ static int osc_get_info(struct obd_export *exp, obd_count keylen, ptlrpc_req_finished(req); RETURN(rc); } - RETURN(-EINVAL); + RETURN(-EPROTO); } static int osc_set_info(struct obd_export *exp, obd_count keylen, void *key, obd_count vallen, void *val) { - struct ptlrpc_request *req; struct obd_device *obd = exp->exp_obd; struct obd_import *imp = class_exp2cliimp(exp); struct llog_ctxt *ctxt; - int rc, size = keylen; - char *bufs[1] = {key}; + int rc = 0; ENTRY; - if (keylen == strlen("next_id") && - memcmp(key, "next_id", strlen("next_id")) == 0) { - if (vallen != sizeof(obd_id)) - RETURN(-EINVAL); - obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1; - CDEBUG(D_INODE, "%s: set oscc_next_id = "LPU64"\n", - exp->exp_obd->obd_name, - obd->u.cli.cl_oscc.oscc_next_id); - - RETURN(0); - } - - if (keylen == strlen("growth_count") && - memcmp(key, "growth_count", strlen("growth_count")) == 0) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - obd->u.cli.cl_oscc.oscc_grow_count = *((int*)val); - RETURN(0); - } - if (keylen == strlen("unlinked") && memcmp(key, "unlinked", keylen) == 0) { struct osc_creator *oscc = &obd->u.cli.cl_oscc; @@ -2762,8 +2889,14 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, spin_unlock(&oscc->oscc_lock); RETURN(0); } - - + if (keylen == strlen("unrecovery") && + memcmp(key, "unrecovery", keylen) == 0) { + struct osc_creator *oscc = &obd->u.cli.cl_oscc; + spin_lock(&oscc->oscc_lock); + oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; + spin_unlock(&oscc->oscc_lock); + RETURN(0); + } if (keylen == strlen("initial_recov") && memcmp(key, "initial_recov", strlen("initial_recov")) == 0) { struct obd_import *imp = exp->exp_obd->u.cli.cl_import; @@ -2776,24 +2909,65 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, RETURN(0); } - if (keylen < strlen("mds_conn") || - memcmp(key, "mds_conn", strlen("mds_conn")) != 0) + if (keylen == strlen("async") && + memcmp(key, "async", keylen) == 0) { + struct client_obd *cl = &obd->u.cli; + if (vallen != sizeof(int)) + RETURN(-EINVAL); + cl->cl_async = *(int *)val; + CDEBUG(D_HA, "%s: set async = %d\n", + obd->obd_name, cl->cl_async); + RETURN(0); + } + + if (keylen == strlen("sec") && + memcmp(key, "sec", keylen) == 0) { + struct client_obd *cli = &exp->exp_obd->u.cli; + + if (vallen == strlen("null") && + memcmp(val, "null", vallen) == 0) { + cli->cl_sec_flavor = PTLRPC_SEC_NULL; + cli->cl_sec_subflavor = 0; + RETURN(0); + } + if (vallen == strlen("krb5i") && + memcmp(val, "krb5i", vallen) == 0) { + cli->cl_sec_flavor = PTLRPC_SEC_GSS; + cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5I; + RETURN(0); + } + if (vallen == strlen("krb5p") && + memcmp(val, "krb5p", vallen) == 0) { + cli->cl_sec_flavor = PTLRPC_SEC_GSS; + cli->cl_sec_subflavor = PTLRPC_SEC_GSS_KRB5P; + RETURN(0); + } + CERROR("unrecognized security type %s\n", (char*) val); RETURN(-EINVAL); + } + if (keylen == strlen("flush_cred") && + memcmp(key, "flush_cred", keylen) == 0) { + struct client_obd *cli = &exp->exp_obd->u.cli; - req = ptlrpc_prep_req(imp, OST_SET_INFO, 1, &size, bufs); - if (req == NULL) - RETURN(-ENOMEM); + if (cli->cl_import) + ptlrpcs_import_flush_creds(cli->cl_import, + *((uid_t *) val)); + RETURN(0); + } - req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); + if (keylen < strlen("mds_conn") || + memcmp(key, "mds_conn", keylen) != 0) + RETURN(-EINVAL); - ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_ORIG_CTXT); + ctxt = llog_get_context(&exp->exp_obd->obd_llogs, + LLOG_UNLINK_ORIG_CTXT); if (ctxt) { - rc = llog_initiator_connect(ctxt); - if (rc) - RETURN(rc); + if (rc == 0) + rc = llog_initiator_connect(ctxt); + else + CERROR("cannot establish the connect for " + "ctxt %p: %d\n", ctxt, rc); } imp->imp_server_timeout = 1; @@ -2809,64 +2983,69 @@ static struct llog_operations osc_size_repl_logops = { }; static struct llog_operations osc_unlink_orig_logops; -static int osc_llog_init(struct obd_device *obd, struct obd_device *tgt, - int count, struct llog_catid *catid) + +static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, + struct obd_device *tgt, int count, + struct llog_catid *catid) { int rc; ENTRY; osc_unlink_orig_logops = llog_lvfs_ops; osc_unlink_orig_logops.lop_setup = llog_obd_origin_setup; - osc_unlink_orig_logops.lop_cleanup = llog_obd_origin_cleanup; - osc_unlink_orig_logops.lop_add = llog_obd_origin_add; + osc_unlink_orig_logops.lop_cleanup = llog_catalog_cleanup; + osc_unlink_orig_logops.lop_add = llog_catalog_add; osc_unlink_orig_logops.lop_connect = llog_origin_connect; - rc = llog_setup(obd, LLOG_UNLINK_ORIG_CTXT, tgt, count, - &catid->lci_logid, &osc_unlink_orig_logops); + rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_ORIG_CTXT, tgt, count, + &catid->lci_logid, &osc_unlink_orig_logops); if (rc) RETURN(rc); - rc = llog_setup(obd, LLOG_SIZE_REPL_CTXT, tgt, count, NULL, - &osc_size_repl_logops); + rc = obd_llog_setup(obd, llogs, LLOG_SIZE_REPL_CTXT, tgt, count, NULL, + &osc_size_repl_logops); RETURN(rc); } -static int osc_llog_finish(struct obd_device *obd, int count) +static int osc_llog_finish(struct obd_device *obd, + struct obd_llogs *llogs, int count) { int rc; ENTRY; - rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_ORIG_CTXT)); + rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_ORIG_CTXT)); if (rc) RETURN(rc); - rc = llog_cleanup(llog_get_context(obd, LLOG_SIZE_REPL_CTXT)); + rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_REPL_CTXT)); RETURN(rc); } - static int osc_connect(struct lustre_handle *exph, - struct obd_device *obd, struct obd_uuid *cluuid) + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data, + unsigned long connect_flags) { int rc; - - rc = client_connect_import(exph, obd, cluuid); - - return rc; + ENTRY; + rc = client_connect_import(exph, obd, cluuid, data, connect_flags); + RETURN(rc); } -static int osc_disconnect(struct obd_export *exp, int flags) +static int osc_disconnect(struct obd_export *exp, unsigned long flags) { struct obd_device *obd = class_exp2obd(exp); - struct llog_ctxt *ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); + struct llog_ctxt *ctxt; int rc; + ENTRY; + ctxt = llog_get_context(&obd->obd_llogs, LLOG_SIZE_REPL_CTXT); if (obd->u.cli.cl_conn_count == 1) /* flush any remaining cancel messages out to the target */ llog_sync(ctxt, exp); rc = client_disconnect_export(exp, flags); - return rc; + RETURN(rc); } static int osc_import_event(struct obd_device *obd, @@ -2892,7 +3071,7 @@ static int osc_import_event(struct obd_device *obd, } case IMP_EVENT_INACTIVE: { if (obd->obd_observer) - rc = obd_notify(obd->obd_observer, obd, 0); + rc = obd_notify(obd->obd_observer, obd, 0, 0); break; } case IMP_EVENT_INVALIDATE: { @@ -2912,8 +3091,17 @@ static int osc_import_event(struct obd_device *obd, break; } case IMP_EVENT_ACTIVE: { + /* Only do this on the MDS OSC's */ + if (imp->imp_server_timeout) { + struct osc_creator *oscc = &obd->u.cli.cl_oscc; + + spin_lock(&oscc->oscc_lock); + oscc->oscc_flags &= ~OSCC_FLAG_NOSPC; + spin_unlock(&oscc->oscc_lock); + } + if (obd->obd_observer) - rc = obd_notify(obd->obd_observer, obd, 1); + rc = obd_notify(obd->obd_observer, obd, 1, 0); break; } default: @@ -2923,49 +3111,78 @@ static int osc_import_event(struct obd_device *obd, RETURN(rc); } -int osc_setup(struct obd_device *obd, obd_count len, void *buf) +static int osc_attach(struct obd_device *dev, obd_count len, void *data) { + struct lprocfs_static_vars lvars; int rc; + ENTRY; + + lprocfs_init_vars(osc,&lvars); + rc = lprocfs_obd_attach(dev, lvars.obd_vars); + if (rc < 0) + RETURN(rc); + rc = lproc_osc_attach_seqstat(dev); + if (rc < 0) { + lprocfs_obd_detach(dev); + RETURN(rc); + } + + ptlrpc_lprocfs_register_obd(dev); + RETURN(0); +} + +static int osc_detach(struct obd_device *dev) +{ + ptlrpc_lprocfs_unregister_obd(dev); + return lprocfs_obd_detach(dev); +} + +static int osc_setup(struct obd_device *obd, obd_count len, void *buf) +{ + int rc; + ENTRY; rc = ptlrpcd_addref(); if (rc) - return rc; + RETURN(rc); rc = client_obd_setup(obd, len, buf); - if (rc) { + if (rc) ptlrpcd_decref(); - } else { - struct lprocfs_static_vars lvars; - - lprocfs_init_vars(osc, &lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) { - lproc_osc_attach_seqstat(obd); - ptlrpc_lprocfs_register_obd(obd); - } - + else oscc_init(obd); - } RETURN(rc); } -int osc_cleanup(struct obd_device *obd, int flags) +static int osc_cleanup(struct obd_device *obd, int flags) { + struct osc_creator *oscc = &obd->u.cli.cl_oscc; int rc; - ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); + rc = ldlm_cli_cancel_unused(obd->obd_namespace, NULL, + LDLM_FL_CONFIG_CHANGE, NULL); + if (rc) + RETURN(rc); + + spin_lock(&oscc->oscc_lock); + oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; + oscc->oscc_flags |= OSCC_FLAG_EXITING; + spin_unlock(&oscc->oscc_lock); rc = client_obd_cleanup(obd, flags); ptlrpcd_decref(); RETURN(rc); } - struct obd_ops osc_obd_ops = { .o_owner = THIS_MODULE, + .o_attach = osc_attach, + .o_detach = osc_detach, .o_setup = osc_setup, .o_cleanup = osc_cleanup, + .o_add_conn = client_import_add_conn, + .o_del_conn = client_import_del_conn, .o_connect = osc_connect, .o_disconnect = osc_disconnect, .o_statfs = osc_statfs, @@ -3002,7 +3219,11 @@ struct obd_ops osc_obd_ops = { #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) struct obd_ops sanosc_obd_ops = { .o_owner = THIS_MODULE, + .o_attach = osc_attach, + .o_detach = osc_detach, .o_cleanup = client_obd_cleanup, + .o_add_conn = client_import_add_conn, + .o_del_conn = client_import_del_conn, .o_connect = osc_connect, .o_disconnect = client_disconnect_export, .o_statfs = osc_statfs, @@ -3043,13 +3264,13 @@ int __init osc_init(void) lprocfs_init_vars(osc, &sanlvars); #endif - rc = class_register_type(&osc_obd_ops, lvars.module_vars, + rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, LUSTRE_OSC_NAME); if (rc) RETURN(rc); #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - rc = class_register_type(&sanosc_obd_ops, sanlvars.module_vars, + rc = class_register_type(&sanosc_obd_ops, NULL, sanlvars.module_vars, LUSTRE_SANOSC_NAME); if (rc) class_unregister_type(LUSTRE_OSC_NAME);