X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=4ca9be098e23c9aa6dc302f68a960520096f2357;hp=1acf2d13d4b0a687d554a67ea5093dacfb6d17c9;hb=1a409a3e6a74685970ee779ebe32917bf51eaf3a;hpb=ac5fcdce025b4825500c0308d89dfdab1faece51 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 1acf2d1..4ca9be0 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. */ #define DEBUG_SUBSYSTEM S_OSC @@ -36,7 +35,6 @@ #include #include #include -#include #include #include #include @@ -50,6 +48,7 @@ #include #include "osc_internal.h" +#include atomic_t osc_pool_req_count; unsigned int osc_reqpool_maxreqcount; @@ -450,14 +449,7 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa, int rc; ENTRY; - /* - * Only mode == 0 (which is standard prealloc) is supported now. - * Punch is not supported yet. - */ - if (mode & ~FALLOC_FL_KEEP_SIZE) - RETURN(-EOPNOTSUPP); oa->o_falloc_mode = mode; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_FALLOCATE); if (req == NULL) @@ -476,7 +468,7 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa, ptlrpc_request_set_replen(req); - req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; + req->rq_interpret_reply = osc_setattr_interpret; BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args)); sa = ptlrpc_req_async_args(sa, req); sa->sa_oa = oa; @@ -487,6 +479,7 @@ int osc_fallocate_base(struct obd_export *exp, struct obdo *oa, RETURN(0); } +EXPORT_SYMBOL(osc_fallocate_base); static int osc_sync_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *args, int rc) @@ -699,7 +692,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, oa->o_valid |= bits; spin_lock(&cli->cl_loi_list_lock); - if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM)) + if (cli->cl_ocd_grant_param) oa->o_dirty = cli->cl_dirty_grant; else oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT; @@ -730,13 +723,12 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, nrpages *= cli->cl_max_rpcs_in_flight + 1; nrpages = max(nrpages, cli->cl_dirty_max_pages); undirty = nrpages << PAGE_SHIFT; - if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, - GRANT_PARAM)) { + if (cli->cl_ocd_grant_param) { int nrextents; /* take extent tax into account when asking for more * grant space */ - nrextents = (nrpages + cli->cl_max_extent_pages - 1) / + nrextents = (nrpages + cli->cl_max_extent_pages - 1) / cli->cl_max_extent_pages; undirty += nrextents * cli->cl_grant_extent_tax; } @@ -747,11 +739,20 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, ~(PTLRPC_MAX_BRW_SIZE * 4UL)); } oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; - oa->o_dropped = cli->cl_lost_grant; - cli->cl_lost_grant = 0; + /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */ + if (cli->cl_lost_grant > INT_MAX) { + CDEBUG(D_CACHE, + "%s: avoided o_dropped overflow: cl_lost_grant %lu\n", + cli_name(cli), cli->cl_lost_grant); + oa->o_dropped = INT_MAX; + } else { + oa->o_dropped = cli->cl_lost_grant; + } + cli->cl_lost_grant -= oa->o_dropped; spin_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n", - oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); + CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu" + " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty, + oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant); } void osc_update_next_shrink(struct client_obd *cli) @@ -1023,12 +1024,19 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) spin_lock(&cli->cl_loi_list_lock); cli->cl_avail_grant = ocd->ocd_grant; if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) { - cli->cl_avail_grant -= cli->cl_reserved_grant; + unsigned long consumed = cli->cl_reserved_grant; + if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) - cli->cl_avail_grant -= cli->cl_dirty_grant; + consumed += cli->cl_dirty_grant; else - cli->cl_avail_grant -= - cli->cl_dirty_pages << PAGE_SHIFT; + consumed += cli->cl_dirty_pages << PAGE_SHIFT; + if (cli->cl_avail_grant < consumed) { + CERROR("%s: granted %ld but already consumed %ld\n", + cli_name(cli), cli->cl_avail_grant, consumed); + cli->cl_avail_grant = 0; + } else { + cli->cl_avail_grant -= consumed; + } } if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) { @@ -1046,10 +1054,10 @@ void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) ~chunk_mask) & chunk_mask; /* determine maximum extent size, in #pages */ size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits; - cli->cl_max_extent_pages = size >> PAGE_SHIFT; - if (cli->cl_max_extent_pages == 0) - cli->cl_max_extent_pages = 1; + cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1; + cli->cl_ocd_grant_param = 1; } else { + cli->cl_ocd_grant_param = 0; cli->cl_grant_extent_tax = 0; cli->cl_chunkbits = PAGE_SHIFT; cli->cl_max_extent_pages = DT_MAX_BRW_PAGES; @@ -1170,7 +1178,7 @@ static int osc_checksum_bulk_t10pi(const char *obd_name, int nob, size_t pg_count, struct brw_page **pga, int opc, obd_dif_csum_fn *fn, int sector_size, - u32 *check_sum) + u32 *check_sum, bool resend) { struct ahash_request *req; /* Used Adler as the default checksum type on top of DIF tags */ @@ -1203,6 +1211,10 @@ static int osc_checksum_bulk_t10pi(const char *obd_name, int nob, buffer = kmap(__page); guard_start = (__u16 *)buffer; guard_number = PAGE_SIZE / sizeof(*guard_start); + CDEBUG(D_PAGE | (resend ? D_HA : 0), + "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n", + guard_number, resend, nob, pg_count); + while (nob > 0 && pg_count > 0) { unsigned int count = pga[i]->count > nob ? nob : pga[i]->count; @@ -1228,6 +1240,12 @@ static int osc_checksum_bulk_t10pi(const char *obd_name, int nob, guard_number - used_number, &used, sector_size, fn); + if (unlikely(resend)) + CDEBUG(D_PAGE | D_HA, + "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n", + i, used, pga[i]->off & ~PAGE_MASK, count, + (int)(used * sizeof(*guard_start)), + guard_start + used_number); if (rc) break; @@ -1266,7 +1284,7 @@ out: #else /* !CONFIG_CRC_T10DIF */ #define obd_dif_ip_fn NULL #define obd_dif_crc_fn NULL -#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \ +#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \ -EOPNOTSUPP #endif /* CONFIG_CRC_T10DIF */ @@ -1328,7 +1346,7 @@ static int osc_checksum_bulk_rw(const char *obd_name, enum cksum_types cksum_type, int nob, size_t pg_count, struct brw_page **pga, int opc, - u32 *check_sum) + u32 *check_sum, bool resend) { obd_dif_csum_fn *fn = NULL; int sector_size = 0; @@ -1339,7 +1357,8 @@ static int osc_checksum_bulk_rw(const char *obd_name, if (fn) rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga, - opc, fn, sector_size, check_sum); + opc, fn, sector_size, check_sum, + resend); else rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type, check_sum); @@ -1354,8 +1373,11 @@ static inline void osc_release_bounce_pages(struct brw_page **pga, int i; for (i = 0; i < page_count; i++) { - if (!pga[i]->pg->mapping) - /* bounce pages are unmapped */ + /* Bounce pages allocated by a call to + * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request() + * are identified thanks to the PageChecked flag. + */ + if (PageChecked(pga[i]->pg)) llcrypt_finalize_bounce_page(&pga[i]->pg); pga[i]->count -= pga[i]->bp_count_diff; pga[i]->off += pga[i]->bp_off_diff; @@ -1379,10 +1401,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, struct brw_page *pg_prev; void *short_io_buf; const char *obd_name = cli->cl_import->imp_obd->obd_name; - struct inode *inode; + struct inode *inode = NULL; + bool directio = false; + bool enable_checksum = true; ENTRY; - inode = page2inode(pga[0]->pg); + if (pga[0]->pg) { + inode = page2inode(pga[0]->pg); + if (inode == NULL) { + /* Try to get reference to inode from cl_page if we are + * dealing with direct IO, as handled pages are not + * actual page cache pages. + */ + struct osc_async_page *oap = brw_page2oap(pga[0]); + struct cl_page *clpage = oap2cl_page(oap); + + inode = clpage->cp_inode; + if (inode) + directio = true; + } + } if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) RETURN(-ENOMEM); /* Recoverable */ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) @@ -1407,6 +1445,8 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, bool retried = false; bool lockedbymyself; u32 nunits = (pg->off & ~PAGE_MASK) + pg->count; + struct address_space *map_orig = NULL; + pgoff_t index_orig; retry_encrypt: if (nunits & ~LUSTRE_ENCRYPTION_MASK) @@ -1422,10 +1462,20 @@ retry_encrypt: * which means only once the page is fully processed. */ lockedbymyself = trylock_page(pg->pg); + if (directio) { + map_orig = pg->pg->mapping; + pg->pg->mapping = inode->i_mapping; + index_orig = pg->pg->index; + pg->pg->index = pg->off >> PAGE_SHIFT; + } data_page = llcrypt_encrypt_pagecache_blocks(pg->pg, nunits, 0, GFP_NOFS); + if (directio) { + pg->pg->mapping = map_orig; + pg->pg->index = index_orig; + } if (lockedbymyself) unlock_page(pg->pg); if (IS_ERR(data_page)) { @@ -1438,6 +1488,10 @@ retry_encrypt: ptlrpc_request_free(req); RETURN(rc); } + /* Set PageChecked flag on bounce page for + * disambiguation in osc_release_bounce_pages(). + */ + SetPageChecked(data_page); pg->pg = data_page; /* there should be no gap in the middle of page array */ if (i == page_count - 1) { @@ -1493,11 +1547,22 @@ retry_encrypt: } } + if (lnet_is_rdma_only_page(pga[0]->pg)) { + enable_checksum = false; + short_io_size = 0; + } + /* Check if read/write is small enough to be a short io. */ if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 || !imp_connect_shortio(cli->cl_import)) short_io_size = 0; + /* If this is an empty RPC to old server, just ignore it */ + if (!short_io_size && !pga[0]->pg) { + ptlrpc_request_free(req); + RETURN(-ENODATA); + } + req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT, opc == OST_READ ? 0 : short_io_size); if (opc == OST_READ) @@ -1642,10 +1707,12 @@ no_bulk: if (osc_should_shrink_grant(cli)) osc_shrink_grant_local(cli, &body->oa); + if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr)) + enable_checksum = false; + /* size[REQ_REC_OFF] still sizeof (*body) */ if (opc == OST_WRITE) { - if (cli->cl_checksum && - !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { + if (enable_checksum) { /* store cl_cksum_type in a local variable since * it can be changed via lprocfs */ enum cksum_types cksum_type = cli->cl_cksum_type; @@ -1660,17 +1727,18 @@ no_bulk: rc = osc_checksum_bulk_rw(obd_name, cksum_type, requested_nob, page_count, pga, OST_WRITE, - &body->oa.o_cksum); + &body->oa.o_cksum, resend); if (rc < 0) { - CDEBUG(D_PAGE, "failed to checksum, rc = %d\n", + CDEBUG(D_PAGE, "failed to checksum: rc = %d\n", rc); GOTO(out, rc); } - CDEBUG(D_PAGE, "checksum at write origin: %x\n", - body->oa.o_cksum); + CDEBUG(D_PAGE | (resend ? D_HA : 0), + "checksum at write origin: %x (%x)\n", + body->oa.o_cksum, cksum_type); - /* save this in 'oa', too, for later checking */ - oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; + /* save this in 'oa', too, for later checking */ + oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; oa->o_flags |= obd_cksum_type_pack(obd_name, cksum_type); } else { @@ -1683,8 +1751,7 @@ no_bulk: req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, sizeof(__u32) * niocount); } else { - if (cli->cl_checksum && - !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { + if (enable_checksum) { if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) body->oa.o_flags = 0; body->oa.o_flags |= obd_cksum_type_pack(obd_name, @@ -1736,15 +1803,15 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count, * file/fid, not during the resends/retries. */ snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name), "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x", - (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ? - libcfs_debug_file_path_arr : - LIBCFS_DEBUG_FILE_PATH_DEFAULT), + (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ? + libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT), oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, pga[0]->off, pga[page_count-1]->off + pga[page_count-1]->count - 1, client_cksum, server_cksum); + CWARN("dumping checksum data to %s\n", dbgcksum_file_name); filp = filp_open(dbgcksum_file_name, O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600); if (IS_ERR(filp)) { @@ -1771,8 +1838,6 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count, } len -= rc; buf += rc; - CDEBUG(D_INFO, "%s: wrote %d bytes\n", - dbgcksum_file_name, rc); } kunmap(pga[i]->pg); } @@ -1781,6 +1846,8 @@ static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count, if (rc) CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc); filp_close(filp, NULL); + + libcfs_debug_dumplog(); } static int @@ -1833,7 +1900,7 @@ check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer, rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob, aa->aa_page_count, aa->aa_ppga, OST_WRITE, fn, sector_size, - &new_cksum); + &new_cksum, true); else rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count, aa->aa_ppga, OST_WRITE, cksum_type, @@ -1881,6 +1948,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) struct ost_body *body; u32 client_cksum = 0; struct inode *inode; + unsigned int blockbits = 0, blocksize = 0; ENTRY; @@ -1994,22 +2062,23 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) } } - if (rc < aa->aa_requested_nob) - handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); + if (rc < aa->aa_requested_nob) + handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); - if (body->oa.o_valid & OBD_MD_FLCKSUM) { - static int cksum_counter; - u32 server_cksum = body->oa.o_cksum; - char *via = ""; - char *router = ""; + if (body->oa.o_valid & OBD_MD_FLCKSUM) { + static int cksum_counter; + u32 server_cksum = body->oa.o_cksum; + int nob = rc; + char *via = ""; + char *router = ""; enum cksum_types cksum_type; u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ? body->oa.o_flags : 0; cksum_type = obd_cksum_type_unpack(o_flags); - rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc, + rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob, aa->aa_page_count, aa->aa_ppga, - OST_READ, &client_cksum); + OST_READ, &client_cksum, false); if (rc < 0) GOTO(out, rc); @@ -2021,8 +2090,12 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) if (server_cksum != client_cksum) { struct ost_body *clbody; + __u32 client_cksum2; u32 page_count = aa->aa_page_count; + osc_checksum_bulk_rw(obd_name, cksum_type, nob, + page_count, aa->aa_ppga, + OST_READ, &client_cksum2, true); clbody = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); if (cli->cl_checksum_dump) @@ -2032,7 +2105,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " "%s%s%s inode "DFID" object "DOSTID - " extent [%llu-%llu], client %x, " + " extent [%llu-%llu], client %x/%x, " "server %x, cksum_type %x\n", obd_name, libcfs_nid2str(peer->nid), @@ -2047,8 +2120,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga[0]->off, aa->aa_ppga[page_count-1]->off + aa->aa_ppga[page_count-1]->count - 1, - client_cksum, server_cksum, - cksum_type); + client_cksum, client_cksum2, + server_cksum, cksum_type); cksum_counter = 0; aa->aa_oa->o_cksum = client_cksum; rc = -EAGAIN; @@ -2070,6 +2143,19 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) } inode = page2inode(aa->aa_ppga[0]->pg); + if (inode == NULL) { + /* Try to get reference to inode from cl_page if we are + * dealing with direct IO, as handled pages are not + * actual page cache pages. + */ + struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]); + + inode = oap2cl_page(oap)->cp_inode; + if (inode) { + blockbits = inode->i_blkbits; + blocksize = 1 << blockbits; + } + } if (inode && IS_ENCRYPTED(inode)) { int idx; @@ -2094,18 +2180,36 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) break; } - /* The page is already locked when we arrive here, - * except when we deal with a twisted page for - * specific Direct IO support, in which case - * PageChecked flag is set on page. - */ - if (PageChecked(pg->pg)) - lock_page(pg->pg); - rc = llcrypt_decrypt_pagecache_blocks(pg->pg, - LUSTRE_ENCRYPTION_UNIT_SIZE, - offs); - if (PageChecked(pg->pg)) - unlock_page(pg->pg); + if (blockbits) { + /* This is direct IO case. Directly call + * decrypt function that takes inode as + * input parameter. Page does not need + * to be locked. + */ + u64 lblk_num = + ((u64)(pg->off >> PAGE_SHIFT) << + (PAGE_SHIFT - blockbits)) + + (offs >> blockbits); + unsigned int i; + + for (i = offs; + i < offs + + LUSTRE_ENCRYPTION_UNIT_SIZE; + i += blocksize, lblk_num++) { + rc = + llcrypt_decrypt_block_inplace( + inode, pg->pg, + blocksize, i, + lblk_num); + if (rc) + break; + } + } else { + rc = llcrypt_decrypt_pagecache_blocks( + pg->pg, + LUSTRE_ENCRYPTION_UNIT_SIZE, + offs); + } if (rc) GOTO(out, rc); @@ -2224,7 +2328,7 @@ static void sort_brw_pages(struct brw_page **array, int num) static void osc_release_ppga(struct brw_page **ppga, size_t count) { LASSERT(ppga != NULL); - OBD_FREE_PTR_ARRAY(ppga, count); + OBD_FREE_PTR_ARRAY_LARGE(ppga, count); } static int brw_interpret(const struct lu_env *env, @@ -2256,7 +2360,7 @@ static int brw_interpret(const struct lu_env *env, req->rq_import->imp_obd->obd_name, POSTID(&aa->aa_oa->o_oi), rc); } else if (rc == -EINPROGRESS || - client_should_resend(aa->aa_resends, aa->aa_cli)) { + client_should_resend(aa->aa_resends, aa->aa_cli)) { rc = osc_brw_redo_request(req, aa, rc); } else { CERROR("%s: too many resent retries for object: " @@ -2331,7 +2435,7 @@ static int brw_interpret(const struct lu_env *env, list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 1, - rc && req->rq_no_delay ? -EWOULDBLOCK : rc); + rc && req->rq_no_delay ? -EAGAIN : rc); } LASSERT(list_empty(&aa->aa_exts)); LASSERT(list_empty(&aa->aa_oaps)); @@ -2424,7 +2528,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, if (mem_tight) mpflag = memalloc_noreclaim_save(); - OBD_ALLOC_PTR_ARRAY(pga, page_count); + OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count); if (pga == NULL) GOTO(out, rc = -ENOMEM); @@ -2461,7 +2565,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, } /* first page in the list */ - oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item); + oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item); crattr = &osc_env_info(env)->oti_req_attr; memset(crattr, 0, sizeof(*crattr)); @@ -2557,10 +2661,11 @@ out: osc_release_ppga(pga, page_count); } /* this should happen rarely and is pretty bad, it makes the - * pending list not follow the dirty order */ - while (!list_empty(ext_list)) { - ext = list_entry(ext_list->next, struct osc_extent, - oe_link); + * pending list not follow the dirty order + */ + while ((ext = list_first_entry_or_null(ext_list, + struct osc_extent, + oe_link)) != NULL) { list_del_init(&ext->oe_link); osc_extent_finish(env, ext, 0, rc); } @@ -2568,6 +2673,34 @@ out: RETURN(rc); } +/* This is to refresh our lock in face of no RPCs. */ +void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start) +{ + struct ptlrpc_request *req; + struct obdo oa; + struct brw_page bpg = { .off = start, .count = 1}; + struct brw_page *pga = &bpg; + int rc; + + memset(&oa, 0, sizeof(oa)); + oa.o_oi = osc->oo_oinfo->loi_oi; + oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS; + /* For updated servers - don't do a read */ + oa.o_flags = OBD_FL_NORPC; + + rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga, + &req, 0); + + /* If we succeeded we ship it off, if not there's no point in doing + * anything. Also no resends. + * No interpret callback, no commit callback. + */ + if (!rc) { + req->rq_no_resend = 1; + ptlrpcd_add_req(req); + } +} + static int osc_set_lock_data(struct ldlm_lock *lock, void *data) { int set = 0; @@ -2634,6 +2767,10 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct ost_lvb *lvb = aa->oa_lvb; __u32 lvb_len = sizeof(*lvb); __u64 flags = 0; + struct ldlm_enqueue_info einfo = { + .ei_type = aa->oa_type, + .ei_mode = mode, + }; ENTRY; @@ -2663,9 +2800,8 @@ int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, } /* Complete obtaining the lock procedure. */ - rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, - aa->oa_mode, aa->oa_flags, lvb, lvb_len, - lockh, rc); + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags, + lvb, lvb_len, lockh, rc); /* Complete osc stuff. */ rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, aa->oa_flags, aa->oa_speculative, rc); @@ -2728,7 +2864,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (intent != 0) match_flags |= LDLM_FL_BLOCK_GRANTED; mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id, - einfo->ei_type, policy, mode, &lockh, 0); + einfo->ei_type, policy, mode, &lockh); if (mode) { struct ldlm_lock *matched; @@ -2770,23 +2906,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) RETURN(-ENOLCK); - if (intent) { - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_LDLM_ENQUEUE_LVB); - if (req == NULL) - RETURN(-ENOMEM); - - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - sizeof *lvb); - ptlrpc_request_set_replen(req); - } - /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ *flags &= ~LDLM_FL_BLOCK_GRANTED; @@ -2816,16 +2935,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, req->rq_interpret_reply = osc_enqueue_interpret; ptlrpc_set_add_req(rqset, req); - } else if (intent) { - ptlrpc_req_finished(req); } RETURN(rc); } rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, flags, speculative, rc); - if (intent) - ptlrpc_req_finished(req); RETURN(rc); } @@ -2834,7 +2949,7 @@ int osc_match_base(const struct lu_env *env, struct obd_export *exp, struct ldlm_res_id *res_id, enum ldlm_type type, union ldlm_policy_data *policy, enum ldlm_mode mode, __u64 *flags, struct osc_object *obj, - struct lustre_handle *lockh, int unref) + struct lustre_handle *lockh, enum ldlm_match_flags match_flags) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; @@ -2849,15 +2964,10 @@ int osc_match_base(const struct lu_env *env, struct obd_export *exp, policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK; policy->l_extent.end |= ~PAGE_MASK; - /* Next, search for already existing extent locks that will cover us */ - /* If we're trying to read, we also search for an existing PW lock. The - * VFS and page cache already protect us locally, so lots of readers/ - * writers can share a single PW lock. */ - rc = mode; - if (mode == LCK_PR) - rc |= LCK_PW; - rc = ldlm_lock_match(obd->obd_namespace, lflags, - res_id, type, policy, rc, lockh, unref); + /* Next, search for already existing extent locks that will cover us */ + rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0, + res_id, type, policy, mode, lockh, + match_flags); if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); @@ -2980,19 +3090,17 @@ static int osc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_device *obd = class_exp2obd(exp); struct obd_statfs *msfs; struct ptlrpc_request *req; - struct obd_import *imp = NULL; + struct obd_import *imp, *imp0; int rc; ENTRY; - - /*Since the request might also come from lprocfs, so we need - *sync this with client_disconnect_export Bug15684*/ - down_read(&obd->u.cli.cl_sem); - if (obd->u.cli.cl_import) - imp = class_import_get(obd->u.cli.cl_import); - up_read(&obd->u.cli.cl_sem); - if (!imp) - RETURN(-ENODEV); + /*Since the request might also come from lprocfs, so we need + *sync this with client_disconnect_export Bug15684 + */ + with_imp_locked(obd, imp0, rc) + imp = class_import_get(imp0); + if (rc) + RETURN(rc); /* We could possibly pass max_age in the request (as an absolute * timestamp or a "seconds.usec ago") so the target can avoid doing @@ -3562,21 +3670,28 @@ static const struct obd_ops osc_obd_ops = { .o_quotactl = osc_quotactl, }; -static struct shrinker *osc_cache_shrinker; LIST_HEAD(osc_shrink_list); DEFINE_SPINLOCK(osc_shrink_lock); -#ifndef HAVE_SHRINKER_COUNT -static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask)) +#ifdef HAVE_SHRINKER_COUNT +static struct shrinker osc_cache_shrinker = { + .count_objects = osc_cache_shrink_count, + .scan_objects = osc_cache_shrink_scan, + .seeks = DEFAULT_SEEKS, +}; +#else +static int osc_cache_shrink(struct shrinker *shrinker, + struct shrink_control *sc) { - struct shrink_control scv = { - .nr_to_scan = shrink_param(sc, nr_to_scan), - .gfp_mask = shrink_param(sc, gfp_mask) - }; - (void)osc_cache_shrink_scan(shrinker, &scv); + (void)osc_cache_shrink_scan(shrinker, sc); - return osc_cache_shrink_count(shrinker, &scv); + return osc_cache_shrink_count(shrinker, sc); } + +static struct shrinker osc_cache_shrinker = { + .shrink = osc_cache_shrink, + .seeks = DEFAULT_SEEKS, +}; #endif static int __init osc_init(void) @@ -3584,8 +3699,6 @@ static int __init osc_init(void) unsigned int reqpool_size; unsigned int reqsize; int rc; - DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink, - osc_cache_shrink_count, osc_cache_shrink_scan); ENTRY; /* print an address of _any_ initialized kernel symbol from this @@ -3597,16 +3710,18 @@ static int __init osc_init(void) if (rc) RETURN(rc); - rc = class_register_type(&osc_obd_ops, NULL, true, NULL, + rc = class_register_type(&osc_obd_ops, NULL, true, LUSTRE_OSC_NAME, &osc_device_type); if (rc) GOTO(out_kmem, rc); - osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar); + rc = register_shrinker(&osc_cache_shrinker); + if (rc) + GOTO(out_type, rc); /* This is obviously too much memory, only prevent overflow here */ if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) - GOTO(out_type, rc = -EINVAL); + GOTO(out_shrinker, rc = -EINVAL); reqpool_size = osc_reqpool_mem_max << 20; @@ -3627,7 +3742,7 @@ static int __init osc_init(void) ptlrpc_add_rqs_to_pool); if (osc_rq_pool == NULL) - GOTO(out_type, rc = -ENOMEM); + GOTO(out_shrinker, rc = -ENOMEM); rc = osc_start_grant_work(); if (rc != 0) @@ -3637,6 +3752,8 @@ static int __init osc_init(void) out_req_pool: ptlrpc_free_rq_pool(osc_rq_pool); +out_shrinker: + unregister_shrinker(&osc_cache_shrinker); out_type: class_unregister_type(LUSTRE_OSC_NAME); out_kmem: @@ -3648,7 +3765,7 @@ out_kmem: static void __exit osc_exit(void) { osc_stop_grant_work(); - remove_shrinker(osc_cache_shrinker); + unregister_shrinker(&osc_cache_shrinker); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); ptlrpc_free_rq_pool(osc_rq_pool);