4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/workqueue.h>
36 #include <libcfs/libcfs.h>
37 #include <linux/falloc.h>
38 #include <lprocfs_status.h>
39 #include <lustre_debug.h>
40 #include <lustre_dlm.h>
41 #include <lustre_fid.h>
42 #include <lustre_ha.h>
43 #include <uapi/linux/lustre/lustre_ioctl.h>
44 #include <lustre_net.h>
45 #include <lustre_obdo.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
49 #include <lustre_osc.h>
52 #include "osc_internal.h"
54 atomic_t osc_pool_req_count;
55 unsigned int osc_reqpool_maxreqcount;
56 struct ptlrpc_request_pool *osc_rq_pool;
58 /* max memory used for request pool, unit is MB */
59 static unsigned int osc_reqpool_mem_max = 5;
60 module_param(osc_reqpool_mem_max, uint, 0444);
static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);
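/*
 * Both of the above are ordinary module parameters, so they can be tuned
 * at module load time; a hypothetical example (values for illustration
 * only):
 *
 *   modprobe osc osc_reqpool_mem_max=16 osc_idle_timeout=60
 *
 * Since osc_idle_timeout uses mode 0644, it should also be writable at
 * runtime through /sys/module/osc/parameters/osc_idle_timeout.
 */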
65 #define osc_grant_args osc_brw_async_args
67 struct osc_setattr_args {
69 obd_enqueue_update_f sa_upcall;
73 struct osc_fsync_args {
74 struct osc_object *fa_obj;
76 obd_enqueue_update_f fa_upcall;
80 struct osc_ladvise_args {
82 obd_enqueue_update_f la_upcall;
86 static void osc_release_ppga(struct brw_page **ppga, size_t count);
87 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
90 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
92 struct ost_body *body;
94 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
97 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
100 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
103 struct ptlrpc_request *req;
104 struct ost_body *body;
108 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
112 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
114 ptlrpc_request_free(req);
118 osc_pack_req_body(req, oa);
120 ptlrpc_request_set_replen(req);
122 rc = ptlrpc_queue_wait(req);
126 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
128 GOTO(out, rc = -EPROTO);
130 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
131 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
133 oa->o_blksize = cli_brw_size(exp->exp_obd);
134 oa->o_valid |= OBD_MD_FLBLKSZ;
138 ptlrpc_req_finished(req);
143 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
146 struct ptlrpc_request *req;
147 struct ost_body *body;
151 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
153 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
157 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
159 ptlrpc_request_free(req);
163 osc_pack_req_body(req, oa);
165 ptlrpc_request_set_replen(req);
167 rc = ptlrpc_queue_wait(req);
171 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
173 GOTO(out, rc = -EPROTO);
175 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
179 ptlrpc_req_finished(req);
184 static int osc_setattr_interpret(const struct lu_env *env,
185 struct ptlrpc_request *req, void *args, int rc)
187 struct osc_setattr_args *sa = args;
188 struct ost_body *body;
195 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
197 GOTO(out, rc = -EPROTO);
199 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
202 rc = sa->sa_upcall(sa->sa_cookie, rc);
206 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
207 obd_enqueue_update_f upcall, void *cookie,
208 struct ptlrpc_request_set *rqset)
210 struct ptlrpc_request *req;
211 struct osc_setattr_args *sa;
216 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
220 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
222 ptlrpc_request_free(req);
226 osc_pack_req_body(req, oa);
228 ptlrpc_request_set_replen(req);
230 /* do mds to ost setattr asynchronously */
232 /* Do not wait for response. */
233 ptlrpcd_add_req(req);
235 req->rq_interpret_reply = osc_setattr_interpret;
237 sa = ptlrpc_req_async_args(sa, req);
239 sa->sa_upcall = upcall;
240 sa->sa_cookie = cookie;
242 ptlrpc_set_add_req(rqset, req);
248 static int osc_ladvise_interpret(const struct lu_env *env,
249 struct ptlrpc_request *req,
252 struct osc_ladvise_args *la = arg;
253 struct ost_body *body;
259 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
261 GOTO(out, rc = -EPROTO);
263 *la->la_oa = body->oa;
265 rc = la->la_upcall(la->la_cookie, rc);
* If rqset is NULL, do not wait for response. Upcall and cookie may also
* be NULL in this case.
273 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
274 struct ladvise_hdr *ladvise_hdr,
275 obd_enqueue_update_f upcall, void *cookie,
276 struct ptlrpc_request_set *rqset)
278 struct ptlrpc_request *req;
279 struct ost_body *body;
280 struct osc_ladvise_args *la;
282 struct lu_ladvise *req_ladvise;
283 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
284 int num_advise = ladvise_hdr->lah_count;
285 struct ladvise_hdr *req_ladvise_hdr;
288 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
292 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
293 num_advise * sizeof(*ladvise));
294 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
296 ptlrpc_request_free(req);
299 req->rq_request_portal = OST_IO_PORTAL;
300 ptlrpc_at_set_req_timeout(req);
302 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
304 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
307 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
308 &RMF_OST_LADVISE_HDR);
309 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
311 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
312 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
313 ptlrpc_request_set_replen(req);
316 /* Do not wait for response. */
317 ptlrpcd_add_req(req);
321 req->rq_interpret_reply = osc_ladvise_interpret;
322 la = ptlrpc_req_async_args(la, req);
324 la->la_upcall = upcall;
325 la->la_cookie = cookie;
327 ptlrpc_set_add_req(rqset, req);
332 static int osc_create(const struct lu_env *env, struct obd_export *exp,
335 struct ptlrpc_request *req;
336 struct ost_body *body;
341 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
342 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
344 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
346 GOTO(out, rc = -ENOMEM);
348 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
350 ptlrpc_request_free(req);
354 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
357 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
359 ptlrpc_request_set_replen(req);
361 rc = ptlrpc_queue_wait(req);
365 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
367 GOTO(out_req, rc = -EPROTO);
369 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
370 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
372 oa->o_blksize = cli_brw_size(exp->exp_obd);
373 oa->o_valid |= OBD_MD_FLBLKSZ;
375 CDEBUG(D_HA, "transno: %lld\n",
376 lustre_msg_get_transno(req->rq_repmsg));
378 ptlrpc_req_finished(req);
383 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
384 obd_enqueue_update_f upcall, void *cookie)
386 struct ptlrpc_request *req;
387 struct osc_setattr_args *sa;
388 struct obd_import *imp = class_exp2cliimp(exp);
389 struct ost_body *body;
394 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
398 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
400 ptlrpc_request_free(req);
404 osc_set_io_portal(req);
406 ptlrpc_at_set_req_timeout(req);
408 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
410 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
412 ptlrpc_request_set_replen(req);
414 req->rq_interpret_reply = osc_setattr_interpret;
415 sa = ptlrpc_req_async_args(sa, req);
417 sa->sa_upcall = upcall;
418 sa->sa_cookie = cookie;
420 ptlrpcd_add_req(req);
424 EXPORT_SYMBOL(osc_punch_send);
* osc_fallocate_base() - Handles fallocate requests.
* @exp: Export structure
* @oa: Attributes passed to OSS from client (obdo structure)
* @upcall: Completion callback run when the request finishes
* @cookie: Opaque cookie passed back to @upcall
* @mode: Operation done on the given range.
*
* Handles fallocate requests only. Only block allocation or the standard
* preallocate operation is currently supported; other mode flags are not
* supported yet. ftruncate(2) and truncate(2) are supported via a SETATTR
* request instead.
*
* Return: Non-zero on failure and 0 on success.
443 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
444 obd_enqueue_update_f upcall, void *cookie, int mode)
446 struct ptlrpc_request *req;
447 struct osc_setattr_args *sa;
448 struct ost_body *body;
449 struct obd_import *imp = class_exp2cliimp(exp);
* Only mode == 0 (standard prealloc) and FALLOC_FL_KEEP_SIZE are
* supported for now. Punch is not supported yet.
457 if (mode & ~FALLOC_FL_KEEP_SIZE)
459 oa->o_falloc_mode = mode;
461 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
466 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
468 ptlrpc_request_free(req);
472 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
475 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
477 ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_setattr_interpret;
480 BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
481 sa = ptlrpc_req_async_args(sa, req);
483 sa->sa_upcall = upcall;
484 sa->sa_cookie = cookie;
486 ptlrpcd_add_req(req);
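/*
 * Illustrative sketch (hypothetical helper, not part of the driver): the
 * mode filter above accepts only 0 (preallocate, extending i_size) or
 * FALLOC_FL_KEEP_SIZE; any other fallocate flag is rejected before an RPC
 * is ever built.
 */
#if 0 /* example only, not built */
#include <linux/falloc.h>

static bool osc_falloc_mode_supported(int mode)
{
	/* any bit other than FALLOC_FL_KEEP_SIZE means "not supported" */
	return (mode & ~FALLOC_FL_KEEP_SIZE) == 0;
}
#endif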
491 static int osc_sync_interpret(const struct lu_env *env,
492 struct ptlrpc_request *req, void *args, int rc)
494 struct osc_fsync_args *fa = args;
495 struct ost_body *body;
496 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
497 unsigned long valid = 0;
498 struct cl_object *obj;
504 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
506 CERROR("can't unpack ost_body\n");
507 GOTO(out, rc = -EPROTO);
510 *fa->fa_oa = body->oa;
511 obj = osc2cl(fa->fa_obj);
513 /* Update osc object's blocks attribute */
514 cl_object_attr_lock(obj);
515 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
516 attr->cat_blocks = body->oa.o_blocks;
521 cl_object_attr_update(env, obj, attr, valid);
522 cl_object_attr_unlock(obj);
525 rc = fa->fa_upcall(fa->fa_cookie, rc);
529 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
530 obd_enqueue_update_f upcall, void *cookie,
531 struct ptlrpc_request_set *rqset)
533 struct obd_export *exp = osc_export(obj);
534 struct ptlrpc_request *req;
535 struct ost_body *body;
536 struct osc_fsync_args *fa;
540 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
544 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
546 ptlrpc_request_free(req);
550 /* overload the size and blocks fields in the oa with start/end */
551 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
553 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
555 ptlrpc_request_set_replen(req);
556 req->rq_interpret_reply = osc_sync_interpret;
558 fa = ptlrpc_req_async_args(fa, req);
561 fa->fa_upcall = upcall;
562 fa->fa_cookie = cookie;
564 ptlrpc_set_add_req(rqset, req);
/* Find and cancel locally the locks matched by @mode in the resource found
* by @objid. Found locks are added to the @cancels list. Returns the number
* of locks added to the @cancels list. */
572 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
573 struct list_head *cancels,
574 enum ldlm_mode mode, __u64 lock_flags)
576 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
577 struct ldlm_res_id res_id;
578 struct ldlm_resource *res;
/* Return, i.e. cancel nothing, only if ELC is supported (flag in
* export) but disabled through procfs (flag in NS).
*
* This distinguishes it from the case when ELC is not supported at all,
* in which we still want to cancel locks in advance and just cancel them
* locally, without sending any RPC. */
588 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
591 ostid_build_res_name(&oa->o_oi, &res_id);
592 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
596 LDLM_RESOURCE_ADDREF(res);
597 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
598 lock_flags, 0, NULL);
599 LDLM_RESOURCE_DELREF(res);
600 ldlm_resource_putref(res);
604 static int osc_destroy_interpret(const struct lu_env *env,
605 struct ptlrpc_request *req, void *args, int rc)
607 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
609 atomic_dec(&cli->cl_destroy_in_flight);
610 wake_up(&cli->cl_destroy_waitq);
615 static int osc_can_send_destroy(struct client_obd *cli)
617 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
618 cli->cl_max_rpcs_in_flight) {
619 /* The destroy request can be sent */
622 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
623 cli->cl_max_rpcs_in_flight) {
* The counter has been modified between the two atomic
* operations.
*/
628 wake_up(&cli->cl_destroy_waitq);
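/*
 * Illustrative sketch (hypothetical names, not part of the driver): the
 * inc-then-check pattern used by osc_can_send_destroy() above throttles
 * in-flight RPCs without holding a lock.  A loser of the race decrements
 * again, and wakes any waiter if the counter dropped back under the limit
 * in the meantime.
 */
#if 0 /* example only, not built */
static bool try_enter(atomic_t *in_flight, int limit, wait_queue_head_t *wq)
{
	if (atomic_inc_return(in_flight) <= limit)
		return true;		/* slot acquired, go ahead */
	if (atomic_dec_return(in_flight) < limit)
		wake_up(wq);		/* a slot freed up while we raced */
	return false;			/* caller should wait and retry */
}
#endif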
633 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
636 struct client_obd *cli = &exp->exp_obd->u.cli;
637 struct ptlrpc_request *req;
638 struct ost_body *body;
644 CDEBUG(D_INFO, "oa NULL\n");
648 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
649 LDLM_FL_DISCARD_DATA);
651 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
653 ldlm_lock_list_put(&cancels, l_bl_ast, count);
657 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
660 ptlrpc_request_free(req);
664 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
665 ptlrpc_at_set_req_timeout(req);
667 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
669 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
671 ptlrpc_request_set_replen(req);
673 req->rq_interpret_reply = osc_destroy_interpret;
674 if (!osc_can_send_destroy(cli)) {
* Wait until the number of on-going destroy RPCs drops
* below max_rpcs_in_flight.
*/
679 rc = l_wait_event_abortable_exclusive(
680 cli->cl_destroy_waitq,
681 osc_can_send_destroy(cli));
683 ptlrpc_req_finished(req);
688 /* Do not wait for response */
689 ptlrpcd_add_req(req);
693 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
696 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
698 LASSERT(!(oa->o_valid & bits));
701 spin_lock(&cli->cl_loi_list_lock);
702 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
703 oa->o_dirty = cli->cl_dirty_grant;
705 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
706 if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
707 CERROR("dirty %lu > dirty_max %lu\n",
709 cli->cl_dirty_max_pages);
711 } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
712 (long)(obd_max_dirty_pages + 1))) {
/* The atomic_read() and atomic_inc() are not covered by a lock,
* thus they may safely race and trip this CERROR() unless we add
* in a small fudge factor (+1). */
716 CERROR("%s: dirty %ld > system dirty_max %ld\n",
717 cli_name(cli), atomic_long_read(&obd_dirty_pages),
718 obd_max_dirty_pages);
720 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
722 CERROR("dirty %lu - dirty_max %lu too big???\n",
723 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
726 unsigned long nrpages;
727 unsigned long undirty;
729 nrpages = cli->cl_max_pages_per_rpc;
730 nrpages *= cli->cl_max_rpcs_in_flight + 1;
731 nrpages = max(nrpages, cli->cl_dirty_max_pages);
732 undirty = nrpages << PAGE_SHIFT;
733 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
/* take extent tax into account when asking for more
* grant space */
739 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
740 cli->cl_max_extent_pages;
741 undirty += nrextents * cli->cl_grant_extent_tax;
743 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
744 * to add extent tax, etc.
746 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
747 ~(PTLRPC_MAX_BRW_SIZE * 4UL));
749 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
750 /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
751 if (cli->cl_lost_grant > INT_MAX) {
753 "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
754 cli_name(cli), cli->cl_lost_grant);
755 oa->o_dropped = INT_MAX;
757 oa->o_dropped = cli->cl_lost_grant;
759 cli->cl_lost_grant -= oa->o_dropped;
760 spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_CACHE,
"%s: dirty: %llu undirty: %u dropped %u grant: %llu cl_lost_grant %lu\n",
cli_name(cli), oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant,
cli->cl_lost_grant);
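/*
 * Illustrative sketch (hypothetical helper, not part of the driver): the
 * undirty headroom announced above is "max RPCs in flight plus one" worth
 * of full-size RPCs, never less than cl_dirty_max_pages, before the extent
 * tax and the OBD_MAX_GRANT cap are applied.
 */
#if 0 /* example only, not built */
static u64 undirty_base_bytes(unsigned long max_pages_per_rpc,
			      unsigned long max_rpcs_in_flight,
			      unsigned long dirty_max_pages)
{
	unsigned long nrpages = max_pages_per_rpc * (max_rpcs_in_flight + 1);

	nrpages = max(nrpages, dirty_max_pages);
	return (u64)nrpages << PAGE_SHIFT;
}
#endif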
766 void osc_update_next_shrink(struct client_obd *cli)
768 cli->cl_next_shrink_grant = ktime_get_seconds() +
769 cli->cl_grant_shrink_interval;
771 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
772 cli->cl_next_shrink_grant);
775 static void __osc_update_grant(struct client_obd *cli, u64 grant)
777 spin_lock(&cli->cl_loi_list_lock);
778 cli->cl_avail_grant += grant;
779 spin_unlock(&cli->cl_loi_list_lock);
782 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
784 if (body->oa.o_valid & OBD_MD_FLGRANT) {
785 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
786 __osc_update_grant(cli, body->oa.o_grant);
791 * grant thread data for shrinking space.
793 struct grant_thread_data {
794 struct list_head gtd_clients;
795 struct mutex gtd_mutex;
796 unsigned long gtd_stopped:1;
798 static struct grant_thread_data client_gtd;
800 static int osc_shrink_grant_interpret(const struct lu_env *env,
801 struct ptlrpc_request *req,
804 struct osc_grant_args *aa = args;
805 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
806 struct ost_body *body;
809 __osc_update_grant(cli, aa->aa_oa->o_grant);
813 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
815 osc_update_grant(cli, body);
817 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
823 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
825 spin_lock(&cli->cl_loi_list_lock);
826 oa->o_grant = cli->cl_avail_grant / 4;
827 cli->cl_avail_grant -= oa->o_grant;
828 spin_unlock(&cli->cl_loi_list_lock);
829 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
830 oa->o_valid |= OBD_MD_FLFLAGS;
833 oa->o_flags |= OBD_FL_SHRINK_GRANT;
834 osc_update_next_shrink(cli);
837 /* Shrink the current grant, either from some large amount to enough for a
838 * full set of in-flight RPCs, or if we have already shrunk to that limit
839 * then to enough for a single RPC. This avoids keeping more grant than
840 * needed, and avoids shrinking the grant piecemeal. */
841 static int osc_shrink_grant(struct client_obd *cli)
843 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
844 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
846 spin_lock(&cli->cl_loi_list_lock);
847 if (cli->cl_avail_grant <= target_bytes)
848 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
849 spin_unlock(&cli->cl_loi_list_lock);
851 return osc_shrink_grant_to_target(cli, target_bytes);
854 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
857 struct ost_body *body;
860 spin_lock(&cli->cl_loi_list_lock);
/* Don't shrink if we are already above or below the desired limit.
* We don't want to shrink below a single RPC, as that will negatively
* impact block allocation and long-term performance. */
864 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
865 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
867 if (target_bytes >= cli->cl_avail_grant) {
868 spin_unlock(&cli->cl_loi_list_lock);
871 spin_unlock(&cli->cl_loi_list_lock);
877 osc_announce_cached(cli, &body->oa, 0);
879 spin_lock(&cli->cl_loi_list_lock);
880 if (target_bytes >= cli->cl_avail_grant) {
881 /* available grant has changed since target calculation */
882 spin_unlock(&cli->cl_loi_list_lock);
883 GOTO(out_free, rc = 0);
885 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
886 cli->cl_avail_grant = target_bytes;
887 spin_unlock(&cli->cl_loi_list_lock);
888 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
889 body->oa.o_valid |= OBD_MD_FLFLAGS;
890 body->oa.o_flags = 0;
892 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
893 osc_update_next_shrink(cli);
895 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
896 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
897 sizeof(*body), body, NULL);
899 __osc_update_grant(cli, body->oa.o_grant);
905 static int osc_should_shrink_grant(struct client_obd *client)
907 time64_t next_shrink = client->cl_next_shrink_grant;
909 if (client->cl_import == NULL)
912 if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
913 client->cl_import->imp_grant_shrink_disabled) {
914 osc_update_next_shrink(client);
918 if (ktime_get_seconds() >= next_shrink - 5) {
919 /* Get the current RPC size directly, instead of going via:
920 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
921 * Keep comment here so that it can be found by searching. */
922 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
924 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
925 client->cl_avail_grant > brw_size)
928 osc_update_next_shrink(client);
933 #define GRANT_SHRINK_RPC_BATCH 100
935 static struct delayed_work work;
937 static void osc_grant_work_handler(struct work_struct *data)
939 struct client_obd *cli;
941 bool init_next_shrink = true;
942 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
945 mutex_lock(&client_gtd.gtd_mutex);
946 list_for_each_entry(cli, &client_gtd.gtd_clients,
948 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
949 osc_should_shrink_grant(cli)) {
950 osc_shrink_grant(cli);
954 if (!init_next_shrink) {
955 if (cli->cl_next_shrink_grant < next_shrink &&
956 cli->cl_next_shrink_grant > ktime_get_seconds())
957 next_shrink = cli->cl_next_shrink_grant;
959 init_next_shrink = false;
960 next_shrink = cli->cl_next_shrink_grant;
963 mutex_unlock(&client_gtd.gtd_mutex);
965 if (client_gtd.gtd_stopped == 1)
968 if (next_shrink > ktime_get_seconds()) {
969 time64_t delay = next_shrink - ktime_get_seconds();
971 schedule_delayed_work(&work, cfs_time_seconds(delay));
973 schedule_work(&work.work);
977 void osc_schedule_grant_work(void)
979 cancel_delayed_work_sync(&work);
980 schedule_work(&work.work);
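/*
 * Illustrative sketch (hypothetical names, not part of the driver): the
 * grant handler above is self-rearming delayed work; it is kicked
 * immediately with schedule_work(&work.work) and re-arms itself with
 * schedule_delayed_work().  The same pattern in miniature:
 */
#if 0 /* example only, not built */
static struct delayed_work example_work;

static void example_handler(struct work_struct *data)
{
	/* ... periodic work here ... */
	schedule_delayed_work(&example_work, cfs_time_seconds(60));
}

static void example_start(void)
{
	INIT_DELAYED_WORK(&example_work, example_handler);
	schedule_work(&example_work.work);	/* first run without delay */
}
#endif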
* Start grant thread for returning grant to the server for idle clients.
986 static int osc_start_grant_work(void)
988 client_gtd.gtd_stopped = 0;
989 mutex_init(&client_gtd.gtd_mutex);
990 INIT_LIST_HEAD(&client_gtd.gtd_clients);
992 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
993 schedule_work(&work.work);
998 static void osc_stop_grant_work(void)
1000 client_gtd.gtd_stopped = 1;
1001 cancel_delayed_work_sync(&work);
1004 static void osc_add_grant_list(struct client_obd *client)
1006 mutex_lock(&client_gtd.gtd_mutex);
1007 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
1008 mutex_unlock(&client_gtd.gtd_mutex);
1011 static void osc_del_grant_list(struct client_obd *client)
1013 if (list_empty(&client->cl_grant_chain))
1016 mutex_lock(&client_gtd.gtd_mutex);
1017 list_del_init(&client->cl_grant_chain);
1018 mutex_unlock(&client_gtd.gtd_mutex);
1021 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
* ocd_grant is the total grant amount we expect to hold: if we've
* been evicted, it's the new avail_grant amount, and cl_dirty_pages will
* drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
* dirty.
*
* The race is tolerable here: if we're evicted, but imp_state already
* left EVICTED state, then cl_dirty_pages must be 0 already.
*/
1032 spin_lock(&cli->cl_loi_list_lock);
1033 cli->cl_avail_grant = ocd->ocd_grant;
1034 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1035 unsigned long consumed = cli->cl_reserved_grant;
1037 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1038 consumed += cli->cl_dirty_grant;
1040 consumed += cli->cl_dirty_pages << PAGE_SHIFT;
1041 if (cli->cl_avail_grant < consumed) {
1042 CERROR("%s: granted %ld but already consumed %ld\n",
1043 cli_name(cli), cli->cl_avail_grant, consumed);
1044 cli->cl_avail_grant = 0;
1046 cli->cl_avail_grant -= consumed;
1050 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1054 /* overhead for each extent insertion */
1055 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1056 /* determine the appropriate chunk size used by osc_extent. */
1057 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1058 ocd->ocd_grant_blkbits);
1059 /* max_pages_per_rpc must be chunk aligned */
1060 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1061 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1062 ~chunk_mask) & chunk_mask;
1063 /* determine maximum extent size, in #pages */
1064 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1065 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
1066 if (cli->cl_max_extent_pages == 0)
1067 cli->cl_max_extent_pages = 1;
1069 cli->cl_grant_extent_tax = 0;
1070 cli->cl_chunkbits = PAGE_SHIFT;
1071 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1073 spin_unlock(&cli->cl_loi_list_lock);
1076 "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1078 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1079 cli->cl_max_extent_pages);
1081 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1082 osc_add_grant_list(cli);
1084 EXPORT_SYMBOL(osc_init_grant);
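/*
 * Illustrative sketch (hypothetical helper, not part of the driver): the
 * chunk-alignment step in osc_init_grant() rounds cl_max_pages_per_rpc up
 * to a whole number of chunks, where a chunk is
 * 1 << (cl_chunkbits - PAGE_SHIFT) pages.
 */
#if 0 /* example only, not built */
static unsigned int round_pages_up_to_chunk(unsigned int pages, int chunkbits)
{
	unsigned int chunk_mask = ~((1U << (chunkbits - PAGE_SHIFT)) - 1);

	/* e.g. chunkbits == PAGE_SHIFT + 2 (4-page chunks): 13 -> 16 */
	return (pages + ~chunk_mask) & chunk_mask;
}
#endif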
/* We assume that the reason this OSC got a short read is because it read
* beyond the end of a stripe file; i.e. Lustre is reading a sparse file
* via the LOV, and it _knows_ it's reading inside the file, it's just that
* this stripe never got written at or beyond this stripe offset yet. */
1090 static void handle_short_read(int nob_read, size_t page_count,
1091 struct brw_page **pga)
1096 /* skip bytes read OK */
1097 while (nob_read > 0) {
LASSERT(page_count > 0);
1100 if (pga[i]->count > nob_read) {
1101 /* EOF inside this page */
1102 ptr = kmap(pga[i]->pg) +
1103 (pga[i]->off & ~PAGE_MASK);
1104 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1111 nob_read -= pga[i]->count;
1116 /* zero remaining pages */
1117 while (page_count-- > 0) {
1118 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1119 memset(ptr, 0, pga[i]->count);
1125 static int check_write_rcs(struct ptlrpc_request *req,
1126 int requested_nob, int niocount,
1127 size_t page_count, struct brw_page **pga)
1132 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1133 sizeof(*remote_rcs) *
1135 if (remote_rcs == NULL) {
1136 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1140 /* return error if any niobuf was in error */
1141 for (i = 0; i < niocount; i++) {
1142 if ((int)remote_rcs[i] < 0) {
1143 CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1144 i, remote_rcs[i], req);
1145 return remote_rcs[i];
1148 if (remote_rcs[i] != 0) {
1149 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1150 i, remote_rcs[i], req);
1154 if (req->rq_bulk != NULL &&
1155 req->rq_bulk->bd_nob_transferred != requested_nob) {
1156 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1157 req->rq_bulk->bd_nob_transferred, requested_nob);
1164 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1166 if (p1->flag != p2->flag) {
1167 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1168 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1169 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1171 /* warn if we try to combine flags that we don't know to be
1172 * safe to combine */
1173 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at https://jira.whamcloud.com/\n",
p1->flag, p2->flag);
1181 return (p1->off + p1->count == p2->off);
1184 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1185 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1186 size_t pg_count, struct brw_page **pga,
1187 int opc, obd_dif_csum_fn *fn,
1191 struct ahash_request *req;
/* Use Adler as the default checksum type on top of DIF tags */
1193 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1194 struct page *__page;
1195 unsigned char *buffer;
1197 unsigned int bufsize;
1199 int used_number = 0;
1205 LASSERT(pg_count > 0);
1207 __page = alloc_page(GFP_KERNEL);
1211 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1214 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1215 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1219 buffer = kmap(__page);
1220 guard_start = (__u16 *)buffer;
1221 guard_number = PAGE_SIZE / sizeof(*guard_start);
1222 while (nob > 0 && pg_count > 0) {
1223 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1225 /* corrupt the data before we compute the checksum, to
1226 * simulate an OST->client data error */
1227 if (unlikely(i == 0 && opc == OST_READ &&
1228 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1229 unsigned char *ptr = kmap(pga[i]->pg);
1230 int off = pga[i]->off & ~PAGE_MASK;
1232 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
* The left guard number should be able to hold checksums of a
* whole page.
*/
1240 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1241 pga[i]->off & ~PAGE_MASK,
1243 guard_start + used_number,
1244 guard_number - used_number,
1250 used_number += used;
1251 if (used_number == guard_number) {
1252 cfs_crypto_hash_update_page(req, __page, 0,
1253 used_number * sizeof(*guard_start));
1257 nob -= pga[i]->count;
1265 if (used_number != 0)
1266 cfs_crypto_hash_update_page(req, __page, 0,
1267 used_number * sizeof(*guard_start));
1269 bufsize = sizeof(cksum);
1270 cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
/* For sending we only compute the wrong checksum instead
* of corrupting the data, so it is still correct on a redo */
1274 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1279 __free_page(__page);
1282 #else /* !CONFIG_CRC_T10DIF */
1283 #define obd_dif_ip_fn NULL
1284 #define obd_dif_crc_fn NULL
1285 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
1287 #endif /* CONFIG_CRC_T10DIF */
1289 static int osc_checksum_bulk(int nob, size_t pg_count,
1290 struct brw_page **pga, int opc,
1291 enum cksum_types cksum_type,
1295 struct ahash_request *req;
1296 unsigned int bufsize;
1297 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1299 LASSERT(pg_count > 0);
1301 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1303 CERROR("Unable to initialize checksum hash %s\n",
1304 cfs_crypto_hash_name(cfs_alg));
1305 return PTR_ERR(req);
1308 while (nob > 0 && pg_count > 0) {
1309 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1311 /* corrupt the data before we compute the checksum, to
1312 * simulate an OST->client data error */
1313 if (i == 0 && opc == OST_READ &&
1314 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1315 unsigned char *ptr = kmap(pga[i]->pg);
1316 int off = pga[i]->off & ~PAGE_MASK;
1318 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1321 cfs_crypto_hash_update_page(req, pga[i]->pg,
1322 pga[i]->off & ~PAGE_MASK,
1324 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1325 (int)(pga[i]->off & ~PAGE_MASK));
1327 nob -= pga[i]->count;
1332 bufsize = sizeof(*cksum);
1333 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
/* For sending we only compute the wrong checksum instead
* of corrupting the data, so it is still correct on a redo */
1337 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
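/*
 * Illustrative sketch of the cfs_crypto_hash pattern used above
 * (init -> update per page -> final); hypothetical helper, error
 * handling trimmed:
 */
#if 0 /* example only, not built */
static int example_page_cksum(struct page *pg, unsigned int off,
			      unsigned int len, u32 *cksum)
{
	struct ahash_request *req;
	unsigned int bufsize = sizeof(*cksum);

	req = cfs_crypto_hash_init(cksum_obd2cfs(OBD_CKSUM_ADLER), NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);
	cfs_crypto_hash_update_page(req, pg, off, len);
	return cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
}
#endif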
1343 static int osc_checksum_bulk_rw(const char *obd_name,
1344 enum cksum_types cksum_type,
1345 int nob, size_t pg_count,
1346 struct brw_page **pga, int opc,
1349 obd_dif_csum_fn *fn = NULL;
1350 int sector_size = 0;
obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1357 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1358 opc, fn, sector_size, check_sum);
1360 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1366 static inline void osc_release_bounce_pages(struct brw_page **pga,
1369 #ifdef HAVE_LUSTRE_CRYPTO
1372 for (i = 0; i < page_count; i++) {
1373 /* Bounce pages allocated by a call to
1374 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
1375 * are identified thanks to the PageChecked flag.
1377 if (PageChecked(pga[i]->pg))
1378 llcrypt_finalize_bounce_page(&pga[i]->pg);
1379 pga[i]->count -= pga[i]->bp_count_diff;
1380 pga[i]->off += pga[i]->bp_off_diff;
1386 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1387 u32 page_count, struct brw_page **pga,
1388 struct ptlrpc_request **reqp, int resend)
1390 struct ptlrpc_request *req;
1391 struct ptlrpc_bulk_desc *desc;
1392 struct ost_body *body;
1393 struct obd_ioobj *ioobj;
1394 struct niobuf_remote *niobuf;
1395 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1396 struct osc_brw_async_args *aa;
1397 struct req_capsule *pill;
1398 struct brw_page *pg_prev;
1400 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1401 struct inode *inode;
1404 inode = page2inode(pga[0]->pg);
1405 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1406 RETURN(-ENOMEM); /* Recoverable */
1407 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1408 RETURN(-EINVAL); /* Fatal */
1410 if ((cmd & OBD_BRW_WRITE) != 0) {
1412 req = ptlrpc_request_alloc_pool(cli->cl_import,
1414 &RQF_OST_BRW_WRITE);
1417 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1422 if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1423 for (i = 0; i < page_count; i++) {
1424 struct brw_page *pg = pga[i];
1425 struct page *data_page = NULL;
1426 bool retried = false;
1427 bool lockedbymyself;
1428 u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1431 if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1432 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1433 LUSTRE_ENCRYPTION_UNIT_SIZE;
1434 /* The page can already be locked when we arrive here.
1435 * This is possible when cl_page_assume/vvp_page_assume
1436 * is stuck on wait_on_page_writeback with page lock
1437 * held. In this case there is no risk for the lock to
1438 * be released while we are doing our encryption
1439 * processing, because writeback against that page will
1440 * end in vvp_page_completion_write/cl_page_completion,
1441 * which means only once the page is fully processed.
1443 lockedbymyself = trylock_page(pg->pg);
1445 llcrypt_encrypt_pagecache_blocks(pg->pg,
1449 unlock_page(pg->pg);
1450 if (IS_ERR(data_page)) {
1451 rc = PTR_ERR(data_page);
1452 if (rc == -ENOMEM && !retried) {
1457 ptlrpc_request_free(req);
1460 /* Set PageChecked flag on bounce page for
1461 * disambiguation in osc_release_bounce_pages().
1463 SetPageChecked(data_page);
1465 /* there should be no gap in the middle of page array */
1466 if (i == page_count - 1) {
1467 struct osc_async_page *oap = brw_page2oap(pg);
1469 oa->o_size = oap->oap_count +
1470 oap->oap_obj_off + oap->oap_page_off;
1472 /* len is forced to nunits, and relative offset to 0
1473 * so store the old, clear text info
1475 pg->bp_count_diff = nunits - pg->count;
1477 pg->bp_off_diff = pg->off & ~PAGE_MASK;
1478 pg->off = pg->off & PAGE_MASK;
1480 } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
1481 for (i = 0; i < page_count; i++) {
1482 struct brw_page *pg = pga[i];
1483 u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1485 if (nunits & ~LUSTRE_ENCRYPTION_MASK)
1486 nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
1487 LUSTRE_ENCRYPTION_UNIT_SIZE;
/* count/off are forced to cover the whole encryption
* unit size so that all encrypted data is stored on the
* OST, so adjust bp_{count,off}_diff for the size of
* the clear text.
*/
1493 pg->bp_count_diff = nunits - pg->count;
1495 pg->bp_off_diff = pg->off & ~PAGE_MASK;
1496 pg->off = pg->off & PAGE_MASK;
1500 for (niocount = i = 1; i < page_count; i++) {
1501 if (!can_merge_pages(pga[i - 1], pga[i]))
1505 pill = &req->rq_pill;
1506 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1508 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1509 niocount * sizeof(*niobuf));
1511 for (i = 0; i < page_count; i++) {
1512 short_io_size += pga[i]->count;
1513 if (!inode || !IS_ENCRYPTED(inode)) {
1514 pga[i]->bp_count_diff = 0;
1515 pga[i]->bp_off_diff = 0;
1519 /* Check if read/write is small enough to be a short io. */
1520 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1521 !imp_connect_shortio(cli->cl_import))
1524 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1525 opc == OST_READ ? 0 : short_io_size);
1526 if (opc == OST_READ)
1527 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1530 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1532 ptlrpc_request_free(req);
1535 osc_set_io_portal(req);
1537 ptlrpc_at_set_req_timeout(req);
/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
* resend logic */
1540 req->rq_no_retry_einprogress = 1;
1542 if (short_io_size != 0) {
1544 short_io_buf = NULL;
1548 desc = ptlrpc_prep_bulk_imp(req, page_count,
1549 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1550 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1551 PTLRPC_BULK_PUT_SINK),
1553 &ptlrpc_bulk_kiov_pin_ops);
1556 GOTO(out, rc = -ENOMEM);
1557 /* NB request now owns desc and will free it when it gets freed */
1559 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1560 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1561 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1562 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1564 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1566 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1567 * and from_kgid(), because they are asynchronous. Fortunately, variable
1568 * oa contains valid o_uid and o_gid in these two operations.
1569 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
* OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
* other process logic */
1572 body->oa.o_uid = oa->o_uid;
1573 body->oa.o_gid = oa->o_gid;
1575 obdo_to_ioobj(oa, ioobj);
1576 ioobj->ioo_bufcnt = niocount;
/* The high bits of ioo_max_brw tell the server the _maximum_ number of
* bulks that might be sent for this request. The actual number is decided
* when the RPC is finally sent in ptlrpc_register_bulk(). It sends
* "max - 1" for old client compatibility sending "0", and also so that
* the actual maximum is a power-of-two number, not one less. LU-1431 */
1583 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1585 ioobj_max_brw_set(ioobj, 0);
1587 if (short_io_size != 0) {
1588 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1589 body->oa.o_valid |= OBD_MD_FLFLAGS;
1590 body->oa.o_flags = 0;
1592 body->oa.o_flags |= OBD_FL_SHORT_IO;
1593 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1595 if (opc == OST_WRITE) {
1596 short_io_buf = req_capsule_client_get(pill,
1598 LASSERT(short_io_buf != NULL);
1602 LASSERT(page_count > 0);
1604 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1605 struct brw_page *pg = pga[i];
1606 int poff = pg->off & ~PAGE_MASK;
1608 LASSERT(pg->count > 0);
1609 /* make sure there is no gap in the middle of page array */
1610 LASSERTF(page_count == 1 ||
1611 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1612 ergo(i > 0 && i < page_count - 1,
1613 poff == 0 && pg->count == PAGE_SIZE) &&
1614 ergo(i == page_count - 1, poff == 0)),
1615 "i: %d/%d pg: %p off: %llu, count: %u\n",
1616 i, page_count, pg, pg->off, pg->count);
1617 LASSERTF(i == 0 || pg->off > pg_prev->off,
1618 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1619 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1621 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1622 pg_prev->pg, page_private(pg_prev->pg),
1623 pg_prev->pg->index, pg_prev->off);
1624 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1625 (pg->flag & OBD_BRW_SRVLOCK));
1626 if (short_io_size != 0 && opc == OST_WRITE) {
1627 unsigned char *ptr = kmap_atomic(pg->pg);
1629 LASSERT(short_io_size >= requested_nob + pg->count);
1630 memcpy(short_io_buf + requested_nob,
1634 } else if (short_io_size == 0) {
1635 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1638 requested_nob += pg->count;
1640 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1642 niobuf->rnb_len += pg->count;
1644 niobuf->rnb_offset = pg->off;
1645 niobuf->rnb_len = pg->count;
1646 niobuf->rnb_flags = pg->flag;
1651 LASSERTF((void *)(niobuf - niocount) ==
1652 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1653 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1654 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1658 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1659 body->oa.o_valid |= OBD_MD_FLFLAGS;
1660 body->oa.o_flags = 0;
1662 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1665 if (osc_should_shrink_grant(cli))
1666 osc_shrink_grant_local(cli, &body->oa);
1668 /* size[REQ_REC_OFF] still sizeof (*body) */
1669 if (opc == OST_WRITE) {
1670 if (cli->cl_checksum &&
1671 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1672 /* store cl_cksum_type in a local variable since
1673 * it can be changed via lprocfs */
1674 enum cksum_types cksum_type = cli->cl_cksum_type;
1676 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1677 body->oa.o_flags = 0;
1679 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1681 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1683 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1684 requested_nob, page_count,
1688 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1692 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1695 /* save this in 'oa', too, for later checking */
1696 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1697 oa->o_flags |= obd_cksum_type_pack(obd_name,
1700 /* clear out the checksum flag, in case this is a
1701 * resend but cl_checksum is no longer set. b=11238 */
1702 oa->o_valid &= ~OBD_MD_FLCKSUM;
1704 oa->o_cksum = body->oa.o_cksum;
1705 /* 1 RC per niobuf */
1706 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1707 sizeof(__u32) * niocount);
1709 if (cli->cl_checksum &&
1710 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1711 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1712 body->oa.o_flags = 0;
1713 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1714 cli->cl_cksum_type);
1715 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
/* The client cksum has already been copied to the wire obdo in the
* previous lustre_set_wire_obdo(); in case a bulk-read is being
* resent due to a cksum error, this will allow the server to
* check+dump the pages on its side */
1723 ptlrpc_request_set_replen(req);
1725 aa = ptlrpc_req_async_args(aa, req);
1727 aa->aa_requested_nob = requested_nob;
1728 aa->aa_nio_count = niocount;
1729 aa->aa_page_count = page_count;
1733 INIT_LIST_HEAD(&aa->aa_oaps);
1736 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1737 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1738 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1739 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1743 ptlrpc_req_finished(req);
1747 char dbgcksum_file_name[PATH_MAX];
1749 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1750 struct brw_page **pga, __u32 server_cksum,
/* only keep a dump of the pages on the first error for the same range
* in the file/fid, not during resends/retries. */
1760 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1761 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1762 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1763 libcfs_debug_file_path_arr :
1764 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1765 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1766 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1767 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1769 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1770 client_cksum, server_cksum);
1771 filp = filp_open(dbgcksum_file_name,
1772 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1776 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1777 "checksum error: rc = %d\n", dbgcksum_file_name,
1780 CERROR("%s: can't open to dump pages with checksum "
1781 "error: rc = %d\n", dbgcksum_file_name, rc);
1785 for (i = 0; i < page_count; i++) {
1786 len = pga[i]->count;
1787 buf = kmap(pga[i]->pg);
1789 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
CERROR("%s: wanted to write %u but got error: rc = %d\n",
dbgcksum_file_name, len, rc);
1797 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1798 dbgcksum_file_name, rc);
1803 rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1805 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1806 filp_close(filp, NULL);
1810 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1811 __u32 client_cksum, __u32 server_cksum,
1812 struct osc_brw_async_args *aa)
1814 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1815 enum cksum_types cksum_type;
1816 obd_dif_csum_fn *fn = NULL;
1817 int sector_size = 0;
1822 if (server_cksum == client_cksum) {
1823 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1827 if (aa->aa_cli->cl_checksum_dump)
1828 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1829 server_cksum, client_cksum);
1831 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1834 switch (cksum_type) {
1835 case OBD_CKSUM_T10IP512:
1839 case OBD_CKSUM_T10IP4K:
1843 case OBD_CKSUM_T10CRC512:
1844 fn = obd_dif_crc_fn;
1847 case OBD_CKSUM_T10CRC4K:
1848 fn = obd_dif_crc_fn;
1856 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1857 aa->aa_page_count, aa->aa_ppga,
1858 OST_WRITE, fn, sector_size,
1861 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1862 aa->aa_ppga, OST_WRITE, cksum_type,
1866 msg = "failed to calculate the client write checksum";
1867 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1868 msg = "the server did not use the checksum type specified in "
1869 "the original request - likely a protocol problem";
1870 else if (new_cksum == server_cksum)
1871 msg = "changed on the client after we checksummed it - "
1872 "likely false positive due to mmap IO (bug 11742)";
1873 else if (new_cksum == client_cksum)
1874 msg = "changed in transit before arrival at OST";
1876 msg = "changed in transit AND doesn't match the original - "
1877 "likely false positive due to mmap IO (bug 11742)";
1879 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1880 DFID " object "DOSTID" extent [%llu-%llu], original "
1881 "client csum %x (type %x), server csum %x (type %x),"
1882 " client csum now %x\n",
1883 obd_name, msg, libcfs_nid2str(peer->nid),
1884 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1885 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1886 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1887 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1888 aa->aa_ppga[aa->aa_page_count - 1]->off +
1889 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1891 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1892 server_cksum, cksum_type, new_cksum);
1896 /* Note rc enters this function as number of bytes transferred */
1897 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1899 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1900 struct client_obd *cli = aa->aa_cli;
1901 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1902 const struct lnet_process_id *peer =
1903 &req->rq_import->imp_connection->c_peer;
1904 struct ost_body *body;
1905 u32 client_cksum = 0;
1906 struct inode *inode;
1910 if (rc < 0 && rc != -EDQUOT) {
1911 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1915 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1916 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1918 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1922 /* set/clear over quota flag for a uid/gid/projid */
1923 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1924 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1925 unsigned qid[LL_MAXQUOTAS] = {
1926 body->oa.o_uid, body->oa.o_gid,
1927 body->oa.o_projid };
1929 "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1930 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1931 body->oa.o_valid, body->oa.o_flags);
1932 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1936 osc_update_grant(cli, body);
1941 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1942 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1944 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1946 CERROR("%s: unexpected positive size %d\n",
1951 if (req->rq_bulk != NULL &&
1952 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1955 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1956 check_write_checksum(&body->oa, peer, client_cksum,
1957 body->oa.o_cksum, aa))
1960 rc = check_write_rcs(req, aa->aa_requested_nob,
1961 aa->aa_nio_count, aa->aa_page_count,
1966 /* The rest of this function executes only for OST_READs */
1968 if (req->rq_bulk == NULL) {
1969 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1971 LASSERT(rc == req->rq_status);
1973 /* if unwrap_bulk failed, return -EAGAIN to retry */
1974 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1977 GOTO(out, rc = -EAGAIN);
1979 if (rc > aa->aa_requested_nob) {
1980 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1981 rc, aa->aa_requested_nob);
1985 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1986 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1987 rc, req->rq_bulk->bd_nob_transferred);
1991 if (req->rq_bulk == NULL) {
1993 int nob, pg_count, i = 0;
1996 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1997 pg_count = aa->aa_page_count;
1998 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2001 while (nob > 0 && pg_count > 0) {
2003 int count = aa->aa_ppga[i]->count > nob ?
2004 nob : aa->aa_ppga[i]->count;
2006 CDEBUG(D_CACHE, "page %p count %d\n",
2007 aa->aa_ppga[i]->pg, count);
2008 ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2009 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2011 kunmap_atomic((void *) ptr);
2020 if (rc < aa->aa_requested_nob)
2021 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2023 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2024 static int cksum_counter;
2025 u32 server_cksum = body->oa.o_cksum;
2028 enum cksum_types cksum_type;
2029 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2030 body->oa.o_flags : 0;
2032 cksum_type = obd_cksum_type_unpack(o_flags);
2033 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
2034 aa->aa_page_count, aa->aa_ppga,
2035 OST_READ, &client_cksum);
2039 if (req->rq_bulk != NULL &&
2040 peer->nid != req->rq_bulk->bd_sender) {
2042 router = libcfs_nid2str(req->rq_bulk->bd_sender);
2045 if (server_cksum != client_cksum) {
2046 struct ost_body *clbody;
2047 u32 page_count = aa->aa_page_count;
2049 clbody = req_capsule_client_get(&req->rq_pill,
2051 if (cli->cl_checksum_dump)
2052 dump_all_bulk_pages(&clbody->oa, page_count,
2053 aa->aa_ppga, server_cksum,
2056 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2057 "%s%s%s inode "DFID" object "DOSTID
2058 " extent [%llu-%llu], client %x, "
2059 "server %x, cksum_type %x\n",
2061 libcfs_nid2str(peer->nid),
2063 clbody->oa.o_valid & OBD_MD_FLFID ?
2064 clbody->oa.o_parent_seq : 0ULL,
2065 clbody->oa.o_valid & OBD_MD_FLFID ?
2066 clbody->oa.o_parent_oid : 0,
2067 clbody->oa.o_valid & OBD_MD_FLFID ?
2068 clbody->oa.o_parent_ver : 0,
2069 POSTID(&body->oa.o_oi),
2070 aa->aa_ppga[0]->off,
2071 aa->aa_ppga[page_count-1]->off +
2072 aa->aa_ppga[page_count-1]->count - 1,
2073 client_cksum, server_cksum,
2076 aa->aa_oa->o_cksum = client_cksum;
2080 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2083 } else if (unlikely(client_cksum)) {
2084 static int cksum_missed;
2087 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2088 CERROR("%s: checksum %u requested from %s but not sent\n",
2089 obd_name, cksum_missed,
2090 libcfs_nid2str(peer->nid));
2095 inode = page2inode(aa->aa_ppga[0]->pg);
2096 if (inode && IS_ENCRYPTED(inode)) {
2099 if (!llcrypt_has_encryption_key(inode)) {
2100 CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2103 for (idx = 0; idx < aa->aa_page_count; idx++) {
2104 struct brw_page *pg = aa->aa_ppga[idx];
2105 unsigned int offs = 0;
2107 while (offs < PAGE_SIZE) {
2108 /* do not decrypt if page is all 0s */
2109 if (memchr_inv(page_address(pg->pg) + offs, 0,
2110 LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2111 /* if page is empty forward info to
2112 * upper layers (ll_io_zero_page) by
2113 * clearing PagePrivate2
2116 ClearPagePrivate2(pg->pg);
2120 /* The page is already locked when we arrive here,
2121 * except when we deal with a twisted page for
2122 * specific Direct IO support, in which case
2123 * PageChecked flag is set on page.
2125 if (PageChecked(pg->pg))
2127 rc = llcrypt_decrypt_pagecache_blocks(pg->pg,
2128 LUSTRE_ENCRYPTION_UNIT_SIZE,
2130 if (PageChecked(pg->pg))
2131 unlock_page(pg->pg);
2135 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2142 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2143 aa->aa_oa, &body->oa);
2148 static int osc_brw_redo_request(struct ptlrpc_request *request,
2149 struct osc_brw_async_args *aa, int rc)
2151 struct ptlrpc_request *new_req;
2152 struct osc_brw_async_args *new_aa;
2153 struct osc_async_page *oap;
/* The below message is checked in replay-ost-single.sh test_8ae */
2157 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2158 "redo for recoverable error %d", rc);
2160 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2161 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2162 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2163 aa->aa_ppga, &new_req, 1);
2167 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2168 if (oap->oap_request != NULL) {
2169 LASSERTF(request == oap->oap_request,
2170 "request %p != oap_request %p\n",
2171 request, oap->oap_request);
2175 * New request takes over pga and oaps from old request.
2176 * Note that copying a list_head doesn't work, need to move it...
2179 new_req->rq_interpret_reply = request->rq_interpret_reply;
2180 new_req->rq_async_args = request->rq_async_args;
2181 new_req->rq_commit_cb = request->rq_commit_cb;
/* cap resend delay to the current request timeout; this is similar to
* what ptlrpc does (see after_reply()) */
2184 if (aa->aa_resends > new_req->rq_timeout)
2185 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2187 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2188 new_req->rq_generation_set = 1;
2189 new_req->rq_import_generation = request->rq_import_generation;
2191 new_aa = ptlrpc_req_async_args(new_aa, new_req);
2193 INIT_LIST_HEAD(&new_aa->aa_oaps);
2194 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2195 INIT_LIST_HEAD(&new_aa->aa_exts);
2196 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2197 new_aa->aa_resends = aa->aa_resends;
2199 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2200 if (oap->oap_request) {
2201 ptlrpc_req_finished(oap->oap_request);
2202 oap->oap_request = ptlrpc_request_addref(new_req);
/* XXX: This code will run into problems if we're going to support
* adding a series of BRW RPCs into a self-defined ptlrpc_request_set
* and waiting for all of them to be finished. We should inherit the
* request set from the old request. */
2210 ptlrpcd_add_req(new_req);
2212 DEBUG_REQ(D_INFO, new_req, "new request");
* Ugh, we want disk allocation on the target to happen in offset order. We'll
* follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
* fine for our small page arrays and doesn't require allocation. It's an
* insertion sort that swaps elements that are strides apart, shrinking the
* stride down until it's '1' and the array is sorted.
2223 static void sort_brw_pages(struct brw_page **array, int num)
2225 int stride, i, j;
2226 struct brw_page *tmp;
2230 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2231 ;
2233 do {
2234 stride /= 3;
2235 for (i = stride ; i < num ; i++) {
2236 tmp = array[i];
2237 j = i;
2238 while (j >= stride && array[j - stride]->off > tmp->off) {
2239 array[j] = array[j - stride];
2240 j -= stride;
2242 array[j] = tmp;
2244 } while (stride > 1);
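/*
 * Editor's note: the gap sequence above is Knuth's 3h+1 series
 * (1, 4, 13, 40, ...). A self-contained userspace sketch of the same
 * sort on plain integers instead of brw_page offsets (illustration
 * only, not compiled into this file):
 */
#if 0
static void shellsort_int(int *a, int num)
{
	int stride, i, j, tmp;

	for (stride = 1; stride < num; stride = (stride * 3) + 1)
		;			/* find the first gap >= num */
	do {
		stride /= 3;		/* shrink: ... 13, 4, 1 */
		for (i = stride; i < num; i++) {
			tmp = a[i];
			j = i;
			while (j >= stride && a[j - stride] > tmp) {
				a[j] = a[j - stride];
				j -= stride;
			}
			a[j] = tmp;	/* insert into its stride-chain */
		}
	} while (stride > 1);
}
#endif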
2247 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2249 LASSERT(ppga != NULL);
2250 OBD_FREE_PTR_ARRAY(ppga, count);
2253 static int brw_interpret(const struct lu_env *env,
2254 struct ptlrpc_request *req, void *args, int rc)
2256 struct osc_brw_async_args *aa = args;
2257 struct osc_extent *ext;
2258 struct osc_extent *tmp;
2259 struct client_obd *cli = aa->aa_cli;
2260 unsigned long transferred = 0;
2264 rc = osc_brw_fini_request(req, rc);
2265 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2267 /* restore clear text pages */
2268 osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2271 * When server returns -EINPROGRESS, client should always retry
2272 * regardless of the number of times the bulk was resent already.
2274 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2275 if (req->rq_import_generation !=
2276 req->rq_import->imp_generation) {
2277 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2278 ""DOSTID", rc = %d.\n",
2279 req->rq_import->imp_obd->obd_name,
2280 POSTID(&aa->aa_oa->o_oi), rc);
2281 } else if (rc == -EINPROGRESS ||
2282 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2283 rc = osc_brw_redo_request(req, aa, rc);
2284 } else {
2285 CERROR("%s: too many resent retries for object: "
2286 "%llu:%llu, rc = %d.\n",
2287 req->rq_import->imp_obd->obd_name,
2288 POSTID(&aa->aa_oa->o_oi), rc);
2291 if (rc == 0)
2292 RETURN(0);
2293 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2294 rc = -EIO;
2297 if (rc == 0) {
2298 struct obdo *oa = aa->aa_oa;
2299 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2300 unsigned long valid = 0;
2301 struct cl_object *obj;
2302 struct osc_async_page *last;
2304 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2305 obj = osc2cl(last->oap_obj);
2307 cl_object_attr_lock(obj);
2308 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2309 attr->cat_blocks = oa->o_blocks;
2310 valid |= CAT_BLOCKS;
2312 if (oa->o_valid & OBD_MD_FLMTIME) {
2313 attr->cat_mtime = oa->o_mtime;
2314 valid |= CAT_MTIME;
2316 if (oa->o_valid & OBD_MD_FLATIME) {
2317 attr->cat_atime = oa->o_atime;
2318 valid |= CAT_ATIME;
2320 if (oa->o_valid & OBD_MD_FLCTIME) {
2321 attr->cat_ctime = oa->o_ctime;
2322 valid |= CAT_CTIME;
2325 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2326 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2327 loff_t last_off = last->oap_count + last->oap_obj_off +
2328 last->oap_page_off;
2330 /* Change the file size if this is an out-of-quota or
2331 * direct IO write and it extends the file size */
2332 if (loi->loi_lvb.lvb_size < last_off) {
2333 attr->cat_size = last_off;
2334 valid |= CAT_SIZE;
2336 /* Extend KMS if it's not a lockless write */
2337 if (loi->loi_kms < last_off &&
2338 oap2osc_page(last)->ops_srvlock == 0) {
2339 attr->cat_kms = last_off;
2340 valid |= CAT_KMS;
2345 cl_object_attr_update(env, obj, attr, valid);
2346 cl_object_attr_unlock(obj);
2348 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2351 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2352 osc_inc_unstable_pages(req);
2354 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2355 list_del_init(&ext->oe_link);
2356 osc_extent_finish(env, ext, 1,
2357 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2359 LASSERT(list_empty(&aa->aa_exts));
2360 LASSERT(list_empty(&aa->aa_oaps));
2362 transferred = (req->rq_bulk == NULL ? /* short io */
2363 aa->aa_requested_nob :
2364 req->rq_bulk->bd_nob_transferred);
2366 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2367 ptlrpc_lprocfs_brw(req, transferred);
2369 spin_lock(&cli->cl_loi_list_lock);
2370 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2371 * is called so we know whether to go to sync BRWs or wait for more
2372 * RPCs to complete */
2373 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2374 cli->cl_w_in_flight--;
2376 cli->cl_r_in_flight--;
2377 osc_wake_cache_waiters(cli);
2378 spin_unlock(&cli->cl_loi_list_lock);
2380 osc_io_unplug(env, cli, NULL);
2384 static void brw_commit(struct ptlrpc_request *req)
2386 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2387 * this callback (invoked via rq_commit_cb), we need to ensure
2388 * osc_dec_unstable_pages is still called; otherwise unstable
2389 * pages may be leaked. */
2390 spin_lock(&req->rq_lock);
2391 if (likely(req->rq_unstable)) {
2392 req->rq_unstable = 0;
2393 spin_unlock(&req->rq_lock);
2395 osc_dec_unstable_pages(req);
2396 } else {
2397 req->rq_committed = 1;
2398 spin_unlock(&req->rq_lock);
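/*
 * Editor's note: the rq_lock/rq_unstable/rq_committed handshake above
 * resolves the race in both directions: if the increment side ran
 * first, rq_unstable is set and this commit callback performs the
 * matching decrement; if commit runs first, it records rq_committed so
 * that the increment side (osc_inc_unstable_pages, called from
 * brw_interpret on a successful write) can presumably see the
 * transaction is already committed and skip accounting the pages as
 * unstable.
 */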
2402 /**
2403 * Build an RPC from the list of extents @ext_list. The caller must ensure
2404 * that the total number of pages in this list is NOT over the max pages
2405 * per RPC. Extents in the list must be in OES_RPC state.
2406 */
2407 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2408 struct list_head *ext_list, int cmd)
2410 struct ptlrpc_request *req = NULL;
2411 struct osc_extent *ext;
2412 struct brw_page **pga = NULL;
2413 struct osc_brw_async_args *aa = NULL;
2414 struct obdo *oa = NULL;
2415 struct osc_async_page *oap;
2416 struct osc_object *obj = NULL;
2417 struct cl_req_attr *crattr = NULL;
2418 loff_t starting_offset = OBD_OBJECT_EOF;
2419 loff_t ending_offset = 0;
2420 /* '1' for consistency with code that checks !mpflag to restore */
2424 bool soft_sync = false;
2425 bool ndelay = false;
2429 __u32 layout_version = 0;
2430 LIST_HEAD(rpc_list);
2431 struct ost_body *body;
2433 LASSERT(!list_empty(ext_list));
2435 /* add pages into rpc_list to build BRW rpc */
2436 list_for_each_entry(ext, ext_list, oe_link) {
2437 LASSERT(ext->oe_state == OES_RPC);
2438 mem_tight |= ext->oe_memalloc;
2439 grant += ext->oe_grants;
2440 page_count += ext->oe_nr_pages;
2441 layout_version = max(layout_version, ext->oe_layout_version);
2442 if (obj == NULL)
2443 obj = ext->oe_obj;
2444 if (ext->oe_ndelay)
2445 ndelay = true;
2446 soft_sync = osc_over_unstable_soft_limit(cli);
2447 if (mem_tight)
2448 mpflag = memalloc_noreclaim_save();
2450 OBD_ALLOC_PTR_ARRAY(pga, page_count);
2451 if (pga == NULL)
2452 GOTO(out, rc = -ENOMEM);
2454 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2455 if (oa == NULL)
2456 GOTO(out, rc = -ENOMEM);
2459 list_for_each_entry(ext, ext_list, oe_link) {
2460 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2461 if (mem_tight)
2462 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2463 if (soft_sync)
2464 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2465 pga[i] = &oap->oap_brw_page;
2466 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2467 i++;
2469 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2470 if (starting_offset == OBD_OBJECT_EOF ||
2471 starting_offset > oap->oap_obj_off)
2472 starting_offset = oap->oap_obj_off;
2473 else
2474 LASSERT(oap->oap_page_off == 0);
2475 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2476 ending_offset = oap->oap_obj_off +
2477 oap->oap_count;
2478 else
2479 LASSERT(oap->oap_page_off + oap->oap_count ==
2480 PAGE_SIZE);
2486 /* first page in the list */
2487 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2489 crattr = &osc_env_info(env)->oti_req_attr;
2490 memset(crattr, 0, sizeof(*crattr));
2491 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2492 crattr->cra_flags = ~0ULL;
2493 crattr->cra_page = oap2cl_page(oap);
2494 crattr->cra_oa = oa;
2495 cl_req_attr_set(env, osc2cl(obj), crattr);
2497 if (cmd == OBD_BRW_WRITE) {
2498 oa->o_grant_used = grant;
2499 if (layout_version > 0) {
2500 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2501 PFID(&oa->o_oi.oi_fid), layout_version);
2503 oa->o_layout_version = layout_version;
2504 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2508 sort_brw_pages(pga, page_count);
2509 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2511 CERROR("prep_req failed: %d\n", rc);
2515 req->rq_commit_cb = brw_commit;
2516 req->rq_interpret_reply = brw_interpret;
2517 req->rq_memalloc = mem_tight != 0;
2518 oap->oap_request = ptlrpc_request_addref(req);
2519 if (ndelay) {
2520 req->rq_no_resend = req->rq_no_delay = 1;
2521 /* We should probably set a shorter timeout value
2522 * to handle ETIMEDOUT in brw_interpret() correctly. */
2523 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2526 /* Need to update the timestamps after the request is built in case
2527 * we race with setattr (locally or in queue at OST). If the OST gets
2528 * a later setattr before an earlier BRW (as determined by the request
2529 * xid), the OST will not use the BRW timestamps. Sadly, there is no
2530 * obvious way to do this in a single call. bug 10150 */
2531 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2532 crattr->cra_oa = &body->oa;
2533 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2534 cl_req_attr_set(env, osc2cl(obj), crattr);
2535 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2537 aa = ptlrpc_req_async_args(aa, req);
2538 INIT_LIST_HEAD(&aa->aa_oaps);
2539 list_splice_init(&rpc_list, &aa->aa_oaps);
2540 INIT_LIST_HEAD(&aa->aa_exts);
2541 list_splice_init(ext_list, &aa->aa_exts);
2543 spin_lock(&cli->cl_loi_list_lock);
2544 starting_offset >>= PAGE_SHIFT;
2545 if (cmd == OBD_BRW_READ) {
2546 cli->cl_r_in_flight++;
2547 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2548 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2549 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2550 starting_offset + 1);
2552 cli->cl_w_in_flight++;
2553 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2554 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2555 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2556 starting_offset + 1);
2558 spin_unlock(&cli->cl_loi_list_lock);
2560 DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2561 page_count, aa, cli->cl_r_in_flight,
2562 cli->cl_w_in_flight);
2563 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2565 ptlrpcd_add_req(req);
2570 if (mem_tight)
2571 memalloc_noreclaim_restore(mpflag);
2573 if (rc != 0) {
2574 LASSERT(req == NULL);
2576 if (oa)
2577 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2578 if (pga) {
2579 osc_release_bounce_pages(pga, page_count);
2580 osc_release_ppga(pga, page_count);
2582 /* this should happen rarely and is pretty bad; it makes the
2583 * pending list not follow the dirty order */
2584 while (!list_empty(ext_list)) {
2585 ext = list_entry(ext_list->next, struct osc_extent,
2586 oe_link);
2587 list_del_init(&ext->oe_link);
2588 osc_extent_finish(env, ext, 0, rc);
2594 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2596 int set = 0;
2598 LASSERT(lock != NULL);
2600 lock_res_and_lock(lock);
2602 if (lock->l_ast_data == NULL)
2603 lock->l_ast_data = data;
2604 if (lock->l_ast_data == data)
2605 set = 1;
2607 unlock_res_and_lock(lock);
2609 return set;
2612 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2613 void *cookie, struct lustre_handle *lockh,
2614 enum ldlm_mode mode, __u64 *flags, bool speculative,
2615 int errcode)
2617 bool intent = *flags & LDLM_FL_HAS_INTENT;
2621 /* The request was created before the ldlm_cli_enqueue() call. */
2622 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2623 struct ldlm_reply *rep;
2625 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2626 LASSERT(rep != NULL);
2628 rep->lock_policy_res1 =
2629 ptlrpc_status_ntoh(rep->lock_policy_res1);
2630 if (rep->lock_policy_res1)
2631 errcode = rep->lock_policy_res1;
2633 *flags |= LDLM_FL_LVB_READY;
2634 } else if (errcode == ELDLM_OK) {
2635 *flags |= LDLM_FL_LVB_READY;
2638 /* Call the update callback. */
2639 rc = (*upcall)(cookie, lockh, errcode);
2641 /* release the reference taken in ldlm_cli_enqueue() */
2642 if (errcode == ELDLM_LOCK_MATCHED)
2643 errcode = ELDLM_OK;
2644 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2645 ldlm_lock_decref(lockh, mode);
2650 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2651 void *args, int rc)
2653 struct osc_enqueue_args *aa = args;
2654 struct ldlm_lock *lock;
2655 struct lustre_handle *lockh = &aa->oa_lockh;
2656 enum ldlm_mode mode = aa->oa_mode;
2657 struct ost_lvb *lvb = aa->oa_lvb;
2658 __u32 lvb_len = sizeof(*lvb);
2659 __u64 flags = 0;
2663 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2664 * be valid. */
2665 lock = ldlm_handle2lock(lockh);
2666 LASSERTF(lock != NULL,
2667 "lockh %#llx, req %p, aa %p - client evicted?\n",
2668 lockh->cookie, req, aa);
2670 /* Take an additional reference so that a blocking AST that
2671 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2672 * to arrive after an upcall has been executed by
2673 * osc_enqueue_fini(). */
2674 ldlm_lock_addref(lockh, mode);
2676 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2677 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2679 /* Let the CP AST grant the lock first. */
2680 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2682 if (aa->oa_speculative) {
2683 LASSERT(aa->oa_lvb == NULL);
2684 LASSERT(aa->oa_flags == NULL);
2685 aa->oa_flags = &flags;
2688 /* Complete obtaining the lock procedure. */
2689 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2690 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2691 lockh, rc);
2692 /* Complete osc stuff. */
2693 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2694 aa->oa_flags, aa->oa_speculative, rc);
2696 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2698 ldlm_lock_decref(lockh, mode);
2699 LDLM_LOCK_PUT(lock);
2703 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2704 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2705 * other synchronous requests; however, holding some locks while trying to
2706 * obtain others may take a considerable amount of time in case of an OST
2707 * failure, and when a client does not release locks that other sync requests
2708 * are waiting for, the client is evicted from the cluster -- such scenarios
2709 * make life difficult, so release locks just after they are obtained. */
2710 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2711 __u64 *flags, union ldlm_policy_data *policy,
2712 struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2713 void *cookie, struct ldlm_enqueue_info *einfo,
2714 struct ptlrpc_request_set *rqset, int async,
2715 bool speculative)
2717 struct obd_device *obd = exp->exp_obd;
2718 struct lustre_handle lockh = { 0 };
2719 struct ptlrpc_request *req = NULL;
2720 int intent = *flags & LDLM_FL_HAS_INTENT;
2721 __u64 match_flags = *flags;
2722 enum ldlm_mode mode;
2726 /* Filesystem lock extents are extended to page boundaries so that
2727 * dealing with the page cache is a little smoother. */
2728 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2729 policy->l_extent.end |= ~PAGE_MASK;
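/*
 * Editor's note (worked example, assuming PAGE_SIZE = 4096 so
 * ~PAGE_MASK = 0xfff): "start -= start & ~PAGE_MASK" clears the
 * sub-page bits, rounding down (start 5000 -> 4096), while
 * "end |= ~PAGE_MASK" sets them, rounding up to the last byte of the
 * page (end 9000 -> 12287). The lock extent thus covers whole pages.
 */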
2731 /* Next, search for already existing extent locks that will cover us */
2732 /* If we're trying to read, we also search for an existing PW lock. The
2733 * VFS and page cache already protect us locally, so lots of readers/
2734 * writers can share a single PW lock.
2736 * There are problems with conversion deadlocks, so instead of
2737 * converting a read lock to a write lock, we'll just enqueue a new
2738 * one.
2740 * At some point we should cancel the read lock instead of making them
2741 * send us a blocking callback, but there are problems with canceling
2742 * locks out from other users right now, too. */
2743 mode = einfo->ei_mode;
2744 if (einfo->ei_mode == LCK_PR)
2745 mode |= LCK_PW;
2746 /* Normal lock requests must wait for the LVB to be ready before
2747 * matching a lock; speculative lock requests do not need to,
2748 * because they will not actually use the lock. */
2749 if (!speculative)
2750 match_flags |= LDLM_FL_LVB_READY;
2751 if (intent != 0)
2752 match_flags |= LDLM_FL_BLOCK_GRANTED;
2753 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2754 einfo->ei_type, policy, mode, &lockh);
2755 if (mode) {
2756 struct ldlm_lock *matched;
2758 if (*flags & LDLM_FL_TEST_LOCK)
2759 RETURN(ELDLM_OK);
2761 matched = ldlm_handle2lock(&lockh);
2762 if (speculative) {
2763 /* This DLM lock request is speculative, and does not
2764 * have an associated IO request. Therefore if there
2765 * is already a DLM lock, it will just inform the
2766 * caller to cancel the request for this stripe. */
2767 lock_res_and_lock(matched);
2768 if (ldlm_extent_equal(&policy->l_extent,
2769 &matched->l_policy_data.l_extent))
2770 rc = -EEXIST;
2771 else
2772 rc = -ECANCELED;
2773 unlock_res_and_lock(matched);
2775 ldlm_lock_decref(&lockh, mode);
2776 LDLM_LOCK_PUT(matched);
2778 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2779 *flags |= LDLM_FL_LVB_READY;
2781 /* We already have a lock, and it's referenced. */
2782 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2784 ldlm_lock_decref(&lockh, mode);
2785 LDLM_LOCK_PUT(matched);
2788 ldlm_lock_decref(&lockh, mode);
2789 LDLM_LOCK_PUT(matched);
2793 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2794 RETURN(-ENOLCK);
2796 if (intent) {
2797 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2798 &RQF_LDLM_ENQUEUE_LVB);
2799 if (req == NULL)
2800 RETURN(-ENOMEM);
2802 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2803 if (rc) {
2804 ptlrpc_request_free(req);
2805 RETURN(rc);
2808 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2809 sizeof(*lvb));
2810 ptlrpc_request_set_replen(req);
2813 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2814 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2816 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2817 sizeof(*lvb), LVB_T_OST, &lockh, async);
2818 if (async) {
2819 if (!rc) {
2820 struct osc_enqueue_args *aa;
2821 aa = ptlrpc_req_async_args(aa, req);
2822 aa->oa_exp = exp;
2823 aa->oa_mode = einfo->ei_mode;
2824 aa->oa_type = einfo->ei_type;
2825 lustre_handle_copy(&aa->oa_lockh, &lockh);
2826 aa->oa_upcall = upcall;
2827 aa->oa_cookie = cookie;
2828 aa->oa_speculative = speculative;
2829 if (!speculative) {
2830 aa->oa_flags = flags;
2831 aa->oa_lvb = lvb;
2832 } else {
2833 /* speculative locks are essentially to enqueue
2834 * a DLM lock in advance, so we don't care
2835 * about the result of the enqueue. */
2836 aa->oa_lvb = NULL;
2837 aa->oa_flags = NULL;
2840 req->rq_interpret_reply = osc_enqueue_interpret;
2841 ptlrpc_set_add_req(rqset, req);
2842 } else if (intent) {
2843 ptlrpc_req_finished(req);
2848 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2849 flags, speculative, rc);
2851 ptlrpc_req_finished(req);
2856 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2857 struct ldlm_res_id *res_id, enum ldlm_type type,
2858 union ldlm_policy_data *policy, enum ldlm_mode mode,
2859 __u64 *flags, struct osc_object *obj,
2860 struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2862 struct obd_device *obd = exp->exp_obd;
2863 __u64 lflags = *flags;
2864 enum ldlm_mode rc;
2867 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2868 RETURN(0);
2870 /* Filesystem lock extents are extended to page boundaries so that
2871 * dealing with the page cache is a little smoother */
2872 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2873 policy->l_extent.end |= ~PAGE_MASK;
2875 /* Next, search for already existing extent locks that will cover us */
2876 /* If we're trying to read, we also search for an existing PW lock. The
2877 * VFS and page cache already protect us locally, so lots of readers/
2878 * writers can share a single PW lock. */
2879 rc = mode;
2880 if (mode == LCK_PR)
2881 rc |= LCK_PW;
2883 rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2884 res_id, type, policy, rc, lockh,
2885 match_flags);
2886 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2887 RETURN(rc);
2889 if (obj != NULL) {
2890 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2892 LASSERT(lock != NULL);
2893 if (osc_set_lock_data(lock, obj)) {
2894 lock_res_and_lock(lock);
2895 if (!ldlm_is_lvb_cached(lock)) {
2896 LASSERT(lock->l_ast_data == obj);
2897 osc_lock_lvb_update(env, obj, lock, NULL);
2898 ldlm_set_lvb_cached(lock);
2900 unlock_res_and_lock(lock);
2901 } else {
2902 ldlm_lock_decref(lockh, rc);
2903 rc = 0;
2905 LDLM_LOCK_PUT(lock);
2910 static int osc_statfs_interpret(const struct lu_env *env,
2911 struct ptlrpc_request *req, void *args, int rc)
2913 struct osc_async_args *aa = args;
2914 struct obd_statfs *msfs;
2917 if (rc == -EBADR)
2918 /*
2919 * The request has in fact never been sent due to issues at
2920 * a higher level (LOV). Exit immediately since the caller
2921 * is aware of the problem and takes care of the clean up.
2922 */
2923 RETURN(rc);
2925 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2926 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2927 GOTO(out, rc = 0);
2929 if (rc != 0)
2930 GOTO(out, rc);
2932 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2933 if (msfs == NULL)
2934 GOTO(out, rc = -EPROTO);
2936 *aa->aa_oi->oi_osfs = *msfs;
2937 out:
2938 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2943 static int osc_statfs_async(struct obd_export *exp,
2944 struct obd_info *oinfo, time64_t max_age,
2945 struct ptlrpc_request_set *rqset)
2947 struct obd_device *obd = class_exp2obd(exp);
2948 struct ptlrpc_request *req;
2949 struct osc_async_args *aa;
2953 if (obd->obd_osfs_age >= max_age) {
2955 "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2956 obd->obd_name, &obd->obd_osfs,
2957 obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2958 obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2959 spin_lock(&obd->obd_osfs_lock);
2960 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2961 spin_unlock(&obd->obd_osfs_lock);
2962 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2963 if (oinfo->oi_cb_up)
2964 oinfo->oi_cb_up(oinfo, 0);
2969 /* We could possibly pass max_age in the request (as an absolute
2970 * timestamp or a "seconds.usec ago") so the target can avoid doing
2971 * extra calls into the filesystem if that isn't necessary (e.g.
2972 * during mount that would help a bit). Having relative timestamps
2973 * is not so great if request processing is slow, while absolute
2974 * timestamps are not ideal because they need time synchronization. */
2975 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2976 if (req == NULL)
2977 RETURN(-ENOMEM);
2979 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2980 if (rc) {
2981 ptlrpc_request_free(req);
2982 RETURN(rc);
2984 ptlrpc_request_set_replen(req);
2985 req->rq_request_portal = OST_CREATE_PORTAL;
2986 ptlrpc_at_set_req_timeout(req);
2988 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2989 /* procfs requests should not wait on statfs, to avoid deadlock */
2990 req->rq_no_resend = 1;
2991 req->rq_no_delay = 1;
2994 req->rq_interpret_reply = osc_statfs_interpret;
2995 aa = ptlrpc_req_async_args(aa, req);
2996 aa->aa_oi = oinfo;
2998 ptlrpc_set_add_req(rqset, req);
3002 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3003 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3005 struct obd_device *obd = class_exp2obd(exp);
3006 struct obd_statfs *msfs;
3007 struct ptlrpc_request *req;
3008 struct obd_import *imp = NULL;
3013 /* Since the request might also come from lprocfs, we need to
3014 * sync this with client_disconnect_export (Bug 15684) */
3015 down_read(&obd->u.cli.cl_sem);
3016 if (obd->u.cli.cl_import)
3017 imp = class_import_get(obd->u.cli.cl_import);
3018 up_read(&obd->u.cli.cl_sem);
3019 if (imp == NULL)
3020 RETURN(-ENODEV);
3022 /* We could possibly pass max_age in the request (as an absolute
3023 * timestamp or a "seconds.usec ago") so the target can avoid doing
3024 * extra calls into the filesystem if that isn't necessary (e.g.
3025 * during mount that would help a bit). Having relative timestamps
3026 * is not so great if request processing is slow, while absolute
3027 * timestamps are not ideal because they need time synchronization. */
3028 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3030 class_import_put(imp);
3031 if (req == NULL)
3032 RETURN(-ENOMEM);
3035 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3036 if (rc) {
3037 ptlrpc_request_free(req);
3038 RETURN(rc);
3040 ptlrpc_request_set_replen(req);
3041 req->rq_request_portal = OST_CREATE_PORTAL;
3042 ptlrpc_at_set_req_timeout(req);
3044 if (flags & OBD_STATFS_NODELAY) {
3045 /* procfs requests should not wait on statfs, to avoid deadlock */
3046 req->rq_no_resend = 1;
3047 req->rq_no_delay = 1;
3050 rc = ptlrpc_queue_wait(req);
3051 if (rc)
3052 GOTO(out, rc);
3054 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3055 if (msfs == NULL)
3056 GOTO(out, rc = -EPROTO);
3058 *osfs = *msfs;
3060 EXIT;
3061 out:
3062 ptlrpc_req_finished(req);
3066 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3067 void *karg, void __user *uarg)
3069 struct obd_device *obd = exp->exp_obd;
3070 struct obd_ioctl_data *data = karg;
3074 if (!try_module_get(THIS_MODULE)) {
3075 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3076 module_name(THIS_MODULE));
3077 return -EINVAL;
3079 switch (cmd) {
3080 case OBD_IOC_CLIENT_RECOVER:
3081 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3082 data->ioc_inlbuf1, 0);
3083 if (rc > 0)
3084 rc = 0;
3085 break;
3086 case IOC_OSC_SET_ACTIVE:
3087 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3088 data->ioc_offset);
3089 break;
3090 default:
3091 rc = -ENOTTY;
3092 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3093 obd->obd_name, cmd, current->comm, rc);
3097 module_put(THIS_MODULE);
3098 return rc;
3101 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3102 u32 keylen, void *key, u32 vallen, void *val,
3103 struct ptlrpc_request_set *set)
3105 struct ptlrpc_request *req;
3106 struct obd_device *obd = exp->exp_obd;
3107 struct obd_import *imp = class_exp2cliimp(exp);
3112 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3114 if (KEY_IS(KEY_CHECKSUM)) {
3115 if (vallen != sizeof(int))
3116 RETURN(-EINVAL);
3117 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3118 RETURN(0);
3121 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3122 sptlrpc_conf_client_adapt(obd);
3126 if (KEY_IS(KEY_FLUSH_CTX)) {
3127 sptlrpc_import_flush_my_ctx(imp);
3131 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3132 struct client_obd *cli = &obd->u.cli;
3133 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3134 long target = *(long *)val;
3136 nr = osc_lru_shrink(env, cli, min(nr, target), true);
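/*
 * Editor's note (worked example): the shrink amount is capped at half
 * of what is currently on the LRU -- with 1000 pages in cl_lru_in_list
 * (so nr = 500) and a target of 300, min(nr, target) shrinks 300 pages;
 * with a target of 800, only 500 are shrunk.
 */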
3141 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3142 RETURN(-EINVAL);
3144 /* We pass all other commands directly to OST. Since nobody calls osc
3145 methods directly and everybody is supposed to go through LOV, we
3146 assume LOV checked invalid values for us.
3147 The only recognised values so far are evict_by_nid and mds_conn.
3148 Even if something bad goes through, we'd get a -EINVAL from OST
3149 anyway. */
3151 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3152 &RQF_OST_SET_GRANT_INFO :
3153 &RQF_OBD_SET_INFO);
3154 if (req == NULL)
3155 RETURN(-ENOMEM);
3157 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3158 RCL_CLIENT, keylen);
3159 if (!KEY_IS(KEY_GRANT_SHRINK))
3160 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3161 RCL_CLIENT, vallen);
3162 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3163 if (rc) {
3164 ptlrpc_request_free(req);
3165 RETURN(rc);
3168 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3169 memcpy(tmp, key, keylen);
3170 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3171 &RMF_OST_BODY :
3172 &RMF_SETINFO_VAL);
3173 memcpy(tmp, val, vallen);
3175 if (KEY_IS(KEY_GRANT_SHRINK)) {
3176 struct osc_grant_args *aa;
3177 struct obdo *oa;
3179 aa = ptlrpc_req_async_args(aa, req);
3180 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3181 if (!oa) {
3182 ptlrpc_req_finished(req);
3183 RETURN(-ENOMEM);
3185 *oa = ((struct ost_body *)val)->oa;
3186 aa->aa_oa = oa;
3187 req->rq_interpret_reply = osc_shrink_grant_interpret;
3190 ptlrpc_request_set_replen(req);
3191 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3192 LASSERT(set != NULL);
3193 ptlrpc_set_add_req(set, req);
3194 ptlrpc_check_set(NULL, set);
3195 } else {
3196 ptlrpcd_add_req(req);
3201 EXPORT_SYMBOL(osc_set_info_async);
3203 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3204 struct obd_device *obd, struct obd_uuid *cluuid,
3205 struct obd_connect_data *data, void *localdata)
3207 struct client_obd *cli = &obd->u.cli;
3209 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3210 long lost_grant;
3211 long grant;
3213 spin_lock(&cli->cl_loi_list_lock);
3214 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3215 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3216 /* restore ocd_grant_blkbits as client page bits */
3217 data->ocd_grant_blkbits = PAGE_SHIFT;
3218 grant += cli->cl_dirty_grant;
3219 } else {
3220 grant += cli->cl_dirty_pages << PAGE_SHIFT;
3222 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
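/*
 * Editor's note: "grant ? : 2 * cli_brw_size(obd)" uses the GNU C
 * binary conditional extension (a ?: b is a ? a : b, evaluating a
 * once): reuse the computed grant when it is non-zero, otherwise ask
 * the server for a default of two full BRW-sized writes worth of grant.
 */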
3223 lost_grant = cli->cl_lost_grant;
3224 cli->cl_lost_grant = 0;
3225 spin_unlock(&cli->cl_loi_list_lock);
3227 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3228 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3229 data->ocd_version, data->ocd_grant, lost_grant);
3234 EXPORT_SYMBOL(osc_reconnect);
3236 int osc_disconnect(struct obd_export *exp)
3238 struct obd_device *obd = class_exp2obd(exp);
3241 rc = client_disconnect_export(exp);
3243 * Initially we put del_shrink_grant before disconnect_export, but it
3244 * causes the following problem if setup (connect) and cleanup
3245 * (disconnect) are tangled together.
3246 * connect p1 disconnect p2
3247 * ptlrpc_connect_import
3248 * ............... class_manual_cleanup
3251 * ptlrpc_connect_interrupt
3253 * add this client to shrink list
3255 Bang! the grant shrink thread triggers the shrink. BUG18662
3257 osc_del_grant_list(&obd->u.cli);
3260 EXPORT_SYMBOL(osc_disconnect);
3262 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3263 struct hlist_node *hnode, void *arg)
3265 struct lu_env *env = arg;
3266 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3267 struct ldlm_lock *lock;
3268 struct osc_object *osc = NULL;
3271 lock_res(res);
3272 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3273 if (lock->l_ast_data != NULL && osc == NULL) {
3274 osc = lock->l_ast_data;
3275 cl_object_get(osc2cl(osc));
3278 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3279 * by the 2nd round of ldlm_namespace_clean() call in
3280 * osc_import_event(). */
3281 ldlm_clear_cleaned(lock);
3283 unlock_res(res);
3285 if (osc != NULL) {
3286 osc_object_invalidate(env, osc);
3287 cl_object_put(env, osc2cl(osc));
3292 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3294 static int osc_import_event(struct obd_device *obd,
3295 struct obd_import *imp,
3296 enum obd_import_event event)
3298 struct client_obd *cli;
3302 LASSERT(imp->imp_obd == obd);
3304 switch (event) {
3305 case IMP_EVENT_DISCON: {
3306 cli = &obd->u.cli;
3307 spin_lock(&cli->cl_loi_list_lock);
3308 cli->cl_avail_grant = 0;
3309 cli->cl_lost_grant = 0;
3310 spin_unlock(&cli->cl_loi_list_lock);
3311 break;
3313 case IMP_EVENT_INACTIVE: {
3314 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3315 break;
3317 case IMP_EVENT_INVALIDATE: {
3318 struct ldlm_namespace *ns = obd->obd_namespace;
3319 struct lu_env *env;
3320 __u16 refcheck;
3322 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3324 env = cl_env_get(&refcheck);
3325 if (!IS_ERR(env)) {
3326 osc_io_unplug(env, &obd->u.cli, NULL);
3328 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3329 osc_ldlm_resource_invalidate,
3330 env, 0);
3331 cl_env_put(env, &refcheck);
3333 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3334 } else
3335 rc = PTR_ERR(env);
3336 break;
3338 case IMP_EVENT_ACTIVE: {
3339 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3340 break;
3342 case IMP_EVENT_OCD: {
3343 struct obd_connect_data *ocd = &imp->imp_connect_data;
3345 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3346 osc_init_grant(&obd->u.cli, ocd);
3349 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3350 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3352 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3353 break;
3355 case IMP_EVENT_DEACTIVATE: {
3356 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3357 break;
3359 case IMP_EVENT_ACTIVATE: {
3360 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3361 break;
3363 default:
3364 CERROR("Unknown import event %d\n", event);
3371 * Determine whether the lock can be canceled before replaying the lock
3372 * during recovery, see bug16774 for detailed information.
3374 * \retval zero the lock can't be canceled
3375 * \retval other ok to cancel
3377 static int osc_cancel_weight(struct ldlm_lock *lock)
3380 * Cancel all unused and granted extent locks.
3382 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3383 ldlm_is_granted(lock) &&
3384 osc_ldlm_weigh_ast(lock) == 0)
3385 RETURN(1);
3387 RETURN(0);
3390 static int brw_queue_work(const struct lu_env *env, void *data)
3392 struct client_obd *cli = data;
3394 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3396 osc_io_unplug(env, cli, NULL);
3400 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3402 struct client_obd *cli = &obd->u.cli;
3403 void *handler;
3404 int rc;
3408 rc = ptlrpcd_addref();
3409 if (rc)
3410 RETURN(rc);
3412 rc = client_obd_setup(obd, lcfg);
3413 if (rc)
3414 GOTO(out_ptlrpcd, rc);
3417 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3418 if (IS_ERR(handler))
3419 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3420 cli->cl_writeback_work = handler;
3422 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3423 if (IS_ERR(handler))
3424 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3425 cli->cl_lru_work = handler;
3427 rc = osc_quota_setup(obd);
3428 if (rc)
3429 GOTO(out_ptlrpcd_work, rc);
3431 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3432 osc_update_next_shrink(cli);
3433 RETURN(0);
3435 out_ptlrpcd_work:
3437 if (cli->cl_writeback_work != NULL) {
3438 ptlrpcd_destroy_work(cli->cl_writeback_work);
3439 cli->cl_writeback_work = NULL;
3441 if (cli->cl_lru_work != NULL) {
3442 ptlrpcd_destroy_work(cli->cl_lru_work);
3443 cli->cl_lru_work = NULL;
3445 client_obd_cleanup(obd);
3446 out_ptlrpcd:
3447 ptlrpcd_decref();
3448 RETURN(rc);
3450 EXPORT_SYMBOL(osc_setup_common);
3452 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3454 struct client_obd *cli = &obd->u.cli;
3455 int adding;
3456 int added;
3457 int req_count;
3458 int rc;
3462 rc = osc_setup_common(obd, lcfg);
3463 if (rc < 0)
3464 RETURN(rc);
3466 rc = osc_tunables_init(obd);
3467 if (rc)
3468 RETURN(rc);
3471 * We try to control the total number of requests with an upper limit,
3472 * osc_reqpool_maxreqcount. There might be some race which causes
3473 * over-limit allocation, but it is fine.
3475 req_count = atomic_read(&osc_pool_req_count);
3476 if (req_count < osc_reqpool_maxreqcount) {
3477 adding = cli->cl_max_rpcs_in_flight + 2;
3478 if (req_count + adding > osc_reqpool_maxreqcount)
3479 adding = osc_reqpool_maxreqcount - req_count;
3481 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3482 atomic_add(added, &osc_pool_req_count);
3485 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3487 spin_lock(&osc_shrink_lock);
3488 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3489 spin_unlock(&osc_shrink_lock);
3490 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3491 cli->cl_import->imp_idle_debug = D_HA;
3496 int osc_precleanup_common(struct obd_device *obd)
3498 struct client_obd *cli = &obd->u.cli;
3502 * for echo client, export may be on zombie list, wait for
3503 * zombie thread to cull it, because cli.cl_import will be
3504 * cleared in client_disconnect_export():
3505 * class_export_destroy() -> obd_cleanup() ->
3506 * echo_device_free() -> echo_client_cleanup() ->
3507 * obd_disconnect() -> osc_disconnect() ->
3508 * client_disconnect_export()
3510 obd_zombie_barrier();
3511 if (cli->cl_writeback_work) {
3512 ptlrpcd_destroy_work(cli->cl_writeback_work);
3513 cli->cl_writeback_work = NULL;
3516 if (cli->cl_lru_work) {
3517 ptlrpcd_destroy_work(cli->cl_lru_work);
3518 cli->cl_lru_work = NULL;
3521 obd_cleanup_client_import(obd);
3524 EXPORT_SYMBOL(osc_precleanup_common);
3526 static int osc_precleanup(struct obd_device *obd)
3530 osc_precleanup_common(obd);
3532 ptlrpc_lprocfs_unregister_obd(obd);
3536 int osc_cleanup_common(struct obd_device *obd)
3538 struct client_obd *cli = &obd->u.cli;
3539 int rc;
3543 spin_lock(&osc_shrink_lock);
3544 list_del(&cli->cl_shrink_list);
3545 spin_unlock(&osc_shrink_lock);
3548 if (cli->cl_cache != NULL) {
3549 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3550 spin_lock(&cli->cl_cache->ccc_lru_lock);
3551 list_del_init(&cli->cl_lru_osc);
3552 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3553 cli->cl_lru_left = NULL;
3554 cl_cache_decref(cli->cl_cache);
3555 cli->cl_cache = NULL;
3558 /* free memory of osc quota cache */
3559 osc_quota_cleanup(obd);
3561 rc = client_obd_cleanup(obd);
3563 ptlrpcd_decref();
3564 RETURN(rc);
3566 EXPORT_SYMBOL(osc_cleanup_common);
3568 static const struct obd_ops osc_obd_ops = {
3569 .o_owner = THIS_MODULE,
3570 .o_setup = osc_setup,
3571 .o_precleanup = osc_precleanup,
3572 .o_cleanup = osc_cleanup_common,
3573 .o_add_conn = client_import_add_conn,
3574 .o_del_conn = client_import_del_conn,
3575 .o_connect = client_connect_import,
3576 .o_reconnect = osc_reconnect,
3577 .o_disconnect = osc_disconnect,
3578 .o_statfs = osc_statfs,
3579 .o_statfs_async = osc_statfs_async,
3580 .o_create = osc_create,
3581 .o_destroy = osc_destroy,
3582 .o_getattr = osc_getattr,
3583 .o_setattr = osc_setattr,
3584 .o_iocontrol = osc_iocontrol,
3585 .o_set_info_async = osc_set_info_async,
3586 .o_import_event = osc_import_event,
3587 .o_quotactl = osc_quotactl,
3590 static struct shrinker *osc_cache_shrinker;
3591 LIST_HEAD(osc_shrink_list);
3592 DEFINE_SPINLOCK(osc_shrink_lock);
3594 #ifndef HAVE_SHRINKER_COUNT
3595 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3597 struct shrink_control scv = {
3598 .nr_to_scan = shrink_param(sc, nr_to_scan),
3599 .gfp_mask = shrink_param(sc, gfp_mask)
3601 (void)osc_cache_shrink_scan(shrinker, &scv);
3603 return osc_cache_shrink_count(shrinker, &scv);
3607 static int __init osc_init(void)
3609 unsigned int reqpool_size;
3610 unsigned int reqsize;
3611 int rc;
3612 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3613 osc_cache_shrink_count, osc_cache_shrink_scan);
3616 /* print an address of _any_ initialized kernel symbol from this
3617 * module, to allow debugging with a gdb that doesn't support data
3618 * symbols from modules. */
3619 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3621 rc = lu_kmem_init(osc_caches);
3622 if (rc)
3623 RETURN(rc);
3625 rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3626 LUSTRE_OSC_NAME, &osc_device_type);
3627 if (rc)
3628 GOTO(out_kmem, rc);
3630 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3632 /* This is obviously too much memory, only prevent overflow here */
3633 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3634 GOTO(out_type, rc = -EINVAL);
3636 reqpool_size = osc_reqpool_mem_max << 20;
3638 reqsize = 1;
3639 while (reqsize < OST_IO_MAXREQSIZE)
3640 reqsize = reqsize << 1;
3643 * We don't enlarge the request count in OSC pool according to
3644 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3645 * tried after normal allocation failed. So a small OSC pool won't
3646 * cause much performance degradation in most cases.
3648 osc_reqpool_maxreqcount = reqpool_size / reqsize;
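/*
 * Editor's note (worked example with illustrative numbers): the loop
 * above rounds OST_IO_MAXREQSIZE up to a power of two, so if it were
 * ~1 MiB then reqsize = 1 MiB, and the default osc_reqpool_mem_max of
 * 5 (MB) would give osc_reqpool_maxreqcount = (5 << 20) / (1 << 20) = 5
 * pooled requests in total.
 */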
3650 atomic_set(&osc_pool_req_count, 0);
3651 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3652 ptlrpc_add_rqs_to_pool);
3654 if (osc_rq_pool == NULL)
3655 GOTO(out_type, rc = -ENOMEM);
3657 rc = osc_start_grant_work();
3658 if (rc != 0)
3659 GOTO(out_req_pool, rc);
3661 RETURN(rc);
3663 out_req_pool:
3664 ptlrpc_free_rq_pool(osc_rq_pool);
3665 out_type:
3666 class_unregister_type(LUSTRE_OSC_NAME);
3667 out_kmem:
3668 lu_kmem_fini(osc_caches);
3670 RETURN(rc);
3673 static void __exit osc_exit(void)
3675 osc_stop_grant_work();
3676 remove_shrinker(osc_cache_shrinker);
3677 class_unregister_type(LUSTRE_OSC_NAME);
3678 lu_kmem_fini(osc_caches);
3679 ptlrpc_free_rq_pool(osc_rq_pool);
3682 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3683 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3684 MODULE_VERSION(LUSTRE_VERSION_STRING);
3685 MODULE_LICENSE("GPL");
3687 module_init(osc_init);
3688 module_exit(osc_exit);