/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

50 #include "osc_internal.h"
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

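/* Idle timeout in seconds before an unused OSC connection is disconnected */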
static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
        struct obdo             *sa_oa;
        obd_enqueue_update_f     sa_upcall;
        void                    *sa_cookie;
};

struct osc_fsync_args {
        struct osc_object       *fa_obj;
        struct obdo             *fa_oa;
        obd_enqueue_update_f     fa_upcall;
        void                    *fa_cookie;
};

struct osc_ladvise_args {
        struct obdo             *la_oa;
        obd_enqueue_update_f     la_upcall;
        void                    *la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
                         void *args, int rc);

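/* Pack @oa into the request body in wire format, applying the conversions
 * required by the peer's connect flags. Shared by the getattr/setattr
 * paths below. */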
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
        struct ost_body *body;

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

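/* Synchronous setattr: send an OST_SETATTR RPC, wait for the reply, and
 * copy the updated attributes from the reply back into @oa. */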
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oa->o_valid & OBD_MD_FLGROUP);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_setattr_args *sa = args;
        struct ost_body *body;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
                             &body->oa);
out:
        rc = sa->sa_upcall(sa->sa_cookie, rc);
        RETURN(rc);
}

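/* Asynchronous setattr: if @rqset is NULL the request is handed to ptlrpcd
 * and the caller does not wait; otherwise it is added to @rqset and
 * osc_setattr_interpret() will invoke @upcall with @cookie on completion. */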
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
                      obd_enqueue_update_f upcall, void *cookie,
                      struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_pack_req_body(req, oa);

        ptlrpc_request_set_replen(req);

        /* do mds to ost setattr asynchronously */
        if (!rqset) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_setattr_interpret;

                sa = ptlrpc_req_async_args(sa, req);
                sa->sa_oa = oa;
                sa->sa_upcall = upcall;
                sa->sa_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req,
                                 void *arg, int rc)
{
        struct osc_ladvise_args *la = arg;
        struct ost_body *body;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out, rc = -EPROTO);

        *la->la_oa = body->oa;
out:
        rc = la->la_upcall(la->la_cookie, rc);
        RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
                     struct ladvise_hdr *ladvise_hdr,
                     obd_enqueue_update_f upcall, void *cookie,
                     struct ptlrpc_request_set *rqset)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_ladvise_args *la;
        int rc;
        struct lu_ladvise *req_ladvise;
        struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
        int num_advise = ladvise_hdr->lah_count;
        struct ladvise_hdr *req_ladvise_hdr;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
        if (req == NULL)
                RETURN(-ENOMEM);

        req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
                             num_advise * sizeof(*ladvise));
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL;
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
                             oa);

        req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
                                                 &RMF_OST_LADVISE_HDR);
        memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

        req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
        memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
        ptlrpc_request_set_replen(req);

        if (rqset == NULL) {
                /* Do not wait for response. */
                ptlrpcd_add_req(req);
        } else {
                req->rq_interpret_reply = osc_ladvise_interpret;
                la = ptlrpc_req_async_args(la, req);
                la->la_oa = oa;
                la->la_upcall = upcall;
                la->la_cookie = cookie;

                ptlrpc_set_add_req(rqset, req);
        }

        RETURN(0);
}

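/* Object creation on the OST. Note that this path is only exercised for
 * echo-client objects, as the fid_seq_is_echo() assertion below enforces. */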
static int osc_create(const struct lu_env *env, struct obd_export *exp,
                      struct obdo *oa)
{
        struct ptlrpc_request *req;
        struct ost_body *body;
        int rc;

        LASSERT(oa != NULL);
        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
        LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
                GOTO(out, rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        rc = ptlrpc_queue_wait(req);
        if (rc)
                GOTO(out_req, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);

        CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
        lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

        oa->o_blksize = cli_brw_size(exp->exp_obd);
        oa->o_valid |= OBD_MD_FLBLKSZ;

        CDEBUG(D_HA, "transno: %lld\n",
               lustre_msg_get_transno(req->rq_repmsg));
out_req:
        ptlrpc_req_finished(req);
out:
        RETURN(rc);
}

int osc_punch_send(struct obd_export *exp, struct obdo *oa,
                   obd_enqueue_update_f upcall, void *cookie)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct obd_import *imp = class_exp2cliimp(exp);
        struct ost_body *body;
        int rc;

        req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
        if (rc < 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

/**
 * osc_fallocate_base() - Handles fallocate request.
 *
 * @exp:	Export structure
 * @oa:		Attributes passed to OSS from client (obdo structure)
 * @upcall:	Completion callback invoked when the request finishes
 * @cookie:	Opaque cookie passed to @upcall
 * @mode:	Operation done on given range.
 *
 * osc_fallocate_base() handles fallocate requests only. Only block
 * allocation or standard preallocate operation is supported currently.
 * Other mode flags are not supported yet. ftruncate(2) or truncate(2)
 * is supported via a SETATTR request.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
                       obd_enqueue_update_f upcall, void *cookie, int mode)
{
        struct ptlrpc_request *req;
        struct osc_setattr_args *sa;
        struct ost_body *body;
        struct obd_import *imp = class_exp2cliimp(exp);
        int rc;

        oa->o_falloc_mode = mode;
        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
                                   &RQF_OST_FALLOCATE);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
        if (rc != 0) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);

        lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_setattr_interpret;
        BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
        sa = ptlrpc_req_async_args(sa, req);
        sa->sa_oa = oa;
        sa->sa_upcall = upcall;
        sa->sa_cookie = cookie;

        ptlrpcd_add_req(req);

        RETURN(0);
}
EXPORT_SYMBOL(osc_fallocate_base);

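/* Interpret an OST_SYNC reply: propagate the returned attributes to the
 * osc object (blocks count) and then invoke the caller's upcall. */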
static int osc_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osc_fsync_args *fa = args;
        struct ost_body *body;
        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
        unsigned long valid = 0;
        struct cl_object *obj;

        if (rc != 0)
                GOTO(out, rc);

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                CERROR("can't unpack ost_body\n");
                GOTO(out, rc = -EPROTO);
        }

        *fa->fa_oa = body->oa;
        obj = osc2cl(fa->fa_obj);

        /* Update osc object's blocks attribute */
        cl_object_attr_lock(obj);
        if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
                attr->cat_blocks = body->oa.o_blocks;
                valid |= CAT_BLOCKS;
        }

        if (valid != 0)
                cl_object_attr_update(env, obj, attr, valid);
        cl_object_attr_unlock(obj);
out:
        rc = fa->fa_upcall(fa->fa_cookie, rc);
        RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
                  obd_enqueue_update_f upcall, void *cookie,
                  struct ptlrpc_request_set *rqset)
{
        struct obd_export *exp = osc_export(obj);
        struct ptlrpc_request *req;
        struct ost_body *body;
        struct osc_fsync_args *fa;
        int rc;

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
        if (req == NULL)
                RETURN(-ENOMEM);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        /* overload the size and blocks fields in the oa with start/end */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);
        req->rq_interpret_reply = osc_sync_interpret;

        fa = ptlrpc_req_async_args(fa, req);
        fa->fa_obj = obj;
        fa->fa_oa = oa;
        fa->fa_upcall = upcall;
        fa->fa_cookie = cookie;

        ptlrpc_set_add_req(rqset, req);

        RETURN(0);
}

/* Find and cancel locally locks matched by @mode in the resource found by
 * @oa. Found locks are added into the @cancels list. Returns the number of
 * locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
                                   struct list_head *cancels,
                                   enum ldlm_mode mode, __u64 lock_flags)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_res_id res_id;
        struct ldlm_resource *res;
        int count;

        /* Return, i.e. cancel nothing, only if ELC is supported (flag in
         * export) but disabled through procfs (flag in NS).
         *
         * This distinguishes from a case when ELC is not supported originally,
         * when we still want to cancel locks in advance and just cancel them
         * locally, without sending any RPC. */
        if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
                return 0;

        ostid_build_res_name(&oa->o_oi, &res_id);
        res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
        if (IS_ERR(res))
                return 0;

        LDLM_RESOURCE_ADDREF(res);
        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
                                           lock_flags, 0, NULL);
        LDLM_RESOURCE_DELREF(res);
        ldlm_resource_putref(res);
        return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *args, int rc)
{
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

        atomic_dec(&cli->cl_destroy_in_flight);
        wake_up(&cli->cl_destroy_waitq);

        return 0;
}

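/* Throttle destroy RPCs: atomically reserve a slot among the allowed
 * cl_max_rpcs_in_flight destroys. Returns 1 if the caller may send now,
 * 0 if it must wait on cl_destroy_waitq. */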
static int osc_can_send_destroy(struct client_obd *cli)
{
        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
            cli->cl_max_rpcs_in_flight) {
                /* The destroy request can be sent */
                return 1;
        }
        if (atomic_dec_return(&cli->cl_destroy_in_flight) <
            cli->cl_max_rpcs_in_flight) {
                /*
                 * The counter has been modified between the two atomic
                 * operations.
                 */
                wake_up(&cli->cl_destroy_waitq);
        }
        return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
                       struct obdo *oa)
{
        struct client_obd *cli = &exp->exp_obd->u.cli;
        struct ptlrpc_request *req;
        struct ost_body *body;
        LIST_HEAD(cancels);
        int rc, count;

        if (!oa) {
                CDEBUG(D_INFO, "oa NULL\n");
                RETURN(-EINVAL);
        }

        count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                        LDLM_FL_DISCARD_DATA);

        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
        if (req == NULL) {
                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                RETURN(-ENOMEM);
        }

        rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
                               0, &cancels, count);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }

        req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
        ptlrpc_at_set_req_timeout(req);

        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        ptlrpc_request_set_replen(req);

        req->rq_interpret_reply = osc_destroy_interpret;
        if (!osc_can_send_destroy(cli)) {
                /*
                 * Wait until the number of on-going destroy RPCs drops
                 * under max_rpc_in_flight.
                 */
                rc = l_wait_event_abortable_exclusive(
                        cli->cl_destroy_waitq,
                        osc_can_send_destroy(cli));
                if (rc) {
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        /* Do not wait for response */
        ptlrpcd_add_req(req);
        RETURN(0);
}

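/* Advertise the client's cache and grant state to the server in each RPC:
 * o_dirty is the amount of dirty cache, o_undirty how much more grant we
 * could use, o_grant what we currently hold and o_dropped the grant lost,
 * e.g. across an eviction. The server uses these to adjust our grant. */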
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                long writing_bytes)
{
        u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

        LASSERT(!(oa->o_valid & bits));

        oa->o_valid |= bits;
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_ocd_grant_param)
                oa->o_dirty = cli->cl_dirty_grant;
        else
                oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
        if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
                CERROR("dirty %lu > dirty_max %lu\n",
                       cli->cl_dirty_pages,
                       cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
                            (long)(obd_max_dirty_pages + 1))) {
                /* The atomic_read() allowing the atomic_inc() are
                 * not covered by a lock thus they may safely race and trip
                 * this CERROR() unless we add in a small fudge factor (+1). */
                CERROR("%s: dirty %ld > system dirty_max %ld\n",
                       cli_name(cli), atomic_long_read(&obd_dirty_pages),
                       obd_max_dirty_pages);
                oa->o_undirty = 0;
        } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
                            0x7fffffff)) {
                CERROR("dirty %lu - dirty_max %lu too big???\n",
                       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
                oa->o_undirty = 0;
        } else {
                unsigned long nrpages;
                unsigned long undirty;

                nrpages = cli->cl_max_pages_per_rpc;
                nrpages *= cli->cl_max_rpcs_in_flight + 1;
                nrpages = max(nrpages, cli->cl_dirty_max_pages);
                undirty = nrpages << PAGE_SHIFT;
                if (cli->cl_ocd_grant_param) {
                        int nrextents;

                        /* take extent tax into account when asking for more
                         * grant space */
                        nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
                                    cli->cl_max_extent_pages;
                        undirty += nrextents * cli->cl_grant_extent_tax;
                }
                /* Do not ask for more than OBD_MAX_GRANT - a margin for server
                 * to add extent tax, etc.
                 */
                oa->o_undirty = min(undirty, OBD_MAX_GRANT &
                                    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
        }
        oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
        /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
        if (cli->cl_lost_grant > INT_MAX) {
                CDEBUG(D_CACHE,
                       "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
                       cli_name(cli), cli->cl_lost_grant);
                oa->o_dropped = INT_MAX;
        } else {
                oa->o_dropped = cli->cl_lost_grant;
        }
        cli->cl_lost_grant -= oa->o_dropped;
        spin_unlock(&cli->cl_loi_list_lock);
        CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
               " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
               oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
        cli->cl_next_shrink_grant = ktime_get_seconds() +
                                    cli->cl_grant_shrink_interval;

        CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
               cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant += grant;
        spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
        if (body->oa.o_valid & OBD_MD_FLGRANT) {
                CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
                __osc_update_grant(cli, body->oa.o_grant);
        }
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
        struct list_head        gtd_clients;
        struct mutex            gtd_mutex;
        unsigned long           gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
                                      struct ptlrpc_request *req,
                                      void *args, int rc)
{
        struct osc_grant_args *aa = args;
        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
        struct ost_body *body;

        if (rc != 0) {
                __osc_update_grant(cli, aa->aa_oa->o_grant);
                GOTO(out, rc);
        }

        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        osc_update_grant(cli, body);
out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
        aa->aa_oa = NULL;

        return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
        spin_lock(&cli->cl_loi_list_lock);
        oa->o_grant = cli->cl_avail_grant / 4;
        cli->cl_avail_grant -= oa->o_grant;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
                oa->o_valid |= OBD_MD_FLFLAGS;
                oa->o_flags = 0;
        }
        oa->o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
        __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
                             (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_avail_grant <= target_bytes)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
        spin_unlock(&cli->cl_loi_list_lock);

        return osc_shrink_grant_to_target(cli, target_bytes);
}

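/* Give grant back to the server down to @target_bytes by sending a
 * KEY_GRANT_SHRINK set_info RPC carrying the released amount in o_grant. */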
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
        int rc = 0;
        struct ost_body *body;

        spin_lock(&cli->cl_loi_list_lock);
        /* Don't shrink if we are already above or below the desired limit
         * We don't want to shrink below a single RPC, as that will negatively
         * impact block allocation and long-term performance. */
        if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
                target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

        if (target_bytes >= cli->cl_avail_grant) {
                spin_unlock(&cli->cl_loi_list_lock);
                RETURN(0);
        }
        spin_unlock(&cli->cl_loi_list_lock);

        OBD_ALLOC_PTR(body);
        if (!body)
                RETURN(-ENOMEM);

        osc_announce_cached(cli, &body->oa, 0);

        spin_lock(&cli->cl_loi_list_lock);
        if (target_bytes >= cli->cl_avail_grant) {
                /* available grant has changed since target calculation */
                spin_unlock(&cli->cl_loi_list_lock);
                GOTO(out_free, rc = 0);
        }
        body->oa.o_grant = cli->cl_avail_grant - target_bytes;
        cli->cl_avail_grant = target_bytes;
        spin_unlock(&cli->cl_loi_list_lock);
        if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
                body->oa.o_valid |= OBD_MD_FLFLAGS;
                body->oa.o_flags = 0;
        }
        body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
        osc_update_next_shrink(cli);

        rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
                                sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
                                sizeof(*body), body, NULL);
        if (rc != 0)
                __osc_update_grant(cli, body->oa.o_grant);
out_free:
        OBD_FREE_PTR(body);
        RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
        time64_t next_shrink = client->cl_next_shrink_grant;

        if (client->cl_import == NULL)
                return 0;

        if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
            client->cl_import->imp_grant_shrink_disabled) {
                osc_update_next_shrink(client);
                return 0;
        }

        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
                 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
                 * Keep comment here so that it can be found by searching. */
                int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

                if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
                    client->cl_avail_grant > brw_size)
                        return 1;

                osc_update_next_shrink(client);
        }
        return 0;
}

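/* Upper bound on grant-shrink RPCs issued in one pass of the worker below */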
#define GRANT_SHRINK_RPC_BATCH	100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
        struct client_obd *cli;
        int rpc_sent = 0;
        bool init_next_shrink = true;
        time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

        mutex_lock(&client_gtd.gtd_mutex);
        list_for_each_entry(cli, &client_gtd.gtd_clients,
                            cl_grant_chain) {
                if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
                    osc_should_shrink_grant(cli)) {
                        osc_shrink_grant(cli);
                        rpc_sent++;
                }

                if (!init_next_shrink) {
                        if (cli->cl_next_shrink_grant < next_shrink &&
                            cli->cl_next_shrink_grant > ktime_get_seconds())
                                next_shrink = cli->cl_next_shrink_grant;
                } else {
                        init_next_shrink = false;
                        next_shrink = cli->cl_next_shrink_grant;
                }
        }
        mutex_unlock(&client_gtd.gtd_mutex);

        if (client_gtd.gtd_stopped == 1)
                return;

        if (next_shrink > ktime_get_seconds()) {
                time64_t delay = next_shrink - ktime_get_seconds();

                schedule_delayed_work(&work, cfs_time_seconds(delay));
        } else {
                schedule_work(&work.work);
        }
}

void osc_schedule_grant_work(void)
{
        cancel_delayed_work_sync(&work);
        schedule_work(&work.work);
}

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
        client_gtd.gtd_stopped = 0;
        mutex_init(&client_gtd.gtd_mutex);
        INIT_LIST_HEAD(&client_gtd.gtd_clients);

        INIT_DELAYED_WORK(&work, osc_grant_work_handler);
        schedule_work(&work.work);

        return 0;
}

static void osc_stop_grant_work(void)
{
        client_gtd.gtd_stopped = 1;
        cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
        mutex_lock(&client_gtd.gtd_mutex);
        list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
        mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
        if (list_empty(&client->cl_grant_chain))
                return;

        mutex_lock(&client_gtd.gtd_mutex);
        list_del_init(&client->cl_grant_chain);
        mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
        /*
         * ocd_grant is the total grant amount we expect to hold: if we've
         * been evicted, it's the new avail_grant amount, cl_dirty_pages will
         * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
         * dirty.
         *
         * race is tolerable here: if we're evicted, but imp_state already
         * left EVICTED state, then cl_dirty_pages must be 0 already.
         */
        spin_lock(&cli->cl_loi_list_lock);
        cli->cl_avail_grant = ocd->ocd_grant;
        if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
                unsigned long consumed = cli->cl_reserved_grant;

                if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
                        consumed += cli->cl_dirty_grant;
                else
                        consumed += cli->cl_dirty_pages << PAGE_SHIFT;
                if (cli->cl_avail_grant < consumed) {
                        CERROR("%s: granted %ld but already consumed %ld\n",
                               cli_name(cli), cli->cl_avail_grant, consumed);
                        cli->cl_avail_grant = 0;
                } else {
                        cli->cl_avail_grant -= consumed;
                }
        }

        if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
                u64 size;
                int chunk_mask;

                /* overhead for each extent insertion */
                cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
                /* determine the appropriate chunk size used by osc_extent. */
                cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
                                          ocd->ocd_grant_blkbits);
                /* max_pages_per_rpc must be chunk aligned */
                chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
                cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
                                             ~chunk_mask) & chunk_mask;
                /* determine maximum extent size, in #pages */
                size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
                cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
                cli->cl_ocd_grant_param = 1;
        } else {
                cli->cl_ocd_grant_param = 0;
                cli->cl_grant_extent_tax = 0;
                cli->cl_chunkbits = PAGE_SHIFT;
                cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        CDEBUG(D_CACHE,
               "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
               cli_name(cli),
               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
               cli->cl_max_extent_pages);

        if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
                osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
                              struct brw_page **pga)
{
        char *ptr;
        int i = 0;

        /* skip bytes read OK */
        while (nob_read > 0) {
                LASSERT(page_count > 0);

                if (pga[i]->count > nob_read) {
                        /* EOF inside this page */
                        ptr = kmap(pga[i]->pg) +
                                (pga[i]->off & ~PAGE_MASK);
                        memset(ptr + nob_read, 0, pga[i]->count - nob_read);
                        kunmap(pga[i]->pg);
                        page_count--;
                        i++;
                        break;
                }

                nob_read -= pga[i]->count;
                page_count--;
                i++;
        }

        /* zero remaining pages */
        while (page_count-- > 0) {
                ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
                memset(ptr, 0, pga[i]->count);
                kunmap(pga[i]->pg);
                i++;
        }
}

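/* Verify a BRW_WRITE reply: each per-niobuf RC must be zero, and the bulk
 * descriptor must have transferred exactly the number of bytes requested. */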
static int check_write_rcs(struct ptlrpc_request *req,
                           int requested_nob, int niocount,
                           size_t page_count, struct brw_page **pga)
{
        int i;
        __u32 *remote_rcs;

        remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
                                                  sizeof(*remote_rcs) *
                                                  niocount);
        if (remote_rcs == NULL) {
                CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
                return -EPROTO;
        }

        /* return error if any niobuf was in error */
        for (i = 0; i < niocount; i++) {
                if ((int)remote_rcs[i] < 0) {
                        CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
                               i, remote_rcs[i], req);
                        return remote_rcs[i];
                }

                if (remote_rcs[i] != 0) {
                        CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
                               i, remote_rcs[i], req);
                        return -EPROTO;
                }
        }
        if (req->rq_bulk != NULL &&
            req->rq_bulk->bd_nob_transferred != requested_nob) {
                CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
                       req->rq_bulk->bd_nob_transferred, requested_nob);
                return -EPROTO;
        }

        return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
        if (p1->flag != p2->flag) {
                unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
                                  OBD_BRW_SYNC | OBD_BRW_ASYNC |
                                  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);

                /* warn if we try to combine flags that we don't know to be
                 * safe to combine */
                if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
                        CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
                              "report this at https://jira.whamcloud.com/\n",
                              p1->flag, p2->flag);
                }
                return 0;
        }

        return (p1->off + p1->count == p2->off);
}

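/*
 * T10-PI capable bulk checksum: generate DIF guard tags for each sector of
 * the pages with @fn, then hash the accumulated guard tags (using the
 * default algorithm for OBD_CKSUM_T10_TOP) into a single checksum value.
 */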
#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
                                   size_t pg_count, struct brw_page **pga,
                                   int opc, obd_dif_csum_fn *fn,
                                   int sector_size,
                                   u32 *check_sum)
{
        struct ahash_request *req;
        /* Used Adler as the default checksum type on top of DIF tags */
        unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
        struct page *__page;
        unsigned char *buffer;
        __u16 *guard_start;
        unsigned int bufsize;
        int guard_number;
        int used_number = 0;
        int used;
        u32 cksum;
        int rc = 0;
        int i = 0;

        LASSERT(pg_count > 0);

        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
                return -ENOMEM;

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                rc = PTR_ERR(req);
                CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
                       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
                GOTO(out, rc);
        }

        buffer = kmap(__page);
        guard_start = (__u16 *)buffer;
        guard_number = PAGE_SIZE / sizeof(*guard_start);
        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (unlikely(i == 0 && opc == OST_READ &&
                             OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }

                /*
                 * The left guard number should be able to hold checksums of a
                 * whole page.
                 */
                rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
                                                  pga[i]->off & ~PAGE_MASK,
                                                  count,
                                                  guard_start + used_number,
                                                  guard_number - used_number,
                                                  &used, sector_size, fn);
                if (rc)
                        break;

                used_number += used;
                if (used_number == guard_number) {
                        cfs_crypto_hash_update_page(req, __page, 0,
                                used_number * sizeof(*guard_start));
                        used_number = 0;
                }

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }
        kunmap(__page);
        if (rc)
                GOTO(out, rc);

        if (used_number != 0)
                cfs_crypto_hash_update_page(req, __page, 0,
                        used_number * sizeof(*guard_start));

        bufsize = sizeof(cksum);
        cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                cksum++;

        *check_sum = cksum;
out:
        __free_page(__page);
        return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
        -EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
                             struct brw_page **pga, int opc,
                             enum cksum_types cksum_type,
                             u32 *cksum)
{
        int i = 0;
        struct ahash_request *req;
        unsigned int bufsize;
        unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

        LASSERT(pg_count > 0);

        req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_alg));
                return PTR_ERR(req);
        }

        while (nob > 0 && pg_count > 0) {
                unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

                /* corrupt the data before we compute the checksum, to
                 * simulate an OST->client data error */
                if (i == 0 && opc == OST_READ &&
                    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
                        unsigned char *ptr = kmap(pga[i]->pg);
                        int off = pga[i]->off & ~PAGE_MASK;

                        memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
                        kunmap(pga[i]->pg);
                }
                cfs_crypto_hash_update_page(req, pga[i]->pg,
                                            pga[i]->off & ~PAGE_MASK,
                                            count);
                LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
                               (int)(pga[i]->off & ~PAGE_MASK));

                nob -= pga[i]->count;
                pg_count--;
                i++;
        }

        bufsize = sizeof(*cksum);
        cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

        /* For sending we only compute the wrong checksum instead
         * of corrupting the data so it is still correct on a redo */
        if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
                (*cksum)++;

        return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
                                enum cksum_types cksum_type,
                                int nob, size_t pg_count,
                                struct brw_page **pga, int opc,
                                u32 *check_sum)
{
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        int rc;

        obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
                                             opc, fn, sector_size, check_sum);
        else
                rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
                                       check_sum);

        return rc;
}

static inline void osc_release_bounce_pages(struct brw_page **pga,
                                            u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
        int i;

        for (i = 0; i < page_count; i++) {
                /* Bounce pages allocated by a call to
                 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
                 * are identified thanks to the PageChecked flag.
                 */
                if (PageChecked(pga[i]->pg))
                        llcrypt_finalize_bounce_page(&pga[i]->pg);
                pga[i]->count -= pga[i]->bp_count_diff;
                pga[i]->off += pga[i]->bp_off_diff;
        }
#endif
}

static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
                     u32 page_count, struct brw_page **pga,
                     struct ptlrpc_request **reqp, int resend)
{
        struct ptlrpc_request *req;
        struct ptlrpc_bulk_desc *desc;
        struct ost_body *body;
        struct obd_ioobj *ioobj;
        struct niobuf_remote *niobuf;
        int niocount, i, requested_nob, opc, rc, short_io_size = 0;
        struct osc_brw_async_args *aa;
        struct req_capsule *pill;
        struct brw_page *pg_prev;
        void *short_io_buf;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
        struct inode *inode = NULL;
        bool directio = false;

        ENTRY;
        inode = page2inode(pga[0]->pg);
        if (inode == NULL) {
                /* Try to get reference to inode from cl_page if we are
                 * dealing with direct IO, as handled pages are not
                 * actual page cache pages.
                 */
                struct osc_async_page *oap = brw_page2oap(pga[0]);
                struct cl_page *clpage = oap2cl_page(oap);

                inode = clpage->cp_inode;
                if (inode)
                        directio = true;
        }
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
                RETURN(-ENOMEM); /* Recoverable */
        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
                RETURN(-EINVAL); /* Fatal */

        if ((cmd & OBD_BRW_WRITE) != 0) {
                opc = OST_WRITE;
                req = ptlrpc_request_alloc_pool(cli->cl_import,
                                                osc_rq_pool,
                                                &RQF_OST_BRW_WRITE);
        } else {
                opc = OST_READ;
                req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
        }
        if (req == NULL)
                RETURN(-ENOMEM);

        if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
                for (i = 0; i < page_count; i++) {
                        struct brw_page *pg = pga[i];
                        struct page *data_page = NULL;
                        bool retried = false;
                        bool lockedbymyself;
                        u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
                        struct address_space *map_orig = NULL;
                        pgoff_t index_orig;

retry_encrypt:
                        if (nunits & ~LUSTRE_ENCRYPTION_MASK)
                                nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
                                        LUSTRE_ENCRYPTION_UNIT_SIZE;
                        /* The page can already be locked when we arrive here.
                         * This is possible when cl_page_assume/vvp_page_assume
                         * is stuck on wait_on_page_writeback with page lock
                         * held. In this case there is no risk for the lock to
                         * be released while we are doing our encryption
                         * processing, because writeback against that page will
                         * end in vvp_page_completion_write/cl_page_completion,
                         * which means only once the page is fully processed.
                         */
                        lockedbymyself = trylock_page(pg->pg);
                        if (directio) {
                                map_orig = pg->pg->mapping;
                                pg->pg->mapping = inode->i_mapping;
                                index_orig = pg->pg->index;
                                pg->pg->index = pg->off >> PAGE_SHIFT;
                        }
                        data_page =
                                llcrypt_encrypt_pagecache_blocks(pg->pg,
                                                                 nunits, 0,
                                                                 GFP_NOFS);
                        if (directio) {
                                pg->pg->mapping = map_orig;
                                pg->pg->index = index_orig;
                        }
                        if (lockedbymyself)
                                unlock_page(pg->pg);
                        if (IS_ERR(data_page)) {
                                rc = PTR_ERR(data_page);
                                if (rc == -ENOMEM && !retried) {
                                        retried = true;
                                        rc = 0;
                                        goto retry_encrypt;
                                }
                                ptlrpc_request_free(req);
                                RETURN(rc);
                        }
                        /* Set PageChecked flag on bounce page for
                         * disambiguation in osc_release_bounce_pages().
                         */
                        SetPageChecked(data_page);
                        pg->pg = data_page;
                        /* there should be no gap in the middle of page array */
                        if (i == page_count - 1) {
                                struct osc_async_page *oap = brw_page2oap(pg);

                                oa->o_size = oap->oap_count +
                                        oap->oap_obj_off + oap->oap_page_off;
                        }
                        /* len is forced to nunits, and relative offset to 0
                         * so store the old, clear text info
                         */
                        pg->bp_count_diff = nunits - pg->count;
                        pg->count = nunits;
                        pg->bp_off_diff = pg->off & ~PAGE_MASK;
                        pg->off = pg->off & PAGE_MASK;
                }
        } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
                for (i = 0; i < page_count; i++) {
                        struct brw_page *pg = pga[i];
                        u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;

                        if (nunits & ~LUSTRE_ENCRYPTION_MASK)
                                nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
                                        LUSTRE_ENCRYPTION_UNIT_SIZE;
                        /* count/off are forced to cover the whole encryption
                         * unit size so that all encrypted data is stored on the
                         * OST, so adjust bp_{count,off}_diff for the size of
                         * the clear text.
                         */
                        pg->bp_count_diff = nunits - pg->count;
                        pg->count = nunits;
                        pg->bp_off_diff = pg->off & ~PAGE_MASK;
                        pg->off = pg->off & PAGE_MASK;
                }
        }

        for (niocount = i = 1; i < page_count; i++) {
                if (!can_merge_pages(pga[i - 1], pga[i]))
                        niocount++;
        }

        pill = &req->rq_pill;
        req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
                             sizeof(*ioobj));
        req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
                             niocount * sizeof(*niobuf));

        for (i = 0; i < page_count; i++) {
                short_io_size += pga[i]->count;
                if (!inode || !IS_ENCRYPTED(inode)) {
                        pga[i]->bp_count_diff = 0;
                        pga[i]->bp_off_diff = 0;
                }
        }

        /* Check if read/write is small enough to be a short io. */
        if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
            !imp_connect_shortio(cli->cl_import))
                short_io_size = 0;

        req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
                             opc == OST_READ ? 0 : short_io_size);
        if (opc == OST_READ)
                req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
                                     short_io_size);

        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
        osc_set_io_portal(req);

        ptlrpc_at_set_req_timeout(req);
        /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
         * retry logic */
        req->rq_no_retry_einprogress = 1;

        if (short_io_size != 0) {
                desc = NULL;
                short_io_buf = NULL;
                goto no_bulk;
        }

        desc = ptlrpc_prep_bulk_imp(req, page_count,
                cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
                (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
                        PTLRPC_BULK_PUT_SINK),
                OST_BULK_PORTAL,
                &ptlrpc_bulk_kiov_pin_ops);

        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);
        /* NB request now owns desc and will free it when it gets freed */
no_bulk:
        body = req_capsule_client_get(pill, &RMF_OST_BODY);
        ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

        lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

        /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
         * and from_kgid(), because they are asynchronous. Fortunately, variable
         * oa contains valid o_uid and o_gid in these two operations.
         * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
         * OBD_MD_FLUID and OBD_MD_FLGID is not set in order to avoid breaking
         * other process logic */
        body->oa.o_uid = oa->o_uid;
        body->oa.o_gid = oa->o_gid;

        obdo_to_ioobj(oa, ioobj);
        ioobj->ioo_bufcnt = niocount;
        /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
         * that might be send for this request. The actual number is decided
         * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
         * "max - 1" for old client compatibility sending "0", and also so
         * the actual maximum is a power-of-two number, not one less. LU-1431 */
        if (desc != NULL)
                ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
        else /* short io */
                ioobj_max_brw_set(ioobj, 0);

        if (short_io_size != 0) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_SHORT_IO;
                CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
                       short_io_size);
                if (opc == OST_WRITE) {
                        short_io_buf = req_capsule_client_get(pill,
                                                              &RMF_SHORT_IO);
                        LASSERT(short_io_buf != NULL);
                }
        }

        LASSERT(page_count > 0);
        pg_prev = pga[0];
        for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                struct brw_page *pg = pga[i];
                int poff = pg->off & ~PAGE_MASK;

                LASSERT(pg->count > 0);
                /* make sure there is no gap in the middle of page array */
                LASSERTF(page_count == 1 ||
                         (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
                          ergo(i > 0 && i < page_count - 1,
                               poff == 0 && pg->count == PAGE_SIZE) &&
                          ergo(i == page_count - 1, poff == 0)),
                         "i: %d/%d pg: %p off: %llu, count: %u\n",
                         i, page_count, pg, pg->off, pg->count);
                LASSERTF(i == 0 || pg->off > pg_prev->off,
                         "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
                         " prev_pg %p [pri %lu ind %lu] off %llu\n",
                         i, page_count,
                         pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
                         pg_prev->pg, page_private(pg_prev->pg),
                         pg_prev->pg->index, pg_prev->off);
                LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                        (pg->flag & OBD_BRW_SRVLOCK));
                if (short_io_size != 0 && opc == OST_WRITE) {
                        unsigned char *ptr = kmap_atomic(pg->pg);

                        LASSERT(short_io_size >= requested_nob + pg->count);
                        memcpy(short_io_buf + requested_nob,
                               ptr + poff,
                               pg->count);
                        kunmap_atomic(ptr);
                } else if (short_io_size == 0) {
                        desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
                                                         pg->count);
                }
                requested_nob += pg->count;

                if (i > 0 && can_merge_pages(pg_prev, pg)) {
                        niobuf--;
                        niobuf->rnb_len += pg->count;
                } else {
                        niobuf->rnb_offset = pg->off;
                        niobuf->rnb_len = pg->count;
                        niobuf->rnb_flags = pg->flag;
                }
                pg_prev = pg;
        }

        LASSERTF((void *)(niobuf - niocount) ==
                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

        osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
        if (resend) {
                if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
                        body->oa.o_valid |= OBD_MD_FLFLAGS;
                        body->oa.o_flags = 0;
                }
                body->oa.o_flags |= OBD_FL_RECOV_RESEND;
        }

        if (osc_should_shrink_grant(cli))
                osc_shrink_grant_local(cli, &body->oa);

        /* size[REQ_REC_OFF] still sizeof (*body) */
        if (opc == OST_WRITE) {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        /* store cl_cksum_type in a local variable since
                         * it can be changed via lprocfs */
                        enum cksum_types cksum_type = cli->cl_cksum_type;

                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;

                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                                                cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

                        rc = osc_checksum_bulk_rw(obd_name, cksum_type,
                                                  requested_nob, page_count,
                                                  pga, OST_WRITE,
                                                  &body->oa.o_cksum);
                        if (rc < 0) {
                                CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
                                       rc);
                                GOTO(out, rc);
                        }
                        CDEBUG(D_PAGE, "checksum at write origin: %x\n",
                               body->oa.o_cksum);

                        /* save this in 'oa', too, for later checking */
                        oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                        oa->o_flags |= obd_cksum_type_pack(obd_name,
                                                           cksum_type);
                } else {
                        /* clear out the checksum flag, in case this is a
                         * resend but cl_checksum is no longer set. b=11238 */
                        oa->o_valid &= ~OBD_MD_FLCKSUM;
                }
                oa->o_cksum = body->oa.o_cksum;
                /* 1 RC per niobuf */
                req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
                                     sizeof(__u32) * niocount);
        } else {
                if (cli->cl_checksum &&
                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                        if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                body->oa.o_flags = 0;
                        body->oa.o_flags |= obd_cksum_type_pack(obd_name,
                                cli->cl_cksum_type);
                        body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                }

                /* Client cksum has been already copied to wire obdo in previous
                 * lustre_set_wire_obdo(), and in the case a bulk-read is being
                 * resent due to cksum error, this will allow Server to
                 * check+dump pages on its side */
        }
        ptlrpc_request_set_replen(req);

        aa = ptlrpc_req_async_args(aa, req);
        aa->aa_oa = oa;
        aa->aa_requested_nob = requested_nob;
        aa->aa_nio_count = niocount;
        aa->aa_page_count = page_count;
        aa->aa_resends = 0;
        aa->aa_ppga = pga;
        aa->aa_cli = cli;
        INIT_LIST_HEAD(&aa->aa_oaps);

        *reqp = req;
        niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
        CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
               req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
               niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
        RETURN(0);

out:
        ptlrpc_req_finished(req);
        RETURN(rc);
}

char dbgcksum_file_name[PATH_MAX];

static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
                                struct brw_page **pga, __u32 server_cksum,
                                __u32 client_cksum)
{
        struct file *filp;
        int rc, i;
        unsigned int len;
        char *buf;

        /* will only keep dump of pages on first error for the same range in
         * file/fid, not during the resends/retries. */
        snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
                 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
                 (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
                  libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                 pga[0]->off,
                 pga[page_count-1]->off + pga[page_count-1]->count - 1,
                 client_cksum, server_cksum);
        filp = filp_open(dbgcksum_file_name,
                         O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
        if (IS_ERR(filp)) {
                rc = PTR_ERR(filp);
                if (rc == -EEXIST)
                        CDEBUG(D_INFO, "%s: can't open to dump pages with "
                               "checksum error: rc = %d\n", dbgcksum_file_name,
                               rc);
                else
                        CERROR("%s: can't open to dump pages with checksum "
                               "error: rc = %d\n", dbgcksum_file_name, rc);
                return;
        }

        for (i = 0; i < page_count; i++) {
                len = pga[i]->count;
                buf = kmap(pga[i]->pg);
                while (len != 0) {
                        rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
                        if (rc < 0) {
                                CERROR("%s: wanted to write %u but got %d "
                                       "error\n", dbgcksum_file_name, len, rc);
                                break;
                        }
                        len -= rc;
                        buf += rc;
                        CDEBUG(D_INFO, "%s: wrote %d bytes\n",
                               dbgcksum_file_name, rc);
                }
                kunmap(pga[i]->pg);
        }

        rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
        if (rc)
                CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
        filp_close(filp, NULL);
}

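/* A write checksum mismatch was detected: recompute the checksum on the
 * client and compare it against the request and reply values to guess where
 * the data was modified, then log a detailed console message. Returns 0 if
 * the checksums actually match, 1 otherwise (caller will resend). */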
static int
check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
                     __u32 client_cksum, __u32 server_cksum,
                     struct osc_brw_async_args *aa)
{
        const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
        enum cksum_types cksum_type;
        obd_dif_csum_fn *fn = NULL;
        int sector_size = 0;
        __u32 new_cksum;
        char *msg;
        int rc;

        if (server_cksum == client_cksum) {
                CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                return 0;
        }

        if (aa->aa_cli->cl_checksum_dump)
                dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
                                    server_cksum, client_cksum);

        cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
                                           oa->o_flags : 0);

        switch (cksum_type) {
        case OBD_CKSUM_T10IP512:
                fn = obd_dif_ip_fn;
                sector_size = 512;
                break;
        case OBD_CKSUM_T10IP4K:
                fn = obd_dif_ip_fn;
                sector_size = 4096;
                break;
        case OBD_CKSUM_T10CRC512:
                fn = obd_dif_crc_fn;
                sector_size = 512;
                break;
        case OBD_CKSUM_T10CRC4K:
                fn = obd_dif_crc_fn;
                sector_size = 4096;
                break;
        default:
                break;
        }

        if (fn)
                rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
                                             aa->aa_page_count, aa->aa_ppga,
                                             OST_WRITE, fn, sector_size,
                                             &new_cksum);
        else
                rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
                                       aa->aa_ppga, OST_WRITE, cksum_type,
                                       &new_cksum);

        if (rc < 0)
                msg = "failed to calculate the client write checksum";
        else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
                msg = "the server did not use the checksum type specified in "
                      "the original request - likely a protocol problem";
        else if (new_cksum == server_cksum)
                msg = "changed on the client after we checksummed it - "
                      "likely false positive due to mmap IO (bug 11742)";
        else if (new_cksum == client_cksum)
                msg = "changed in transit before arrival at OST";
        else
                msg = "changed in transit AND doesn't match the original - "
                      "likely false positive due to mmap IO (bug 11742)";

        LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
                           DFID " object "DOSTID" extent [%llu-%llu], original "
                           "client csum %x (type %x), server csum %x (type %x),"
                           " client csum now %x\n",
                           obd_name, msg, libcfs_nid2str(peer->nid),
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
                           oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
                           POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
                           aa->aa_ppga[aa->aa_page_count - 1]->off +
                           aa->aa_ppga[aa->aa_page_count-1]->count - 1,
                           client_cksum,
                           obd_cksum_type_unpack(aa->aa_oa->o_flags),
                           server_cksum, cksum_type, new_cksum);
        return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
        struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
        struct client_obd *cli = aa->aa_cli;
        const char *obd_name = cli->cl_import->imp_obd->obd_name;
        const struct lnet_process_id *peer =
                &req->rq_import->imp_connection->c_peer;
        struct ost_body *body;
        u32 client_cksum = 0;
        struct inode *inode;
        unsigned int blockbits = 0, blocksize = 0;

        ENTRY;
        if (rc < 0 && rc != -EDQUOT) {
                DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
                RETURN(rc);
        }

        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL) {
                DEBUG_REQ(D_INFO, req, "cannot unpack body");
                RETURN(-EPROTO);
        }

        /* set/clear over quota flag for a uid/gid/projid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
            body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
                unsigned qid[LL_MAXQUOTAS] = {
                        body->oa.o_uid, body->oa.o_gid,
                        body->oa.o_projid };
                CDEBUG(D_QUOTA,
                       "setdq for [%u %u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
                       body->oa.o_valid, body->oa.o_flags);
                osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
                                body->oa.o_flags);
        }

        osc_update_grant(cli, body);

        if (rc < 0)
                RETURN(rc);

        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
                client_cksum = aa->aa_oa->o_cksum; /* save for later */

        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
                if (rc > 0) {
                        CERROR("%s: unexpected positive size %d\n",
                               obd_name, rc);
                        RETURN(-EPROTO);
                }

                if (req->rq_bulk != NULL &&
                    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
                        RETURN(-EAGAIN);

                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                    check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa))
                        RETURN(-EAGAIN);

                rc = check_write_rcs(req, aa->aa_requested_nob,
                                     aa->aa_nio_count, aa->aa_page_count,
                                     aa->aa_ppga);
                GOTO(out, rc);
        }

        /* The rest of this function executes only for OST_READs */

        if (req->rq_bulk == NULL) {
                rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
                                          RCL_SERVER);
                LASSERT(rc == req->rq_status);
        } else {
                /* if unwrap_bulk failed, return -EAGAIN to retry */
                rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        }
        if (rc < 0)
                GOTO(out, rc = -EAGAIN);

        if (rc > aa->aa_requested_nob) {
                CERROR("%s: unexpected size %d, requested %d\n", obd_name,
                       rc, aa->aa_requested_nob);
                RETURN(-EPROTO);
        }

        if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
                CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
                       rc, req->rq_bulk->bd_nob_transferred);
                RETURN(-EPROTO);
        }

        if (req->rq_bulk == NULL) {
                /* short io */
                int nob, pg_count, i = 0;
                char *buf;

                CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
                pg_count = aa->aa_page_count;
                buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
                                                   rc);
                nob = rc;
                while (nob > 0 && pg_count > 0) {
                        unsigned char *ptr;
                        int count = aa->aa_ppga[i]->count > nob ?
                                    nob : aa->aa_ppga[i]->count;

                        CDEBUG(D_CACHE, "page %p count %d\n",
                               aa->aa_ppga[i]->pg, count);
                        ptr = kmap_atomic(aa->aa_ppga[i]->pg);
                        memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
                               count);
                        kunmap_atomic((void *) ptr);

                        buf += count;
                        nob -= count;
                        i++;
                        pg_count--;
                }
        }

        if (rc < aa->aa_requested_nob)
                handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                static int cksum_counter;
                u32 server_cksum = body->oa.o_cksum;
                char *via = "";
                char *router = "";
                enum cksum_types cksum_type;
                u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
                        body->oa.o_flags : 0;

                cksum_type = obd_cksum_type_unpack(o_flags);
                rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
                                          aa->aa_page_count, aa->aa_ppga,
                                          OST_READ, &client_cksum);
                if (rc < 0)
                        GOTO(out, rc);

                if (req->rq_bulk != NULL &&
                    peer->nid != req->rq_bulk->bd_sender) {
                        via = " via ";
                        router = libcfs_nid2str(req->rq_bulk->bd_sender);
                }

                if (server_cksum != client_cksum) {
                        struct ost_body *clbody;
                        u32 page_count = aa->aa_page_count;

                        clbody = req_capsule_client_get(&req->rq_pill,
                                                        &RMF_OST_BODY);
                        if (cli->cl_checksum_dump)
                                dump_all_bulk_pages(&clbody->oa, page_count,
                                                    aa->aa_ppga, server_cksum,
                                                    client_cksum);

                        LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
                                           "%s%s%s inode "DFID" object "DOSTID
                                           " extent [%llu-%llu], client %x, "
                                           "server %x, cksum_type %x\n",
                                           obd_name,
                                           libcfs_nid2str(peer->nid),
                                           via, router,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_seq : 0ULL,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_oid : 0,
                                           clbody->oa.o_valid & OBD_MD_FLFID ?
                                                clbody->oa.o_parent_ver : 0,
                                           POSTID(&body->oa.o_oi),
                                           aa->aa_ppga[0]->off,
                                           aa->aa_ppga[page_count-1]->off +
                                           aa->aa_ppga[page_count-1]->count - 1,
                                           client_cksum, server_cksum,
                                           cksum_type);
                        cksum_counter = 0;
                        aa->aa_oa->o_cksum = client_cksum;
                        rc = -EAGAIN;
                } else {
                        cksum_counter++;
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
        } else if (unlikely(client_cksum)) {
                static int cksum_missed;

                cksum_missed++;
                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
                        CERROR("%s: checksum %u requested from %s but not sent\n",
                               obd_name, cksum_missed,
                               libcfs_nid2str(peer->nid));
        } else {
                rc = 0;
        }

        inode = page2inode(aa->aa_ppga[0]->pg);
        if (inode == NULL) {
                /* Try to get reference to inode from cl_page if we are
                 * dealing with direct IO, as handled pages are not
                 * actual page cache pages.
                 */
                struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);

                inode = oap2cl_page(oap)->cp_inode;
                if (inode) {
                        blockbits = inode->i_blkbits;
                        blocksize = 1 << blockbits;
                }
        }
        if (inode && IS_ENCRYPTED(inode)) {
                int idx;

                if (!llcrypt_has_encryption_key(inode)) {
                        CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
                        GOTO(out, rc);
                }
                for (idx = 0; idx < aa->aa_page_count; idx++) {
                        struct brw_page *pg = aa->aa_ppga[idx];
                        unsigned int offs = 0;

                        while (offs < PAGE_SIZE) {
                                /* do not decrypt if page is all 0s */
                                if (memchr_inv(page_address(pg->pg) + offs, 0,
                                      LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
                                        /* if page is empty forward info to
                                         * upper layers (ll_io_zero_page) by
                                         * clearing PagePrivate2
                                         */
                                        if (!offs)
                                                ClearPagePrivate2(pg->pg);
                                        break;
                                }

                                if (blockbits) {
                                        /* This is direct IO case. Directly call
                                         * decrypt function that takes inode as
                                         * input parameter. Page does not need
                                         * to be locked.
                                         */
                                        u64 lblk_num =
                                                ((u64)(pg->off >> PAGE_SHIFT) <<
                                                     (PAGE_SHIFT - blockbits)) +
                                                       (offs >> blockbits);
                                        unsigned int i;

                                        for (i = offs;
                                             i < offs +
                                                    LUSTRE_ENCRYPTION_UNIT_SIZE;
                                             i += blocksize, lblk_num++) {
                                                rc =
                                                  llcrypt_decrypt_block_inplace(
                                                          inode, pg->pg,
                                                          blocksize, i,
                                                          lblk_num);
                                                if (rc)
                                                        break;
                                        }
                                } else {
                                        rc = llcrypt_decrypt_pagecache_blocks(
                                                pg->pg,
                                                LUSTRE_ENCRYPTION_UNIT_SIZE,
                                                offs);
                                }
                                if (rc)
                                        GOTO(out, rc);

                                offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
                        }
                }
        }

out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);

        RETURN(rc);
}

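/* Rebuild a failed BRW request: prepare a fresh RPC from the same pages via
 * osc_brw_prep_request(resend=1), transfer the oaps/extents lists and async
 * args to it, and hand it back to ptlrpcd with a capped resend delay. */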
2197 static int osc_brw_redo_request(struct ptlrpc_request *request,
2198 struct osc_brw_async_args *aa, int rc)
2200 struct ptlrpc_request *new_req;
2201 struct osc_brw_async_args *new_aa;
2202 struct osc_async_page *oap;
2205 /* The below message is checked in replay-ost-single.sh test_8ae*/
2206 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2207 "redo for recoverable error %d", rc);
2209 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2210 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2211 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2212 aa->aa_ppga, &new_req, 1);
2216 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2217 if (oap->oap_request != NULL) {
2218 LASSERTF(request == oap->oap_request,
2219 "request %p != oap_request %p\n",
2220 request, oap->oap_request);
2224 * New request takes over pga and oaps from old request.
2225 * Note that copying a list_head doesn't work, need to move it...
2228 new_req->rq_interpret_reply = request->rq_interpret_reply;
2229 new_req->rq_async_args = request->rq_async_args;
2230 new_req->rq_commit_cb = request->rq_commit_cb;
2231 /* cap resend delay to the current request timeout, this is similar to
2232 * what ptlrpc does (see after_reply()) */
2233 if (aa->aa_resends > new_req->rq_timeout)
2234 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2236 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2237 new_req->rq_generation_set = 1;
2238 new_req->rq_import_generation = request->rq_import_generation;
2240 new_aa = ptlrpc_req_async_args(new_aa, new_req);
2242 INIT_LIST_HEAD(&new_aa->aa_oaps);
2243 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2244 INIT_LIST_HEAD(&new_aa->aa_exts);
2245 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2246 new_aa->aa_resends = aa->aa_resends;
2248 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2249 if (oap->oap_request) {
2250 ptlrpc_req_finished(oap->oap_request);
2251 oap->oap_request = ptlrpc_request_addref(new_req);
2255 /* XXX: This code will run into problems if we ever support adding
2256 * a series of BRW RPCs into a self-defined ptlrpc_request_set
2257 * and waiting for all of them to finish. We should inherit the
2258 * request set from the old request. */
2259 ptlrpcd_add_req(new_req);
2261 DEBUG_REQ(D_INFO, new_req, "new request");
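/*
 * Illustrative note on the resend delay cap above (hypothetical numbers,
 * not from a real trace): with aa_resends = 50 and rq_timeout = 30, the
 * redone request is scheduled 30 seconds from now rather than 50,
 * mirroring what ptlrpc's after_reply() does for ordinary resends.
 */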
2265 /*
2266 * Ugh, we want disk allocation on the target to happen in offset order. We'll
2267 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2268 * fine for our small page arrays and doesn't require allocation. It's an
2269 * insertion sort that swaps elements that are strides apart, shrinking the
2270 * stride down until it's 1 and the array is sorted.
2271 */
2272 static void sort_brw_pages(struct brw_page **array, int num)
2273 {
2274 int stride;
2275 struct brw_page *tmp;
2276 int i;
2277 int j;
2279 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2280 ;
2282 do {
2283 stride /= 3;
2284 for (i = stride ; i < num ; i++) {
2285 tmp = array[i];
2286 j = i;
2287 while (j >= stride && array[j - stride]->off > tmp->off) {
2288 array[j] = array[j - stride];
2289 j -= stride;
2290 }
2291 array[j] = tmp;
2292 }
2293 } while (stride > 1);
2294 }
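/*
 * A minimal standalone sketch of the same shellsort (illustrative only;
 * the helper name and the plain-offset array are hypothetical). It uses
 * the identical 3h+1 stride sequence 1, 4, 13, 40, ...:
 *
 *	static void shellsort_offsets(loff_t *a, int n)
 *	{
 *		int stride, i, j;
 *
 *		for (stride = 1; stride < n; stride = stride * 3 + 1)
 *			;
 *		do {
 *			stride /= 3;
 *			for (i = stride; i < n; i++) {
 *				loff_t tmp = a[i];
 *
 *				for (j = i; j >= stride && a[j - stride] > tmp;
 *				     j -= stride)
 *					a[j] = a[j - stride];
 *				a[j] = tmp;
 *			}
 *		} while (stride > 1);
 *	}
 *
 * E.g. for n = 3 the stride loop overshoots to 4 and is divided back down
 * before use, so only the final stride-1 insertion pass does real work.
 */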
2296 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2298 LASSERT(ppga != NULL);
2299 OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2302 static int brw_interpret(const struct lu_env *env,
2303 struct ptlrpc_request *req, void *args, int rc)
2305 struct osc_brw_async_args *aa = args;
2306 struct osc_extent *ext;
2307 struct osc_extent *tmp;
2308 struct client_obd *cli = aa->aa_cli;
2309 unsigned long transferred = 0;
2313 rc = osc_brw_fini_request(req, rc);
2314 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2316 /* restore clear text pages */
2317 osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2320 * When server returns -EINPROGRESS, client should always retry
2321 * regardless of the number of times the bulk was resent already.
2323 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2324 if (req->rq_import_generation !=
2325 req->rq_import->imp_generation) {
2326 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2327 ""DOSTID", rc = %d.\n",
2328 req->rq_import->imp_obd->obd_name,
2329 POSTID(&aa->aa_oa->o_oi), rc);
2330 } else if (rc == -EINPROGRESS ||
2331 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2332 rc = osc_brw_redo_request(req, aa, rc);
2334 CERROR("%s: too many resent retries for object: "
2335 "%llu:%llu, rc = %d.\n",
2336 req->rq_import->imp_obd->obd_name,
2337 POSTID(&aa->aa_oa->o_oi), rc);
2342 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2347 struct obdo *oa = aa->aa_oa;
2348 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2349 unsigned long valid = 0;
2350 struct cl_object *obj;
2351 struct osc_async_page *last;
2353 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2354 obj = osc2cl(last->oap_obj);
2356 cl_object_attr_lock(obj);
2357 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2358 attr->cat_blocks = oa->o_blocks;
2359 valid |= CAT_BLOCKS;
2361 if (oa->o_valid & OBD_MD_FLMTIME) {
2362 attr->cat_mtime = oa->o_mtime;
2365 if (oa->o_valid & OBD_MD_FLATIME) {
2366 attr->cat_atime = oa->o_atime;
2369 if (oa->o_valid & OBD_MD_FLCTIME) {
2370 attr->cat_ctime = oa->o_ctime;
2374 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2375 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2376 loff_t last_off = last->oap_count + last->oap_obj_off +
2379 /* Change the file size if this is an out-of-quota or
2380 * direct IO write and it extends the file size */
2381 if (loi->loi_lvb.lvb_size < last_off) {
2382 attr->cat_size = last_off;
2385 /* Extend KMS if it's not a lockless write */
2386 if (loi->loi_kms < last_off &&
2387 oap2osc_page(last)->ops_srvlock == 0) {
2388 attr->cat_kms = last_off;
2394 cl_object_attr_update(env, obj, attr, valid);
2395 cl_object_attr_unlock(obj);
2397 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2400 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2401 osc_inc_unstable_pages(req);
2403 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2404 list_del_init(&ext->oe_link);
2405 osc_extent_finish(env, ext, 1,
2406 rc && req->rq_no_delay ? -EAGAIN : rc);
2408 LASSERT(list_empty(&aa->aa_exts));
2409 LASSERT(list_empty(&aa->aa_oaps));
2411 transferred = (req->rq_bulk == NULL ? /* short io */
2412 aa->aa_requested_nob :
2413 req->rq_bulk->bd_nob_transferred);
2415 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2416 ptlrpc_lprocfs_brw(req, transferred);
2418 spin_lock(&cli->cl_loi_list_lock);
2419 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2420 * is called so we know whether to go to sync BRWs or wait for more
2421 * RPCs to complete */
2422 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2423 cli->cl_w_in_flight--;
2425 cli->cl_r_in_flight--;
2426 osc_wake_cache_waiters(cli);
2427 spin_unlock(&cli->cl_loi_list_lock);
2429 osc_io_unplug(env, cli, NULL);
2433 static void brw_commit(struct ptlrpc_request *req)
2435 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2436 * this callback (invoked via rq_commit_cb), we need to ensure
2437 * osc_dec_unstable_pages is still called. Otherwise unstable
2438 * pages may be leaked. */
2439 spin_lock(&req->rq_lock);
2440 if (likely(req->rq_unstable)) {
2441 req->rq_unstable = 0;
2442 spin_unlock(&req->rq_lock);
2444 osc_dec_unstable_pages(req);
2445 } else {
2446 req->rq_committed = 1;
2447 spin_unlock(&req->rq_lock);
2448 }
2449 }
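/*
 * Illustrative interleavings of the race described above (a sketch; it
 * assumes osc_inc_unstable_pages() performs the symmetric rq_committed
 * check under rq_lock when brw_interpret() calls it after a successful
 * write):
 *
 *	inc runs first:                   commit runs first:
 *	  inc: rq_unstable = 1              commit: rq_committed = 1
 *	  commit: sees rq_unstable,         inc: sees rq_committed and
 *	    clears it and decrements          immediately decrements
 *
 * Either ordering leaves the unstable page accounting balanced.
 */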
2451 /**
2452 * Build an RPC from the list of extents @ext_list. The caller must ensure
2453 * that the total pages in this list are NOT over the max pages per RPC.
2454 * Extents in the list must be in OES_RPC state.
2455 */
2456 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2457 struct list_head *ext_list, int cmd)
2459 struct ptlrpc_request *req = NULL;
2460 struct osc_extent *ext;
2461 struct brw_page **pga = NULL;
2462 struct osc_brw_async_args *aa = NULL;
2463 struct obdo *oa = NULL;
2464 struct osc_async_page *oap;
2465 struct osc_object *obj = NULL;
2466 struct cl_req_attr *crattr = NULL;
2467 loff_t starting_offset = OBD_OBJECT_EOF;
2468 loff_t ending_offset = 0;
2469 /* '1' for consistency with code that checks !mpflag to restore */
2473 bool soft_sync = false;
2474 bool ndelay = false;
2478 __u32 layout_version = 0;
2479 LIST_HEAD(rpc_list);
2480 struct ost_body *body;
2482 LASSERT(!list_empty(ext_list));
2484 /* add pages into rpc_list to build BRW rpc */
2485 list_for_each_entry(ext, ext_list, oe_link) {
2486 LASSERT(ext->oe_state == OES_RPC);
2487 mem_tight |= ext->oe_memalloc;
2488 grant += ext->oe_grants;
2489 page_count += ext->oe_nr_pages;
2490 layout_version = max(layout_version, ext->oe_layout_version);
2495 soft_sync = osc_over_unstable_soft_limit(cli);
2497 mpflag = memalloc_noreclaim_save();
2499 OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2501 GOTO(out, rc = -ENOMEM);
2503 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2505 GOTO(out, rc = -ENOMEM);
2508 list_for_each_entry(ext, ext_list, oe_link) {
2509 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2511 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2513 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2514 pga[i] = &oap->oap_brw_page;
2515 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2518 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2519 if (starting_offset == OBD_OBJECT_EOF ||
2520 starting_offset > oap->oap_obj_off)
2521 starting_offset = oap->oap_obj_off;
2523 LASSERT(oap->oap_page_off == 0);
2524 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2525 ending_offset = oap->oap_obj_off +
2528 LASSERT(oap->oap_page_off + oap->oap_count ==
2535 /* first page in the list */
2536 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2538 crattr = &osc_env_info(env)->oti_req_attr;
2539 memset(crattr, 0, sizeof(*crattr));
2540 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2541 crattr->cra_flags = ~0ULL;
2542 crattr->cra_page = oap2cl_page(oap);
2543 crattr->cra_oa = oa;
2544 cl_req_attr_set(env, osc2cl(obj), crattr);
2546 if (cmd == OBD_BRW_WRITE) {
2547 oa->o_grant_used = grant;
2548 if (layout_version > 0) {
2549 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2550 PFID(&oa->o_oi.oi_fid), layout_version);
2552 oa->o_layout_version = layout_version;
2553 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2557 sort_brw_pages(pga, page_count);
2558 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2560 CERROR("prep_req failed: %d\n", rc);
2564 req->rq_commit_cb = brw_commit;
2565 req->rq_interpret_reply = brw_interpret;
2566 req->rq_memalloc = mem_tight != 0;
2567 oap->oap_request = ptlrpc_request_addref(req);
2569 req->rq_no_resend = req->rq_no_delay = 1;
2570 /* We should probably set a shorter timeout value to handle
2571 * ETIMEDOUT in brw_interpret() correctly. */
2572 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2575 /* Need to update the timestamps after the request is built in case
2576 * we race with setattr (locally or in queue at the OST). If the OST
2577 * gets the later setattr before the earlier BRW (as determined by the
2578 * request xid), it will not use the BRW timestamps. Sadly, there is no
2579 * obvious way to do this in a single call. bug 10150 */
2580 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2581 crattr->cra_oa = &body->oa;
2582 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2583 cl_req_attr_set(env, osc2cl(obj), crattr);
2584 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2586 aa = ptlrpc_req_async_args(aa, req);
2587 INIT_LIST_HEAD(&aa->aa_oaps);
2588 list_splice_init(&rpc_list, &aa->aa_oaps);
2589 INIT_LIST_HEAD(&aa->aa_exts);
2590 list_splice_init(ext_list, &aa->aa_exts);
2592 spin_lock(&cli->cl_loi_list_lock);
2593 starting_offset >>= PAGE_SHIFT;
2594 if (cmd == OBD_BRW_READ) {
2595 cli->cl_r_in_flight++;
2596 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2597 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2598 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2599 starting_offset + 1);
2601 cli->cl_w_in_flight++;
2602 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2603 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2604 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2605 starting_offset + 1);
2607 spin_unlock(&cli->cl_loi_list_lock);
2609 DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2610 page_count, aa, cli->cl_r_in_flight,
2611 cli->cl_w_in_flight);
2612 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2614 ptlrpcd_add_req(req);
2620 memalloc_noreclaim_restore(mpflag);
2623 LASSERT(req == NULL);
2626 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2628 osc_release_bounce_pages(pga, page_count);
2629 osc_release_ppga(pga, page_count);
2631 /* this should happen rarely and is pretty bad; it makes the
2632 * pending list not follow the dirty order */
2633 while (!list_empty(ext_list)) {
2634 ext = list_entry(ext_list->next, struct osc_extent,
2636 list_del_init(&ext->oe_link);
2637 osc_extent_finish(env, ext, 0, rc);
2643 /* This is to refresh our lock in the face of no RPCs. */
2644 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2646 struct ptlrpc_request *req;
2648 struct brw_page bpg = { .off = start, .count = 1};
2649 struct brw_page *pga = &bpg;
2652 memset(&oa, 0, sizeof(oa));
2653 oa.o_oi = osc->oo_oinfo->loi_oi;
2654 oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2655 /* For updated servers - don't do a read */
2656 oa.o_flags = OBD_FL_NORPC;
2658 rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2661 /* If we succeeded we ship it off; if not, there's no point in doing
2662 * anything. Also no resends.
2663 * No interpret callback, no commit callback.
2664 */
2665 if (rc == 0) {
2666 req->rq_no_resend = 1;
2667 ptlrpcd_add_req(req);
2668 }
2669 }
2671 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2672 {
2673 int set = 0;
2675 LASSERT(lock != NULL);
2677 lock_res_and_lock(lock);
2679 if (lock->l_ast_data == NULL)
2680 lock->l_ast_data = data;
2681 if (lock->l_ast_data == data)
2682 set = 1;
2684 unlock_res_and_lock(lock);
2686 return set;
2687 }
2689 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2690 void *cookie, struct lustre_handle *lockh,
2691 enum ldlm_mode mode, __u64 *flags, bool speculative,
2694 bool intent = *flags & LDLM_FL_HAS_INTENT;
2698 /* The request was created before the ldlm_cli_enqueue() call. */
2699 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2700 struct ldlm_reply *rep;
2702 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2703 LASSERT(rep != NULL);
2705 rep->lock_policy_res1 =
2706 ptlrpc_status_ntoh(rep->lock_policy_res1);
2707 if (rep->lock_policy_res1)
2708 errcode = rep->lock_policy_res1;
2710 *flags |= LDLM_FL_LVB_READY;
2711 } else if (errcode == ELDLM_OK) {
2712 *flags |= LDLM_FL_LVB_READY;
2715 /* Call the update callback. */
2716 rc = (*upcall)(cookie, lockh, errcode);
2718 /* release the reference taken in ldlm_cli_enqueue() */
2719 if (errcode == ELDLM_LOCK_MATCHED)
2720 errcode = ELDLM_OK;
2721 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2722 ldlm_lock_decref(lockh, mode);
2727 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2730 struct osc_enqueue_args *aa = args;
2731 struct ldlm_lock *lock;
2732 struct lustre_handle *lockh = &aa->oa_lockh;
2733 enum ldlm_mode mode = aa->oa_mode;
2734 struct ost_lvb *lvb = aa->oa_lvb;
2735 __u32 lvb_len = sizeof(*lvb);
2737 struct ldlm_enqueue_info einfo = {
2738 .ei_type = aa->oa_type,
2744 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2745 * be valid. */
2746 lock = ldlm_handle2lock(lockh);
2747 LASSERTF(lock != NULL,
2748 "lockh %#llx, req %p, aa %p - client evicted?\n",
2749 lockh->cookie, req, aa);
2751 /* Take an additional reference so that a blocking AST that
2752 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2753 * to arrive after an upcall has been executed by
2754 * osc_enqueue_fini(). */
2755 ldlm_lock_addref(lockh, mode);
2757 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2758 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2760 /* Let the CP AST grant the lock first. */
2761 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2763 if (aa->oa_speculative) {
2764 LASSERT(aa->oa_lvb == NULL);
2765 LASSERT(aa->oa_flags == NULL);
2766 aa->oa_flags = &flags;
2767 }
2769 /* Complete obtaining the lock procedure. */
2770 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2771 lvb, lvb_len, lockh, rc);
2772 /* Complete osc stuff. */
2773 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2774 aa->oa_flags, aa->oa_speculative, rc);
2776 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2778 ldlm_lock_decref(lockh, mode);
2779 LDLM_LOCK_PUT(lock);
2783 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2784 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2785 * other synchronous requests, but keeping some locks while trying to obtain
2786 * others may take a considerable amount of time in case of OST failure; and
2787 * when other sync requests cannot get a lock released by a client, the client
2788 * is evicted from the cluster -- such scenarios make life difficult, so we
2789 * release locks just after they are obtained. */
2790 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2791 __u64 *flags, union ldlm_policy_data *policy,
2792 struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2793 void *cookie, struct ldlm_enqueue_info *einfo,
2794 struct ptlrpc_request_set *rqset, int async,
2797 struct obd_device *obd = exp->exp_obd;
2798 struct lustre_handle lockh = { 0 };
2799 struct ptlrpc_request *req = NULL;
2800 int intent = *flags & LDLM_FL_HAS_INTENT;
2801 __u64 match_flags = *flags;
2802 enum ldlm_mode mode;
2806 /* Filesystem lock extents are extended to page boundaries so that
2807 * dealing with the page cache is a little smoother. */
2808 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2809 policy->l_extent.end |= ~PAGE_MASK;
2811 /* Next, search for already existing extent locks that will cover us */
2812 /* If we're trying to read, we also search for an existing PW lock. The
2813 * VFS and page cache already protect us locally, so lots of readers/
2814 * writers can share a single PW lock.
2816 * There are problems with conversion deadlocks, so instead of
2817 * converting a read lock to a write lock, we'll just enqueue a new
2818 * one.
2820 * At some point we should cancel the read lock instead of making the
2821 * server send us a blocking callback, but there are problems with
2822 * canceling locks out from other users right now, too. */
2823 mode = einfo->ei_mode;
2824 if (einfo->ei_mode == LCK_PR)
2825 mode |= LCK_PW;
2826 /* Normal lock requests must wait for the LVB to be ready before
2827 * matching a lock; speculative lock requests do not need to,
2828 * because they will not actually use the lock. */
2829 if (!speculative)
2830 match_flags |= LDLM_FL_LVB_READY;
2831 if (intent != 0)
2832 match_flags |= LDLM_FL_BLOCK_GRANTED;
2833 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2834 einfo->ei_type, policy, mode, &lockh);
2836 struct ldlm_lock *matched;
2838 if (*flags & LDLM_FL_TEST_LOCK)
2839 RETURN(ELDLM_OK);
2841 matched = ldlm_handle2lock(&lockh);
2843 /* This DLM lock request is speculative, and does not
2844 * have an associated IO request. Therefore if there
2845 * is already a DLM lock, it will just inform the
2846 * caller to cancel the request for this stripe. */
2847 lock_res_and_lock(matched);
2848 if (ldlm_extent_equal(&policy->l_extent,
2849 &matched->l_policy_data.l_extent))
2853 unlock_res_and_lock(matched);
2855 ldlm_lock_decref(&lockh, mode);
2856 LDLM_LOCK_PUT(matched);
2858 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2859 *flags |= LDLM_FL_LVB_READY;
2861 /* We already have a lock, and it's referenced. */
2862 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2864 ldlm_lock_decref(&lockh, mode);
2865 LDLM_LOCK_PUT(matched);
2868 ldlm_lock_decref(&lockh, mode);
2869 LDLM_LOCK_PUT(matched);
2873 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2876 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2877 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2879 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2880 sizeof(*lvb), LVB_T_OST, &lockh, async);
2883 struct osc_enqueue_args *aa;
2884 aa = ptlrpc_req_async_args(aa, req);
2886 aa->oa_mode = einfo->ei_mode;
2887 aa->oa_type = einfo->ei_type;
2888 lustre_handle_copy(&aa->oa_lockh, &lockh);
2889 aa->oa_upcall = upcall;
2890 aa->oa_cookie = cookie;
2891 aa->oa_speculative = speculative;
2892 if (!speculative) {
2893 aa->oa_flags = flags;
2894 aa->oa_lvb = lvb;
2895 } else {
2896 /* Speculative locks essentially enqueue a DLM lock
2897 * in advance, so we don't care about the result
2898 * of the enqueue. */
2899 aa->oa_lvb = NULL;
2900 aa->oa_flags = NULL;
2901 }
2903 req->rq_interpret_reply = osc_enqueue_interpret;
2904 ptlrpc_set_add_req(rqset, req);
2909 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2910 flags, speculative, rc);
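/*
 * Hedged sketch of a caller-side upcall (hypothetical function, shown
 * only to illustrate the contract): osc_enqueue_base() reports completion
 * through an osc_enqueue_upcall_f, passing back the caller's cookie, the
 * lock handle and the enqueue result, exactly as osc_enqueue_fini() does
 * via (*upcall)(cookie, lockh, errcode):
 *
 *	static int demo_enqueue_upcall(void *cookie,
 *				       struct lustre_handle *lockh,
 *				       int errcode)
 *	{
 *		if (errcode == ELDLM_OK || errcode == ELDLM_LOCK_MATCHED)
 *			CDEBUG(D_DLMTRACE, "lock %#llx granted/matched\n",
 *			       lockh->cookie);
 *		return errcode == ELDLM_LOCK_MATCHED ? 0 : errcode;
 *	}
 */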
2915 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2916 struct ldlm_res_id *res_id, enum ldlm_type type,
2917 union ldlm_policy_data *policy, enum ldlm_mode mode,
2918 __u64 *flags, struct osc_object *obj,
2919 struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
2921 struct obd_device *obd = exp->exp_obd;
2922 __u64 lflags = *flags;
2926 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2929 /* Filesystem lock extents are extended to page boundaries so that
2930 * dealing with the page cache is a little smoother */
2931 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2932 policy->l_extent.end |= ~PAGE_MASK;
2934 /* Next, search for already existing extent locks that will cover us */
2935 rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
2936 res_id, type, policy, mode, lockh,
2938 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2942 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2944 LASSERT(lock != NULL);
2945 if (osc_set_lock_data(lock, obj)) {
2946 lock_res_and_lock(lock);
2947 if (!ldlm_is_lvb_cached(lock)) {
2948 LASSERT(lock->l_ast_data == obj);
2949 osc_lock_lvb_update(env, obj, lock, NULL);
2950 ldlm_set_lvb_cached(lock);
2952 unlock_res_and_lock(lock);
2954 ldlm_lock_decref(lockh, rc);
2957 LDLM_LOCK_PUT(lock);
2962 static int osc_statfs_interpret(const struct lu_env *env,
2963 struct ptlrpc_request *req, void *args, int rc)
2965 struct osc_async_args *aa = args;
2966 struct obd_statfs *msfs;
2969 if (rc == -EBADR)
2970 /*
2971 * The request has in fact never been sent due to issues at
2972 * a higher level (LOV). Exit immediately since the caller
2973 * is aware of the problem and takes care of the cleanup.
2974 */
2975 RETURN(rc);
2977 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2978 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2984 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2986 GOTO(out, rc = -EPROTO);
2988 *aa->aa_oi->oi_osfs = *msfs;
2990 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2995 static int osc_statfs_async(struct obd_export *exp,
2996 struct obd_info *oinfo, time64_t max_age,
2997 struct ptlrpc_request_set *rqset)
2999 struct obd_device *obd = class_exp2obd(exp);
3000 struct ptlrpc_request *req;
3001 struct osc_async_args *aa;
3005 if (obd->obd_osfs_age >= max_age) {
3007 "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3008 obd->obd_name, &obd->obd_osfs,
3009 obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3010 obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3011 spin_lock(&obd->obd_osfs_lock);
3012 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3013 spin_unlock(&obd->obd_osfs_lock);
3014 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3015 if (oinfo->oi_cb_up)
3016 oinfo->oi_cb_up(oinfo, 0);
3021 /* We could possibly pass max_age in the request (as an absolute
3022 * timestamp or a "seconds.usec ago") so the target can avoid doing
3023 * extra calls into the filesystem if that isn't necessary (e.g.
3024 * during mount that would help a bit). Having relative timestamps
3025 * is not so great if request processing is slow, while absolute
3026 * timestamps are not ideal because they need time synchronization. */
3027 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3031 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3033 ptlrpc_request_free(req);
3036 ptlrpc_request_set_replen(req);
3037 req->rq_request_portal = OST_CREATE_PORTAL;
3038 ptlrpc_at_set_req_timeout(req);
3040 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3041 /* procfs requests must not wait on statfs, to avoid a deadlock */
3042 req->rq_no_resend = 1;
3043 req->rq_no_delay = 1;
3046 req->rq_interpret_reply = osc_statfs_interpret;
3047 aa = ptlrpc_req_async_args(aa, req);
3050 ptlrpc_set_add_req(rqset, req);
3054 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3055 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3057 struct obd_device *obd = class_exp2obd(exp);
3058 struct obd_statfs *msfs;
3059 struct ptlrpc_request *req;
3060 struct obd_import *imp, *imp0;
3064 /* Since the request might also come from lprocfs, we need to
3065 * sync this with client_disconnect_export() (bug 15684). */
3067 with_imp_locked(obd, imp0, rc)
3068 imp = class_import_get(imp0);
3072 /* We could possibly pass max_age in the request (as an absolute
3073 * timestamp or a "seconds.usec ago") so the target can avoid doing
3074 * extra calls into the filesystem if that isn't necessary (e.g.
3075 * during mount that would help a bit). Having relative timestamps
3076 * is not so great if request processing is slow, while absolute
3077 * timestamps are not ideal because they need time synchronization. */
3078 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3080 class_import_put(imp);
3085 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3087 ptlrpc_request_free(req);
3090 ptlrpc_request_set_replen(req);
3091 req->rq_request_portal = OST_CREATE_PORTAL;
3092 ptlrpc_at_set_req_timeout(req);
3094 if (flags & OBD_STATFS_NODELAY) {
3095 /* procfs requests must not wait on statfs, to avoid a deadlock */
3096 req->rq_no_resend = 1;
3097 req->rq_no_delay = 1;
3100 rc = ptlrpc_queue_wait(req);
3104 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3106 GOTO(out, rc = -EPROTO);
3112 ptlrpc_req_finished(req);
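/*
 * Hedged usage sketch (hypothetical caller): asking for statfs data no
 * older than 30 seconds; results fresher than max_age may be served from
 * the cached obd_osfs by the layers above instead of reaching this RPC:
 *
 *	struct obd_statfs osfs;
 *	int rc = osc_statfs(env, exp, &osfs, ktime_get_seconds() - 30, 0);
 *	if (rc == 0)
 *		CDEBUG(D_INODE, "blocks %llu avail %llu\n",
 *		       osfs.os_blocks, osfs.os_bavail);
 */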
3116 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3117 void *karg, void __user *uarg)
3119 struct obd_device *obd = exp->exp_obd;
3120 struct obd_ioctl_data *data = karg;
3124 if (!try_module_get(THIS_MODULE)) {
3125 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3126 module_name(THIS_MODULE));
3130 case OBD_IOC_CLIENT_RECOVER:
3131 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3132 data->ioc_inlbuf1, 0);
3136 case IOC_OSC_SET_ACTIVE:
3137 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3142 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3143 obd->obd_name, cmd, current->comm, rc);
3147 module_put(THIS_MODULE);
3151 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3152 u32 keylen, void *key, u32 vallen, void *val,
3153 struct ptlrpc_request_set *set)
3155 struct ptlrpc_request *req;
3156 struct obd_device *obd = exp->exp_obd;
3157 struct obd_import *imp = class_exp2cliimp(exp);
3162 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3164 if (KEY_IS(KEY_CHECKSUM)) {
3165 if (vallen != sizeof(int))
3167 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3171 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3172 sptlrpc_conf_client_adapt(obd);
3176 if (KEY_IS(KEY_FLUSH_CTX)) {
3177 sptlrpc_import_flush_my_ctx(imp);
3181 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3182 struct client_obd *cli = &obd->u.cli;
3183 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3184 long target = *(long *)val;
3186 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3191 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3194 /* We pass all other commands directly to OST. Since nobody calls osc
3195 * methods directly and everybody is supposed to go through LOV, we
3196 * assume LOV checked invalid values for us.
3197 * The only recognised values so far are evict_by_nid and mds_conn.
3198 * Even if something bad goes through, we'd get a -EINVAL from OST
3199 * anyway. */
3201 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3202 &RQF_OST_SET_GRANT_INFO :
3207 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3208 RCL_CLIENT, keylen);
3209 if (!KEY_IS(KEY_GRANT_SHRINK))
3210 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3211 RCL_CLIENT, vallen);
3212 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3214 ptlrpc_request_free(req);
3218 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3219 memcpy(tmp, key, keylen);
3220 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3223 memcpy(tmp, val, vallen);
3225 if (KEY_IS(KEY_GRANT_SHRINK)) {
3226 struct osc_grant_args *aa;
3229 aa = ptlrpc_req_async_args(aa, req);
3230 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3232 ptlrpc_req_finished(req);
3235 *oa = ((struct ost_body *)val)->oa;
3237 req->rq_interpret_reply = osc_shrink_grant_interpret;
3240 ptlrpc_request_set_replen(req);
3241 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3242 LASSERT(set != NULL);
3243 ptlrpc_set_add_req(set, req);
3244 ptlrpc_check_set(NULL, set);
3246 ptlrpcd_add_req(req);
3251 EXPORT_SYMBOL(osc_set_info_async);
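/*
 * Hedged usage sketch (hypothetical caller): toggling checksums through
 * the KEY_CHECKSUM branch above; the value must be an int, and no request
 * set is needed because that branch returns before any RPC is built:
 *
 *	int on = 0;
 *	rc = osc_set_info_async(env, exp, strlen(KEY_CHECKSUM),
 *				KEY_CHECKSUM, sizeof(on), &on, NULL);
 */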
3253 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3254 struct obd_device *obd, struct obd_uuid *cluuid,
3255 struct obd_connect_data *data, void *localdata)
3257 struct client_obd *cli = &obd->u.cli;
3259 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3263 spin_lock(&cli->cl_loi_list_lock);
3264 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3265 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3266 /* restore ocd_grant_blkbits as client page bits */
3267 data->ocd_grant_blkbits = PAGE_SHIFT;
3268 grant += cli->cl_dirty_grant;
3270 grant += cli->cl_dirty_pages << PAGE_SHIFT;
3272 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3273 lost_grant = cli->cl_lost_grant;
3274 cli->cl_lost_grant = 0;
3275 spin_unlock(&cli->cl_loi_list_lock);
3277 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3278 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3279 data->ocd_version, data->ocd_grant, lost_grant);
3284 EXPORT_SYMBOL(osc_reconnect);
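/*
 * Worked example for the grant calculation above (illustrative numbers):
 * with cl_avail_grant = 1 MiB, cl_reserved_grant = 0 and 256 dirty pages
 * on a 4 KiB-page client without OBD_CONNECT_GRANT_PARAM, ocd_grant is
 * 1 MiB + (256 << 12) = 2 MiB, so the server keeps honouring space the
 * client already considers spoken for; if everything is clean and zero,
 * the fallback of 2 * cli_brw_size(obd) is requested instead.
 */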
3286 int osc_disconnect(struct obd_export *exp)
3288 struct obd_device *obd = class_exp2obd(exp);
3291 rc = client_disconnect_export(exp);
3292 /*
3293 * Initially we put del_shrink_grant before disconnect_export, but it
3294 * causes the following problem if setup (connect) and cleanup
3295 * (disconnect) are tangled together.
3296 *      connect p1                     disconnect p2
3297 *   ptlrpc_connect_import
3298 *     ...............                class_manual_cleanup
3299 *                                      osc_disconnect
3300 *                                      del_shrink_grant
3301 *   ptlrpc_connect_interrupt
3302 *     osc_init_grant
3303 *   add this client to shrink list
3304 *                                      ......
3305 * Bang! the grant shrink thread triggers the shrink. BUG18662
3306 */
3307 osc_del_grant_list(&obd->u.cli);
3310 EXPORT_SYMBOL(osc_disconnect);
3312 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3313 struct hlist_node *hnode, void *arg)
3315 struct lu_env *env = arg;
3316 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3317 struct ldlm_lock *lock;
3318 struct osc_object *osc = NULL;
3322 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3323 if (lock->l_ast_data != NULL && osc == NULL) {
3324 osc = lock->l_ast_data;
3325 cl_object_get(osc2cl(osc));
3328 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3329 * by the 2nd round of ldlm_namespace_clean() call in
3330 * osc_import_event(). */
3331 ldlm_clear_cleaned(lock);
3336 osc_object_invalidate(env, osc);
3337 cl_object_put(env, osc2cl(osc));
3342 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3344 static int osc_import_event(struct obd_device *obd,
3345 struct obd_import *imp,
3346 enum obd_import_event event)
3348 struct client_obd *cli;
3352 LASSERT(imp->imp_obd == obd);
3355 case IMP_EVENT_DISCON: {
3357 spin_lock(&cli->cl_loi_list_lock);
3358 cli->cl_avail_grant = 0;
3359 cli->cl_lost_grant = 0;
3360 spin_unlock(&cli->cl_loi_list_lock);
3363 case IMP_EVENT_INACTIVE: {
3364 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3367 case IMP_EVENT_INVALIDATE: {
3368 struct ldlm_namespace *ns = obd->obd_namespace;
3372 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3374 env = cl_env_get(&refcheck);
3376 osc_io_unplug(env, &obd->u.cli, NULL);
3378 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3379 osc_ldlm_resource_invalidate,
3381 cl_env_put(env, &refcheck);
3383 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3388 case IMP_EVENT_ACTIVE: {
3389 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3392 case IMP_EVENT_OCD: {
3393 struct obd_connect_data *ocd = &imp->imp_connect_data;
3395 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3396 osc_init_grant(&obd->u.cli, ocd);
3399 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3400 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3402 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3405 case IMP_EVENT_DEACTIVATE: {
3406 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3409 case IMP_EVENT_ACTIVATE: {
3410 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3414 CERROR("Unknown import event %d\n", event);
3420 /**
3421 * Determine whether the lock can be canceled before replaying the lock
3422 * during recovery; see bug 16774 for detailed information.
3423 *
3424 * \retval zero the lock can't be canceled
3425 * \retval other ok to cancel
3426 */
3427 static int osc_cancel_weight(struct ldlm_lock *lock)
3430 * Cancel all unused and granted extent lock.
3432 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3433 ldlm_is_granted(lock) &&
3434 osc_ldlm_weigh_ast(lock) == 0)
3435 RETURN(1);
3437 RETURN(0);
3438 }
3440 static int brw_queue_work(const struct lu_env *env, void *data)
3442 struct client_obd *cli = data;
3444 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3446 osc_io_unplug(env, cli, NULL);
3450 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3452 struct client_obd *cli = &obd->u.cli;
3458 rc = ptlrpcd_addref();
3462 rc = client_obd_setup(obd, lcfg);
3464 GOTO(out_ptlrpcd, rc);
3467 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3468 if (IS_ERR(handler))
3469 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3470 cli->cl_writeback_work = handler;
3472 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3473 if (IS_ERR(handler))
3474 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3475 cli->cl_lru_work = handler;
3477 rc = osc_quota_setup(obd);
3479 GOTO(out_ptlrpcd_work, rc);
3481 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3482 osc_update_next_shrink(cli);
3487 if (cli->cl_writeback_work != NULL) {
3488 ptlrpcd_destroy_work(cli->cl_writeback_work);
3489 cli->cl_writeback_work = NULL;
3491 if (cli->cl_lru_work != NULL) {
3492 ptlrpcd_destroy_work(cli->cl_lru_work);
3493 cli->cl_lru_work = NULL;
3495 client_obd_cleanup(obd);
3500 EXPORT_SYMBOL(osc_setup_common);
3502 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3504 struct client_obd *cli = &obd->u.cli;
3512 rc = osc_setup_common(obd, lcfg);
3516 rc = osc_tunables_init(obd);
3521 * We try to control the total number of requests with an upper limit,
3522 * osc_reqpool_maxreqcount. There might be some race which will cause
3523 * over-limit allocation, but it is fine.
3525 req_count = atomic_read(&osc_pool_req_count);
3526 if (req_count < osc_reqpool_maxreqcount) {
3527 adding = cli->cl_max_rpcs_in_flight + 2;
3528 if (req_count + adding > osc_reqpool_maxreqcount)
3529 adding = osc_reqpool_maxreqcount - req_count;
3531 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3532 atomic_add(added, &osc_pool_req_count);
3535 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3537 spin_lock(&osc_shrink_lock);
3538 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3539 spin_unlock(&osc_shrink_lock);
3540 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3541 cli->cl_import->imp_idle_debug = D_HA;
3546 int osc_precleanup_common(struct obd_device *obd)
3548 struct client_obd *cli = &obd->u.cli;
3552 * for echo client, export may be on zombie list, wait for
3553 * zombie thread to cull it, because cli.cl_import will be
3554 * cleared in client_disconnect_export():
3555 * class_export_destroy() -> obd_cleanup() ->
3556 * echo_device_free() -> echo_client_cleanup() ->
3557 * obd_disconnect() -> osc_disconnect() ->
3558 * client_disconnect_export()
3560 obd_zombie_barrier();
3561 if (cli->cl_writeback_work) {
3562 ptlrpcd_destroy_work(cli->cl_writeback_work);
3563 cli->cl_writeback_work = NULL;
3566 if (cli->cl_lru_work) {
3567 ptlrpcd_destroy_work(cli->cl_lru_work);
3568 cli->cl_lru_work = NULL;
3571 obd_cleanup_client_import(obd);
3574 EXPORT_SYMBOL(osc_precleanup_common);
3576 static int osc_precleanup(struct obd_device *obd)
3580 osc_precleanup_common(obd);
3582 ptlrpc_lprocfs_unregister_obd(obd);
3586 int osc_cleanup_common(struct obd_device *obd)
3588 struct client_obd *cli = &obd->u.cli;
3593 spin_lock(&osc_shrink_lock);
3594 list_del(&cli->cl_shrink_list);
3595 spin_unlock(&osc_shrink_lock);
3598 if (cli->cl_cache != NULL) {
3599 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3600 spin_lock(&cli->cl_cache->ccc_lru_lock);
3601 list_del_init(&cli->cl_lru_osc);
3602 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3603 cli->cl_lru_left = NULL;
3604 cl_cache_decref(cli->cl_cache);
3605 cli->cl_cache = NULL;
3608 /* free memory of osc quota cache */
3609 osc_quota_cleanup(obd);
3611 rc = client_obd_cleanup(obd);
3616 EXPORT_SYMBOL(osc_cleanup_common);
3618 static const struct obd_ops osc_obd_ops = {
3619 .o_owner = THIS_MODULE,
3620 .o_setup = osc_setup,
3621 .o_precleanup = osc_precleanup,
3622 .o_cleanup = osc_cleanup_common,
3623 .o_add_conn = client_import_add_conn,
3624 .o_del_conn = client_import_del_conn,
3625 .o_connect = client_connect_import,
3626 .o_reconnect = osc_reconnect,
3627 .o_disconnect = osc_disconnect,
3628 .o_statfs = osc_statfs,
3629 .o_statfs_async = osc_statfs_async,
3630 .o_create = osc_create,
3631 .o_destroy = osc_destroy,
3632 .o_getattr = osc_getattr,
3633 .o_setattr = osc_setattr,
3634 .o_iocontrol = osc_iocontrol,
3635 .o_set_info_async = osc_set_info_async,
3636 .o_import_event = osc_import_event,
3637 .o_quotactl = osc_quotactl,
3640 LIST_HEAD(osc_shrink_list);
3641 DEFINE_SPINLOCK(osc_shrink_lock);
3643 #ifdef HAVE_SHRINKER_COUNT
3644 static struct shrinker osc_cache_shrinker = {
3645 .count_objects = osc_cache_shrink_count,
3646 .scan_objects = osc_cache_shrink_scan,
3647 .seeks = DEFAULT_SEEKS,
3648 };
3649 #else
3650 static int osc_cache_shrink(struct shrinker *shrinker,
3651 struct shrink_control *sc)
3652 {
3653 (void)osc_cache_shrink_scan(shrinker, sc);
3655 return osc_cache_shrink_count(shrinker, sc);
3656 }
3658 static struct shrinker osc_cache_shrinker = {
3659 .shrink = osc_cache_shrink,
3660 .seeks = DEFAULT_SEEKS,
3661 };
3662 #endif
3664 static int __init osc_init(void)
3666 unsigned int reqpool_size;
3667 unsigned int reqsize;
3671 /* print an address of _any_ initialized kernel symbol from this
3672 * module, to allow debugging with gdb that doesn't support data
3673 * symbols from modules. */
3674 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3676 rc = lu_kmem_init(osc_caches);
3680 rc = class_register_type(&osc_obd_ops, NULL, true,
3681 LUSTRE_OSC_NAME, &osc_device_type);
3685 rc = register_shrinker(&osc_cache_shrinker);
3689 /* This is obviously too much memory; we only prevent overflow here */
3690 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3691 GOTO(out_shrinker, rc = -EINVAL);
3693 reqpool_size = osc_reqpool_mem_max << 20;
3695 reqsize = 1;
3696 while (reqsize < OST_IO_MAXREQSIZE)
3697 reqsize = reqsize << 1;
3700 * We don't enlarge the request count in the OSC pool according to
3701 * cl_max_rpcs_in_flight. Allocation from the pool is only attempted
3702 * after a normal allocation has failed, so a small OSC pool won't
3703 * cause much performance degradation in most cases.
3705 osc_reqpool_maxreqcount = reqpool_size / reqsize;
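/*
 * Worked example (illustrative numbers): with the default
 * osc_reqpool_mem_max of 5 MB, reqpool_size is 5 << 20 bytes. If
 * OST_IO_MAXREQSIZE were 64 KiB, reqsize would round up to 64 KiB and
 * osc_reqpool_maxreqcount would be (5 << 20) / (64 << 10) = 80 pooled
 * requests.
 */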
3707 atomic_set(&osc_pool_req_count, 0);
3708 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3709 ptlrpc_add_rqs_to_pool);
3711 if (osc_rq_pool == NULL)
3712 GOTO(out_shrinker, rc = -ENOMEM);
3714 rc = osc_start_grant_work();
3716 GOTO(out_req_pool, rc);
3721 ptlrpc_free_rq_pool(osc_rq_pool);
3723 unregister_shrinker(&osc_cache_shrinker);
3725 class_unregister_type(LUSTRE_OSC_NAME);
3727 lu_kmem_fini(osc_caches);
3732 static void __exit osc_exit(void)
3734 osc_stop_grant_work();
3735 unregister_shrinker(&osc_cache_shrinker);
3736 class_unregister_type(LUSTRE_OSC_NAME);
3737 lu_kmem_fini(osc_caches);
3738 ptlrpc_free_rq_pool(osc_rq_pool);
3741 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3742 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3743 MODULE_VERSION(LUSTRE_VERSION_STRING);
3744 MODULE_LICENSE("GPL");
3746 module_init(osc_init);
3747 module_exit(osc_exit);