4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
32 #define DEBUG_SUBSYSTEM S_OSC
34 #include <linux/workqueue.h>
35 #include <libcfs/libcfs.h>
36 #include <linux/falloc.h>
37 #include <lprocfs_status.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_ioctl_old.h>
43 #include <lustre_net.h>
44 #include <lustre_obdo.h>
45 #include <lustre_osc.h>
47 #include <obd_cksum.h>
48 #include <obd_class.h>
50 #include "osc_internal.h"
51 #include <lnet/lnet_rdma.h>
/* Module-wide OSC request-pool state and tunables. */
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
59 module_param(osc_reqpool_mem_max, uint, 0444);
/* NOTE(review): declared as signed int but registered with the 'uint'
 * module_param type — confirm this passes the kernel's param type check. */
61 static int osc_idle_timeout = 20;
62 module_param(osc_idle_timeout, uint, 0644);
/* Grant-shrink RPCs reuse the BRW async-args slot in the request. */
64 #define osc_grant_args osc_brw_async_args
/* Per-request async-callback state stashed in rq_async_args: the upcall
 * is invoked from the interpret callback with the matching cookie.
 * (extract: additional fields are elided from this view) */
66 struct osc_setattr_args {
68 obd_enqueue_update_f sa_upcall;
72 struct osc_fsync_args {
73 struct osc_object *fa_obj;
75 obd_enqueue_update_f fa_upcall;
79 struct osc_ladvise_args {
81 obd_enqueue_update_f la_upcall;
85 static void osc_release_ppga(struct brw_page **ppga, size_t count);
86 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into the request's OST body, converting to wire format
 * according to the import's connect data. */
89 static void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
91 struct ost_body *body;
93 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
96 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: send @oa, wait for the reply, and copy the
 * returned attributes back into @oa.  Also sets o_blksize from the
 * client's BRW size.  (extract: error-check lines are elided here) */
99 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
102 struct ptlrpc_request *req;
103 struct ost_body *body;
107 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
111 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
113 ptlrpc_request_free(req);
117 osc_pack_req_body(req, oa);
119 ptlrpc_request_set_replen(req);
121 rc = ptlrpc_queue_wait(req);
/* Reply must carry an OST body or the protocol was violated. */
125 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
127 GOTO(out, rc = -EPROTO);
129 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
130 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Advertise the preferred I/O size to callers. */
132 oa->o_blksize = cli_brw_size(exp->exp_obd);
133 oa->o_valid |= OBD_MD_FLBLKSZ;
137 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push @oa's attributes to the OST and copy the
 * server's view back into @oa.  Caller must have set OBD_MD_FLGROUP.
 * (extract: error-check lines are elided here) */
142 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
145 struct ptlrpc_request *req;
146 struct ost_body *body;
150 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
152 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
156 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
158 ptlrpc_request_free(req);
162 osc_pack_req_body(req, oa);
164 ptlrpc_request_set_replen(req);
166 rc = ptlrpc_queue_wait(req);
170 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
172 GOTO(out, rc = -EPROTO);
174 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
178 ptlrpc_req_finished(req);
/* Reply callback for async setattr/punch/fallocate RPCs: unpack the
 * returned obdo into sa_oa, then hand the result to the caller's upcall. */
183 static int osc_setattr_interpret(const struct lu_env *env,
184 struct ptlrpc_request *req, void *args, int rc)
186 struct osc_setattr_args *sa = args;
187 struct ost_body *body;
194 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196 GOTO(out, rc = -EPROTO);
198 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* Propagate final status to the caller-supplied completion callback. */
201 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  If @rqset is NULL the request is handed to
 * ptlrpcd fire-and-forget; otherwise osc_setattr_interpret() will run
 * @upcall(@cookie, rc) on completion.  (extract: branch/return lines are
 * elided here) */
205 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
206 obd_enqueue_update_f upcall, void *cookie,
207 struct ptlrpc_request_set *rqset)
209 struct ptlrpc_request *req;
210 struct osc_setattr_args *sa;
215 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
219 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
221 ptlrpc_request_free(req);
225 osc_pack_req_body(req, oa);
227 ptlrpc_request_set_replen(req);
229 /* do mds to ost setattr asynchronously */
231 /* Do not wait for response. */
232 ptlrpcd_add_req(req);
234 req->rq_interpret_reply = osc_setattr_interpret;
236 sa = ptlrpc_req_async_args(sa, req);
238 sa->sa_upcall = upcall;
239 sa->sa_cookie = cookie;
241 ptlrpc_set_add_req(rqset, req);
/* Reply callback for OST_LADVISE: copy the returned obdo to the caller's
 * buffer and invoke the completion upcall. */
247 static int osc_ladvise_interpret(const struct lu_env *env,
248 struct ptlrpc_request *req,
251 struct osc_ladvise_args *la = arg;
252 struct ost_body *body;
258 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260 GOTO(out, rc = -EPROTO);
262 *la->la_oa = body->oa;
264 rc = la->la_upcall(la->la_cookie, rc);
269 * If rqset is NULL, do not wait for response. Upcall and cookie could also
270 * be NULL in this case
/* Send an OST_LADVISE RPC carrying @ladvise_hdr plus lah_count advice
 * entries on the I/O portal.  (extract: branch/return lines are elided) */
272 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
273 struct ladvise_hdr *ladvise_hdr,
274 obd_enqueue_update_f upcall, void *cookie,
275 struct ptlrpc_request_set *rqset)
277 struct ptlrpc_request *req;
278 struct ost_body *body;
279 struct osc_ladvise_args *la;
281 struct lu_ladvise *req_ladvise;
282 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
283 int num_advise = ladvise_hdr->lah_count;
284 struct ladvise_hdr *req_ladvise_hdr;
287 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* Size the variable-length advice buffer before packing. */
291 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
292 num_advise * sizeof(*ladvise));
293 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295 ptlrpc_request_free(req);
298 req->rq_request_portal = OST_IO_PORTAL;
299 ptlrpc_at_set_req_timeout(req);
301 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
306 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
307 &RMF_OST_LADVISE_HDR);
308 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
311 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
312 ptlrpc_request_set_replen(req);
315 /* Do not wait for response. */
316 ptlrpcd_add_req(req);
320 req->rq_interpret_reply = osc_ladvise_interpret;
321 la = ptlrpc_req_async_args(la, req);
323 la->la_upcall = upcall;
324 la->la_cookie = cookie;
326 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo objects here — the sequence
 * must be an echo sequence (see the LASSERT below).  Returned attributes
 * are copied back into @oa.  (extract: error-check lines are elided) */
331 static int osc_create(const struct lu_env *env, struct obd_export *exp,
334 struct ptlrpc_request *req;
335 struct ost_body *body;
340 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
341 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
343 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
345 GOTO(out, rc = -ENOMEM);
347 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
349 ptlrpc_request_free(req);
353 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
356 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
358 ptlrpc_request_set_replen(req);
360 rc = ptlrpc_queue_wait(req);
364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366 GOTO(out_req, rc = -EPROTO);
368 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
369 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
371 oa->o_blksize = cli_brw_size(exp->exp_obd);
372 oa->o_valid |= OBD_MD_FLBLKSZ;
374 CDEBUG(D_HA, "transno: %lld\n",
375 lustre_msg_get_transno(req->rq_repmsg));
377 ptlrpc_req_finished(req);
/* Fire-and-forget OST_PUNCH (truncate) on the I/O portal; completion is
 * reported through osc_setattr_interpret -> @upcall(@cookie, rc).
 * (extract: error-check lines are elided here) */
382 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
383 obd_enqueue_update_f upcall, void *cookie)
385 struct ptlrpc_request *req;
386 struct osc_setattr_args *sa;
387 struct obd_import *imp = class_exp2cliimp(exp);
388 struct ost_body *body;
393 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
397 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
399 ptlrpc_request_free(req);
403 osc_set_io_portal(req);
405 ptlrpc_at_set_req_timeout(req);
407 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
409 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
411 ptlrpc_request_set_replen(req);
413 req->rq_interpret_reply = osc_setattr_interpret;
414 sa = ptlrpc_req_async_args(sa, req);
416 sa->sa_upcall = upcall;
417 sa->sa_cookie = cookie;
419 ptlrpcd_add_req(req);
423 EXPORT_SYMBOL(osc_punch_send);
426 * osc_fallocate_base() - Handles fallocate request.
428 * @exp: Export structure
429 * @oa: Attributes passed to OSS from client (obdo structure)
430 * @upcall: Completion callback, invoked with @cookie and the RPC result
431 * @cookie: Opaque caller context passed through to @upcall
433 * @mode: Operation done on given range.
435 * osc_fallocate_base() - Handles fallocate requests only. Only block
436 * allocation or standard preallocate operation is supported currently.
437 * Other mode flags is not supported yet. ftruncate(2) or truncate(2)
438 * is supported via SETATTR request.
440 * Return: Non-zero on failure and 0 on success.
442 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
443 obd_enqueue_update_f upcall, void *cookie, int mode)
445 struct ptlrpc_request *req;
446 struct osc_setattr_args *sa;
447 struct ost_body *body;
448 struct obd_import *imp = class_exp2cliimp(exp);
452 oa->o_falloc_mode = mode;
453 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
458 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
460 ptlrpc_request_free(req);
464 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
467 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
469 ptlrpc_request_set_replen(req);
/* Completion is funneled through the shared setattr interpreter. */
471 req->rq_interpret_reply = osc_setattr_interpret;
472 BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
473 sa = ptlrpc_req_async_args(sa, req);
475 sa->sa_upcall = upcall;
476 sa->sa_cookie = cookie;
478 ptlrpcd_add_req(req);
482 EXPORT_SYMBOL(osc_fallocate_base);
/* Reply callback for OST_SYNC: unpack the obdo, refresh the osc object's
 * cached blocks attribute under the attr lock, then run the upcall. */
484 static int osc_sync_interpret(const struct lu_env *env,
485 struct ptlrpc_request *req, void *args, int rc)
487 struct osc_fsync_args *fa = args;
488 struct ost_body *body;
489 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
490 unsigned long valid = 0;
491 struct cl_object *obj;
497 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
499 CERROR("can't unpack ost_body\n");
500 GOTO(out, rc = -EPROTO);
503 *fa->fa_oa = body->oa;
504 obj = osc2cl(fa->fa_obj);
506 /* Update osc object's blocks attribute */
507 cl_object_attr_lock(obj);
508 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
509 attr->cat_blocks = body->oa.o_blocks;
514 cl_object_attr_update(env, obj, attr, valid);
515 cl_object_attr_unlock(obj);
518 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Send an OST_SYNC for @obj; the oa size/blocks fields carry the byte
 * range to flush (see comment below).  Completion goes through
 * osc_sync_interpret -> @upcall(@cookie, rc).
 * (extract: error-check lines are elided here) */
522 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
523 obd_enqueue_update_f upcall, void *cookie,
524 struct ptlrpc_request_set *rqset)
526 struct obd_export *exp = osc_export(obj);
527 struct ptlrpc_request *req;
528 struct ost_body *body;
529 struct osc_fsync_args *fa;
533 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
537 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
539 ptlrpc_request_free(req);
543 /* overload the size and blocks fields in the oa with start/end */
544 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
546 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
548 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = osc_sync_interpret;
551 fa = ptlrpc_req_async_args(fa, req);
554 fa->fa_upcall = upcall;
555 fa->fa_cookie = cookie;
557 ptlrpc_set_add_req(rqset, req);
562 /* Find and cancel locally locks matched by @mode in the resource found by
563 * @objid. Found locks are added into @cancel list. Returns the amount of
564 * locks added to @cancels list. */
565 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
566 struct list_head *cancels,
567 enum ldlm_mode mode, __u64 lock_flags)
569 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
570 struct ldlm_res_id res_id;
571 struct ldlm_resource *res;
575 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
576 * export) but disabled through procfs (flag in NS).
578 * This distinguishes from a case when ELC is not supported originally,
579 * when we still want to cancel locks in advance and just cancel them
580 * locally, without sending any RPC. */
581 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Build the LDLM resource name from the object id, then cancel
 * matching local locks while holding a resource reference. */
584 ostid_build_res_name(&oa->o_oi, &res_id);
585 res = ldlm_resource_get(ns, &res_id, 0, 0);
589 LDLM_RESOURCE_ADDREF(res);
590 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
591 lock_flags, 0, NULL);
592 LDLM_RESOURCE_DELREF(res);
593 ldlm_resource_putref(res);
/* Reply callback for OST_DESTROY: release one in-flight destroy slot and
 * wake anyone throttled in osc_can_send_destroy(). */
597 static int osc_destroy_interpret(const struct lu_env *env,
598 struct ptlrpc_request *req, void *args, int rc)
600 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
602 atomic_dec(&cli->cl_destroy_in_flight);
603 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight destroy slot (bounded by
 * cl_max_rpcs_in_flight).  On failure the slot is released again, and a
 * waiter is woken if the count dropped in the meantime. */
608 static int osc_can_send_destroy(struct client_obd *cli)
610 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
611 cli->cl_max_rpcs_in_flight) {
612 /* The destroy request can be sent */
615 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
616 cli->cl_max_rpcs_in_flight) {
618 * The counter has been modified between the two atomic
621 wake_up(&cli->cl_destroy_waitq);
/* Destroy an OST object: first cancel matching PW locks locally (ELC,
 * discarding dirty data), pack the cancels into the OST_DESTROY request,
 * throttle against max_rpcs_in_flight, then hand off to ptlrpcd without
 * waiting.  (extract: error-check lines are elided here) */
626 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
629 struct client_obd *cli = &exp->exp_obd->u.cli;
630 struct ptlrpc_request *req;
631 struct ost_body *body;
637 CDEBUG(D_INFO, "oa NULL\n");
641 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
642 LDLM_FL_DISCARD_DATA);
644 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
646 ldlm_lock_list_put(&cancels, l_bl_ast, count);
650 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
653 ptlrpc_request_free(req);
657 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
658 ptlrpc_at_set_req_timeout(req);
660 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
662 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
664 ptlrpc_request_set_replen(req);
666 req->rq_interpret_reply = osc_destroy_interpret;
667 if (!osc_can_send_destroy(cli)) {
669 * Wait until the number of on-going destroy RPCs drops
670 * under max_rpc_in_flight
672 rc = l_wait_event_abortable_exclusive(
673 cli->cl_destroy_waitq,
674 osc_can_send_destroy(cli));
676 ptlrpc_req_finished(req);
681 /* Do not wait for response */
682 ptlrpcd_add_req(req);
/* Fill grant-related fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from the client_obd's cached state, under cl_loi_list_lock,
 * so the server can rebalance grant.  Sanity CERRORs fire on
 * inconsistent dirty accounting.  (extract: some branch lines elided) */
686 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
689 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
691 LASSERT(!(oa->o_valid & bits));
694 spin_lock(&cli->cl_loi_list_lock);
695 if (cli->cl_ocd_grant_param)
696 oa->o_dirty = cli->cl_dirty_grant;
698 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
699 if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
700 CERROR("dirty %lu > dirty_max %lu\n",
702 cli->cl_dirty_max_pages);
704 } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
705 (long)(obd_max_dirty_pages + 1))) {
706 /* The atomic_read() allowing the atomic_inc() are
707 * not covered by a lock thus they may safely race and trip
708 * this CERROR() unless we add in a small fudge factor (+1). */
709 CERROR("%s: dirty %ld > system dirty_max %ld\n",
710 cli_name(cli), atomic_long_read(&obd_dirty_pages),
711 obd_max_dirty_pages);
713 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
715 CERROR("dirty %lu - dirty_max %lu too big???\n",
716 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
719 unsigned long nrpages;
720 unsigned long undirty;
/* Ask for enough grant to keep a full pipeline of RPCs dirty. */
722 nrpages = cli->cl_max_pages_per_rpc;
723 nrpages *= cli->cl_max_rpcs_in_flight + 1;
724 nrpages = max(nrpages, cli->cl_dirty_max_pages);
725 undirty = nrpages << PAGE_SHIFT;
726 if (cli->cl_ocd_grant_param) {
729 /* take extent tax into account when asking for more
731 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
732 cli->cl_max_extent_pages;
733 undirty += nrextents * cli->cl_grant_extent_tax;
735 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
736 * to add extent tax, etc.
738 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
739 ~(PTLRPC_MAX_BRW_SIZE * 4UL));
741 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
742 /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
743 if (cli->cl_lost_grant > INT_MAX) {
745 "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
746 cli_name(cli), cli->cl_lost_grant);
747 oa->o_dropped = INT_MAX;
749 oa->o_dropped = cli->cl_lost_grant;
751 cli->cl_lost_grant -= oa->o_dropped;
752 spin_unlock(&cli->cl_loi_list_lock);
753 CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
754 " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
755 oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
/* Push the next grant-shrink deadline out by the configured interval. */
758 void osc_update_next_shrink(struct client_obd *cli)
760 cli->cl_next_shrink_grant = ktime_get_seconds() +
761 cli->cl_grant_shrink_interval;
763 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
764 cli->cl_next_shrink_grant);
766 EXPORT_SYMBOL(osc_update_next_shrink);
/* Add @grant bytes back to the available-grant counter (locked). */
768 static void __osc_update_grant(struct client_obd *cli, u64 grant)
770 spin_lock(&cli->cl_loi_list_lock);
771 cli->cl_avail_grant += grant;
772 spin_unlock(&cli->cl_loi_list_lock);
/* Apply any grant the server piggybacked on a reply body. */
775 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
777 if (body->oa.o_valid & OBD_MD_FLGRANT) {
778 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
779 __osc_update_grant(cli, body->oa.o_grant);
784 * grant thread data for shrinking space.
/* gtd_clients is the list of client_obds the grant worker scans;
 * gtd_mutex protects it; gtd_stopped signals shutdown. */
786 struct grant_thread_data {
787 struct list_head gtd_clients;
788 struct mutex gtd_mutex;
789 unsigned long gtd_stopped:1;
791 static struct grant_thread_data client_gtd;
/* Reply callback for a grant-shrink set_info RPC: on failure restore the
 * grant we tentatively gave back; on success apply the server's reply
 * grant.  Frees the obdo allocated for the request. */
793 static int osc_shrink_grant_interpret(const struct lu_env *env,
794 struct ptlrpc_request *req,
797 struct osc_grant_args *aa = args;
798 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
799 struct ost_body *body;
802 __osc_update_grant(cli, aa->aa_oa->o_grant);
806 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
808 osc_update_grant(cli, body);
810 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
/* Give back a quarter of the available grant via @oa and flag the obdo
 * with OBD_FL_SHRINK_GRANT so the server reclaims it. */
816 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
818 spin_lock(&cli->cl_loi_list_lock);
819 oa->o_grant = cli->cl_avail_grant / 4;
820 cli->cl_avail_grant -= oa->o_grant;
821 spin_unlock(&cli->cl_loi_list_lock);
822 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
823 oa->o_valid |= OBD_MD_FLFLAGS;
826 oa->o_flags |= OBD_FL_SHRINK_GRANT;
827 osc_update_next_shrink(cli);
830 /* Shrink the current grant, either from some large amount to enough for a
831 * full set of in-flight RPCs, or if we have already shrunk to that limit
832 * then to enough for a single RPC. This avoids keeping more grant than
833 * needed, and avoids shrinking the grant piecemeal. */
834 static int osc_shrink_grant(struct client_obd *cli)
836 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
837 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
839 spin_lock(&cli->cl_loi_list_lock);
/* Already at the pipeline limit: fall back to a single-RPC target. */
840 if (cli->cl_avail_grant <= target_bytes)
841 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
842 spin_unlock(&cli->cl_loi_list_lock);
844 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink available grant down to @target_bytes (clamped to at least one
 * RPC's worth) and report the surplus to the server via a
 * KEY_GRANT_SHRINK set_info RPC.  On send failure the surplus is added
 * back locally.  (extract: some allocation/return lines are elided) */
847 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
850 struct ost_body *body;
853 spin_lock(&cli->cl_loi_list_lock);
854 /* Don't shrink if we are already above or below the desired limit
855 * We don't want to shrink below a single RPC, as that will negatively
856 * impact block allocation and long-term performance. */
857 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
858 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
860 if (target_bytes >= cli->cl_avail_grant) {
861 spin_unlock(&cli->cl_loi_list_lock);
864 spin_unlock(&cli->cl_loi_list_lock);
870 osc_announce_cached(cli, &body->oa, 0);
/* Re-check under the lock: grant may have moved since the test above. */
872 spin_lock(&cli->cl_loi_list_lock);
873 if (target_bytes >= cli->cl_avail_grant) {
874 /* available grant has changed since target calculation */
875 spin_unlock(&cli->cl_loi_list_lock);
876 GOTO(out_free, rc = 0);
878 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
879 cli->cl_avail_grant = target_bytes;
880 spin_unlock(&cli->cl_loi_list_lock);
881 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
882 body->oa.o_valid |= OBD_MD_FLFLAGS;
883 body->oa.o_flags = 0;
885 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
886 osc_update_next_shrink(cli);
888 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
889 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
890 sizeof(*body), body, NULL);
892 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether @client should shrink grant now: the import must
 * support (and not disable) GRANT_SHRINK, the deadline must be near, the
 * import FULL, and available grant above one RPC's worth.
 * (extract: some return lines are elided here) */
898 static int osc_should_shrink_grant(struct client_obd *client)
900 time64_t next_shrink = client->cl_next_shrink_grant;
902 if (client->cl_import == NULL)
905 if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
906 client->cl_import->imp_grant_shrink_disabled) {
907 osc_update_next_shrink(client);
911 if (ktime_get_seconds() >= next_shrink - 5) {
912 /* Get the current RPC size directly, instead of going via:
913 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
914 * Keep comment here so that it can be found by searching. */
915 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
917 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
918 client->cl_avail_grant > brw_size)
921 osc_update_next_shrink(client);
/* Cap on shrink RPCs issued per worker pass, to bound each scan. */
926 #define GRANT_SHRINK_RPC_BATCH 100
928 static struct delayed_work work;
/* Periodic worker: walk the registered clients under gtd_mutex, shrink
 * grant for eligible ones (up to the batch limit), compute the earliest
 * future deadline, and re-arm the delayed work accordingly. */
930 static void osc_grant_work_handler(struct work_struct *data)
932 struct client_obd *cli;
934 bool init_next_shrink = true;
935 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
938 mutex_lock(&client_gtd.gtd_mutex);
939 list_for_each_entry(cli, &client_gtd.gtd_clients,
941 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
942 osc_should_shrink_grant(cli)) {
943 osc_shrink_grant(cli);
/* Track the soonest future per-client deadline for re-arming. */
947 if (!init_next_shrink) {
948 if (cli->cl_next_shrink_grant < next_shrink &&
949 cli->cl_next_shrink_grant > ktime_get_seconds())
950 next_shrink = cli->cl_next_shrink_grant;
952 init_next_shrink = false;
953 next_shrink = cli->cl_next_shrink_grant;
956 mutex_unlock(&client_gtd.gtd_mutex);
958 if (client_gtd.gtd_stopped == 1)
961 if (next_shrink > ktime_get_seconds()) {
962 time64_t delay = next_shrink - ktime_get_seconds();
964 schedule_delayed_work(&work, cfs_time_seconds(delay));
966 schedule_work(&work.work);
/* Force an immediate grant-worker pass, cancelling any pending delay. */
970 void osc_schedule_grant_work(void)
972 cancel_delayed_work_sync(&work);
973 schedule_work(&work.work);
975 EXPORT_SYMBOL(osc_schedule_grant_work);
978 * Start grant thread for returing grant to server for idle clients.
980 static int osc_start_grant_work(void)
982 client_gtd.gtd_stopped = 0;
983 mutex_init(&client_gtd.gtd_mutex);
984 INIT_LIST_HEAD(&client_gtd.gtd_clients);
986 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
987 schedule_work(&work.work);
/* Stop the grant worker: flag shutdown, then wait for any in-flight
 * handler to finish before returning. */
992 static void osc_stop_grant_work(void)
994 client_gtd.gtd_stopped = 1;
995 cancel_delayed_work_sync(&work);
/* Register @client with the grant worker's scan list (gtd_mutex held). */
998 static void osc_add_grant_list(struct client_obd *client)
1000 mutex_lock(&client_gtd.gtd_mutex);
1001 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
1002 mutex_unlock(&client_gtd.gtd_mutex);
/* Remove @client from the scan list; no-op if it was never added. */
1005 static void osc_del_grant_list(struct client_obd *client)
1007 if (list_empty(&client->cl_grant_chain))
1010 mutex_lock(&client_gtd.gtd_mutex);
1011 list_del_init(&client->cl_grant_chain);
1012 mutex_unlock(&client_gtd.gtd_mutex);
/* Initialize client grant state from the server's connect data @ocd:
 * compute avail_grant (minus already-consumed dirty/reserved bytes when
 * not evicted), derive chunk size / extent tax / max extent pages when
 * GRANT_PARAM is negotiated, and register for grant shrinking if the
 * server supports it. */
1015 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1018 * ocd_grant is the total grant amount we're expect to hold: if we've
1019 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1020 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1023 * race is tolerable here: if we're evicted, but imp_state already
1024 * left EVICTED state, then cl_dirty_pages must be 0 already.
1026 spin_lock(&cli->cl_loi_list_lock);
1027 cli->cl_avail_grant = ocd->ocd_grant;
1028 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1029 unsigned long consumed = cli->cl_reserved_grant;
1031 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1032 consumed += cli->cl_dirty_grant;
1034 consumed += cli->cl_dirty_pages << PAGE_SHIFT;
1035 if (cli->cl_avail_grant < consumed) {
1036 CERROR("%s: granted %ld but already consumed %ld\n",
1037 cli_name(cli), cli->cl_avail_grant, consumed);
1038 cli->cl_avail_grant = 0;
1040 cli->cl_avail_grant -= consumed;
1044 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1048 /* overhead for each extent insertion */
1049 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1050 /* determine the appropriate chunk size used by osc_extent. */
1051 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1052 ocd->ocd_grant_blkbits);
1053 /* max_pages_per_rpc must be chunk aligned */
1054 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1055 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1056 ~chunk_mask) & chunk_mask;
1057 /* determine maximum extent size, in #pages */
1058 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1059 cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
1060 cli->cl_ocd_grant_param = 1;
1062 cli->cl_ocd_grant_param = 0;
1063 cli->cl_grant_extent_tax = 0;
1064 cli->cl_chunkbits = PAGE_SHIFT;
1065 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1067 spin_unlock(&cli->cl_loi_list_lock);
1070 "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1072 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1073 cli->cl_max_extent_pages);
1075 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1076 osc_add_grant_list(cli);
1078 EXPORT_SYMBOL(osc_init_grant);
1080 /* We assume that the reason this OSC got a short read is because it read
1081 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1082 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1083 * this stripe never got written at or beyond this stripe offset yet. */
1084 static void handle_short_read(int nob_read, size_t page_count,
1085 struct brw_page **pga)
1090 /* skip bytes read OK */
1091 while (nob_read > 0) {
1092 LASSERT (page_count > 0);
1094 if (pga[i]->bp_count > nob_read) {
1095 /* EOF inside this page */
/* Zero the tail of the partially-read page via a temporary kmap. */
1096 ptr = kmap(pga[i]->bp_page) +
1097 (pga[i]->bp_off & ~PAGE_MASK);
1098 memset(ptr + nob_read, 0, pga[i]->bp_count - nob_read);
1099 kunmap(pga[i]->bp_page);
1105 nob_read -= pga[i]->bp_count;
1110 /* zero remaining pages */
1111 while (page_count-- > 0) {
1112 ptr = kmap(pga[i]->bp_page) + (pga[i]->bp_off & ~PAGE_MASK);
1113 memset(ptr, 0, pga[i]->bp_count);
1114 kunmap(pga[i]->bp_page);
/* Validate the per-niobuf RC vector in a BRW_WRITE reply: fail on a
 * missing/short vector, propagate the first negative rc, reject nonzero
 * "success" codes, and verify the bulk transferred exactly the requested
 * byte count.  (extract: some return lines are elided here) */
1119 static int check_write_rcs(struct ptlrpc_request *req,
1120 int requested_nob, int niocount,
1121 size_t page_count, struct brw_page **pga)
1126 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1127 sizeof(*remote_rcs) *
1129 if (remote_rcs == NULL) {
1130 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1134 /* return error if any niobuf was in error */
1135 for (i = 0; i < niocount; i++) {
1136 if ((int)remote_rcs[i] < 0) {
1137 CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1138 i, remote_rcs[i], req);
1139 return remote_rcs[i];
1142 if (remote_rcs[i] != 0) {
1143 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1144 i, remote_rcs[i], req);
1148 if (req->rq_bulk != NULL &&
1149 req->rq_bulk->bd_nob_transferred != requested_nob) {
1150 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1151 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf when they are byte-contiguous and
 * their flags differ only in bits known to be safe to combine. */
1158 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1160 if (p1->bp_flag != p2->bp_flag) {
1161 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1162 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1163 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC |
1164 OBD_BRW_SYS_RESOURCE);
1166 /* warn if we try to combine flags that we don't know to be
1167 * safe to combine */
1168 if (unlikely((p1->bp_flag & mask) != (p2->bp_flag & mask))) {
1169 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1170 "report this at https://jira.whamcloud.com/\n",
1171 p1->bp_flag, p2->bp_flag);
1176 return (p1->bp_off + p1->bp_count == p2->bp_off);
1179 #if IS_ENABLED(CONFIG_CRC_T10DIF)
/* Compute a T10-PI style bulk checksum: per-sector DIF guard tags are
 * generated into a scratch page and folded into a top-level hash
 * (Adler by default, per the comment below).  Handles checksum-fault
 * injection for both read (corrupt data) and write (corrupt checksum).
 * (extract: loop/cleanup lines are elided here) */
1180 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1181 size_t pg_count, struct brw_page **pga,
1182 int opc, obd_dif_csum_fn *fn,
1184 u32 *check_sum, bool resend)
1186 struct ahash_request *req;
1187 /* Used Adler as the default checksum type on top of DIF tags */
1188 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1189 struct page *__page;
1190 unsigned char *buffer;
1191 __be16 *guard_start;
1193 int used_number = 0;
1196 unsigned int bufsize = sizeof(cksum);
1200 LASSERT(pg_count > 0);
1202 __page = alloc_page(GFP_KERNEL);
1206 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1209 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1210 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
/* The scratch page accumulates guard tags until it fills, then is
 * hashed and reused. */
1214 buffer = kmap(__page);
1215 guard_start = (__be16 *)buffer;
1216 guard_number = PAGE_SIZE / sizeof(*guard_start);
1217 CDEBUG(D_PAGE | (resend ? D_HA : 0),
1218 "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
1219 guard_number, resend, nob, pg_count);
1221 while (nob > 0 && pg_count > 0) {
1222 int off = pga[i]->bp_off & ~PAGE_MASK;
1223 unsigned int count =
1224 pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
1225 int guards_needed = DIV_ROUND_UP(off + count, sector_size) -
1226 (off / sector_size);
1228 if (guards_needed > guard_number - used_number) {
1229 cfs_crypto_hash_update_page(req, __page, 0,
1230 used_number * sizeof(*guard_start));
1234 /* corrupt the data before we compute the checksum, to
1235 * simulate an OST->client data error */
1236 if (unlikely(i == 0 && opc == OST_READ &&
1237 CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1238 unsigned char *ptr = kmap(pga[i]->bp_page);
1240 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1241 kunmap(pga[i]->bp_page);
1245 * The left guard number should be able to hold checksums of a
1248 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->bp_page,
1249 pga[i]->bp_off & ~PAGE_MASK,
1251 guard_start + used_number,
1252 guard_number - used_number,
1255 if (unlikely(resend))
1256 CDEBUG(D_PAGE | D_HA,
1257 "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
1258 i, used, pga[i]->bp_off & ~PAGE_MASK, count,
1259 (int)(used * sizeof(*guard_start)),
1260 guard_start + used_number);
1264 used_number += used;
1265 nob -= pga[i]->bp_count;
/* Flush any guard tags still buffered in the scratch page. */
1273 if (used_number != 0)
1274 cfs_crypto_hash_update_page(req, __page, 0,
1275 used_number * sizeof(*guard_start));
1278 rc2 = cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1282 /* For sending we only compute the wrong checksum instead
1283 * of corrupting the data so it is still correct on a redo */
1284 if (opc == OST_WRITE &&
1285 CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1291 __free_page(__page);
1294 #else /* !CONFIG_CRC_T10DIF */
/* Stubs when T10-DIF support is compiled out. */
1295 #define obd_dif_ip_fn NULL
1296 #define obd_dif_crc_fn NULL
1297 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
1299 #endif /* CONFIG_CRC_T10DIF */
/* Compute a plain (non-T10) bulk checksum over the pga pages with the
 * hash selected by @cksum_type.  Supports the same read/write
 * checksum-fault injection as the T10 variant.
 * (extract: some loop/return lines are elided here) */
1301 static int osc_checksum_bulk(int nob, size_t pg_count,
1302 struct brw_page **pga, int opc,
1303 enum cksum_types cksum_type,
1307 struct ahash_request *req;
1308 unsigned int bufsize;
1309 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1311 LASSERT(pg_count > 0);
1313 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1315 CERROR("Unable to initialize checksum hash %s\n",
1316 cfs_crypto_hash_name(cfs_alg));
1317 return PTR_ERR(req);
1320 while (nob > 0 && pg_count > 0) {
1321 unsigned int count =
1322 pga[i]->bp_count > nob ? nob : pga[i]->bp_count;
1324 /* corrupt the data before we compute the checksum, to
1325 * simulate an OST->client data error */
1326 if (i == 0 && opc == OST_READ &&
1327 CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1328 unsigned char *ptr = kmap(pga[i]->bp_page);
1329 int off = pga[i]->bp_off & ~PAGE_MASK;
1331 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1332 kunmap(pga[i]->bp_page);
1334 cfs_crypto_hash_update_page(req, pga[i]->bp_page,
1335 pga[i]->bp_off & ~PAGE_MASK,
1337 LL_CDEBUG_PAGE(D_PAGE, pga[i]->bp_page, "off %d\n",
1338 (int)(pga[i]->bp_off & ~PAGE_MASK));
1340 nob -= pga[i]->bp_count;
1345 bufsize = sizeof(*cksum);
1346 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1348 /* For sending we only compute the wrong checksum instead
1349 * of corrupting the data so it is still correct on a redo */
1350 if (opc == OST_WRITE && CFS_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * osc_checksum_bulk_rw() - checksum bulk pages for a read or write RPC.
 *
 * Maps @cksum_type to an optional T10 DIF function/sector size via
 * obd_t10_cksum2dif(); when a DIF function is selected, dispatches to
 * osc_checksum_bulk_t10pi(), otherwise falls back to the plain
 * osc_checksum_bulk().  The digest is returned through @check_sum.
 */
1356 static int osc_checksum_bulk_rw(const char *obd_name,
1357 enum cksum_types cksum_type,
1358 int nob, size_t pg_count,
1359 struct brw_page **pga, int opc,
1360 u32 *check_sum, bool resend)
1362 obd_dif_csum_fn *fn = NULL;
1363 int sector_size = 0;
/* FIX: "&sector_size" had been garbled to "§or_size" ("&sect;" HTML
 * entity decoding); restore the address-of so sector_size is written. */
1367 obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1370 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1371 opc, fn, sector_size, check_sum,
1374 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1380 #ifdef CONFIG_LL_ENCRYPTION
1382 * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
1383 * @srcpage: The locked pagecache page containing the block(s) to encrypt
1384 * @dstpage: The page to put encryption result
1385 * @len: Total size of the block(s) to encrypt. Must be a nonzero
1386 * multiple of the filesystem's block size.
1387 * @offs: Byte offset within @page of the first block to encrypt. Must be
1388 * a multiple of the filesystem's block size.
1389 * @gfp_flags: Memory allocation flags
1391 * This overlay function is necessary to be able to provide our own bounce page.
/*
 * osc_encrypt_pagecache_blocks() - encrypt @srcpage blocks into a
 * caller-supplied bounce page @dstpage (overlay of
 * llcrypt_encrypt_pagecache_blocks, which always allocates its own
 * bounce page).  Falls back to llcrypt when no @dstpage is provided.
 */
1393 static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
1394 struct page *dstpage,
1400 const struct inode *inode = srcpage->mapping->host;
1401 const unsigned int blockbits = inode->i_blkbits;
1402 const unsigned int blocksize = 1 << blockbits;
/* logical block number of the first block to encrypt, derived from the
 * page index plus the in-page offset */
1403 u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
1404 (offs >> blockbits);
/* no caller-provided bounce page: defer entirely to llcrypt */
1408 if (unlikely(!dstpage))
1409 return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
1412 if (WARN_ON_ONCE(!PageLocked(srcpage)))
1413 return ERR_PTR(-EINVAL);
/* len/offs must be filesystem-block aligned, as for llcrypt */
1415 if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
1416 return ERR_PTR(-EINVAL);
1418 /* Set PagePrivate2 for disambiguation in
1419 * osc_finalize_bounce_page().
1420 * It means cipher page was not allocated by llcrypt.
1422 SetPagePrivate2(dstpage);
/* encrypt each filesystem block of the requested range */
1424 for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
1425 err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
1426 i, lblk_num, gfp_flags);
1428 return ERR_PTR(err);
/* mimic llcrypt bounce-page bookkeeping: link the cipher page back to
 * the plaintext page via page_private */
1430 SetPagePrivate(dstpage);
1431 set_page_private(dstpage, (unsigned long)srcpage);
1436 * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
1438 * This overlay function is necessary to handle bounce pages
1439 * allocated by ourselves.
/*
 * osc_finalize_bounce_page() - overlay of llcrypt_finalize_bounce_page
 * that also handles bounce pages we allocated ourselves (marked with
 * PagePrivate2 in osc_encrypt_pagecache_blocks()).  Restores *pagep to
 * the original plaintext page and clears our private-page state.
 */
1441 static inline void osc_finalize_bounce_page(struct page **pagep)
1443 struct page *page = *pagep;
1445 ClearPageChecked(page);
1446 /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
1447 * to indicate the cipher page was allocated by ourselves.
1448 * So we must not free it via llcrypt.
1450 if (unlikely(!page || !PagePrivate2(page)))
1451 return llcrypt_finalize_bounce_page(pagep);
/* our own bounce page: swap the plaintext page back in and undo the
 * private-page linkage set up at encryption time */
1453 if (llcrypt_is_bounce_page(page)) {
1454 *pagep = llcrypt_pagecache_page(page);
1455 ClearPagePrivate2(page);
1456 set_page_private(page, (unsigned long)NULL);
1457 ClearPagePrivate(page);
1460 #else /* !CONFIG_LL_ENCRYPTION */
1461 #define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
1462 llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
1463 #define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
/*
 * osc_release_bounce_pages() - after bulk I/O completes, restore the
 * clear-text pages in @pga that were swapped for encryption bounce pages
 * by osc_brw_prep_request(), return the bounce pages to the encryption
 * pool, and undo the bp_count/bp_off widening done at prep time.
 */
1466 static inline void osc_release_bounce_pages(struct brw_page **pga,
1469 #ifdef HAVE_LUSTRE_CRYPTO
1470 struct page **pa = NULL;
1476 #ifdef CONFIG_LL_ENCRYPTION
/* first page PageChecked => this array holds pool bounce pages; gather
 * them into @pa so they can be returned to the pool in one batch */
1477 if (PageChecked(pga[0]->bp_page)) {
1478 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1484 for (i = 0; i < page_count; i++) {
1485 /* Bounce pages used by osc_encrypt_pagecache_blocks()
1486 * called from osc_brw_prep_request()
1487 * are identified thanks to the PageChecked flag.
1489 if (PageChecked(pga[i]->bp_page)) {
1491 pa[j++] = pga[i]->bp_page;
1492 osc_finalize_bounce_page(&pga[i]->bp_page);
/* restore the clear-text count/offset saved in *_diff at prep time */
1494 pga[i]->bp_count -= pga[i]->bp_count_diff;
1495 pga[i]->bp_off += pga[i]->bp_off_diff;
1499 obd_pool_put_pages_array(pa, j);
1500 OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
/*
 * osc_brw_prep_request() - build an OST_READ/OST_WRITE bulk RPC for the
 * sorted page array @pga and return it through @reqp.
 *
 * Handles: fault injection, request allocation (pool-backed for writes),
 * fscrypt/llcrypt encryption of write pages into bounce pages, niobuf
 * coalescing of contiguous pages, short-I/O inlining, bulk descriptor
 * setup, and (optionally) client-side bulk checksumming.
 * @resend != 0 marks the request as a recovery resend.
 */
1506 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1507 u32 page_count, struct brw_page **pga,
1508 struct ptlrpc_request **reqp, int resend)
1510 struct ptlrpc_request *req;
1511 struct ptlrpc_bulk_desc *desc;
1512 struct ost_body *body;
1513 struct obd_ioobj *ioobj;
1514 struct niobuf_remote *niobuf;
1515 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1516 struct osc_brw_async_args *aa;
1517 struct req_capsule *pill;
1518 struct brw_page *pg_prev;
1520 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1521 struct inode *inode = NULL;
1522 bool directio = false;
1524 bool enable_checksum = true;
1525 struct cl_page *clpage;
/* derive the VFS inode (if any) from the first page; CPT_TRANSIENT
 * pages indicate direct I/O */
1528 if (pga[0]->bp_page) {
1529 clpage = oap2cl_page(brw_page2oap(pga[0]));
1530 inode = clpage->cp_inode;
1531 if (clpage->cp_type == CPT_TRANSIENT)
1534 if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1535 RETURN(-ENOMEM); /* Recoverable */
1536 if (CFS_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1537 RETURN(-EINVAL); /* Fatal */
/* writes use the pre-allocated request pool so dirty pages can always
 * be flushed even under memory pressure */
1539 if ((cmd & OBD_BRW_WRITE) != 0) {
1541 req = ptlrpc_request_alloc_pool(cli->cl_import,
1543 &RQF_OST_BRW_WRITE);
1546 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* encrypted write with a loaded key: encrypt every page into a bounce
 * page taken from the encryption pool before it goes on the wire */
1551 if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
1552 llcrypt_has_encryption_key(inode)) {
1553 struct page **pa = NULL;
1555 #ifdef CONFIG_LL_ENCRYPTION
1556 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1558 ptlrpc_request_free(req);
1562 rc = obd_pool_get_pages_array(pa, page_count);
1564 CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
1566 ptlrpc_request_free(req);
1571 for (i = 0; i < page_count; i++) {
1572 struct brw_page *brwpg = pga[i];
1573 struct page *data_page = NULL;
1574 bool retried = false;
1575 bool lockedbymyself;
1577 (brwpg->bp_off & ~PAGE_MASK) + brwpg->bp_count;
1578 struct address_space *map_orig = NULL;
/* widen the I/O to whole encryption units */
1582 nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1583 /* The page can already be locked when we arrive here.
1584 * This is possible when cl_page_assume/vvp_page_assume
1585 * is stuck on wait_on_page_writeback with page lock
1586 * held. In this case there is no risk for the lock to
1587 * be released while we are doing our encryption
1588 * processing, because writeback against that page will
1589 * end in vvp_page_completion_write/cl_page_completion,
1590 * which means only once the page is fully processed.
1592 lockedbymyself = trylock_page(brwpg->bp_page);
/* temporarily dress the page up as a pagecache page of @inode so
 * llcrypt derives the correct IV from mapping/index */
1594 map_orig = brwpg->bp_page->mapping;
1595 brwpg->bp_page->mapping = inode->i_mapping;
1596 index_orig = brwpg->bp_page->index;
1597 clpage = oap2cl_page(brw_page2oap(brwpg));
1598 brwpg->bp_page->index = clpage->cp_page_index;
1601 osc_encrypt_pagecache_blocks(brwpg->bp_page,
1606 brwpg->bp_page->mapping = map_orig;
1607 brwpg->bp_page->index = index_orig;
1610 unlock_page(brwpg->bp_page);
/* on failure, release the unused pool pages and the request */
1611 if (IS_ERR(data_page)) {
1612 rc = PTR_ERR(data_page);
1613 if (rc == -ENOMEM && !retried) {
1619 obd_pool_put_pages_array(pa + i,
1621 OBD_FREE_PTR_ARRAY_LARGE(pa,
1624 ptlrpc_request_free(req);
1627 /* Set PageChecked flag on bounce page for
1628 * disambiguation in osc_release_bounce_pages().
1630 SetPageChecked(data_page);
1631 brwpg->bp_page = data_page;
1632 /* there should be no gap in the middle of page array */
1633 if (i == page_count - 1) {
1634 struct osc_async_page *oap =
1635 brw_page2oap(brwpg);
1637 oa->o_size = oap->oap_count +
1638 oap->oap_obj_off + oap->oap_page_off;
1640 /* len is forced to nunits, and relative offset to 0
1641 * so store the old, clear text info
1643 brwpg->bp_count_diff = nunits - brwpg->bp_count;
1644 brwpg->bp_count = nunits;
1645 brwpg->bp_off_diff = brwpg->bp_off & ~PAGE_MASK;
1646 brwpg->bp_off = brwpg->bp_off & PAGE_MASK;
1650 OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
/* encrypted write but no key (e.g. lockless truncate path): only fetch
 * the object size so o_size is still correct on the wire */
1651 } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1652 struct osc_async_page *oap = brw_page2oap(pga[0]);
1653 struct cl_page *clpage = oap2cl_page(oap);
1654 struct cl_object *clobj = clpage->cp_obj;
1655 struct cl_attr attr = { 0 };
1659 env = cl_env_get(&refcheck);
1662 ptlrpc_request_free(req);
1666 cl_object_attr_lock(clobj);
1667 rc = cl_object_attr_get(env, clobj, &attr);
1668 cl_object_attr_unlock(clobj);
1669 cl_env_put(env, &refcheck);
1671 ptlrpc_request_free(req);
1675 oa->o_size = attr.cat_size;
/* encrypted read with key: widen each page's count/off to cover whole
 * encryption units, saving the clear-text values in *_diff */
1676 } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
1677 llcrypt_has_encryption_key(inode)) {
1678 for (i = 0; i < page_count; i++) {
1679 struct brw_page *pg = pga[i];
1680 u32 nunits = (pg->bp_off & ~PAGE_MASK) + pg->bp_count;
1682 nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1683 /* count/off are forced to cover the whole encryption
1684 * unit size so that all encrypted data is stored on the
1685 * OST, so adjust bp_{count,off}_diff for the size of
1688 pg->bp_count_diff = nunits - pg->bp_count;
1689 pg->bp_count = nunits;
1690 pg->bp_off_diff = pg->bp_off & ~PAGE_MASK;
1691 pg->bp_off = pg->bp_off & PAGE_MASK;
/* count remote niobufs: adjacent pages that can_merge_pages() share one */
1695 for (niocount = i = 1; i < page_count; i++) {
1696 if (!can_merge_pages(pga[i - 1], pga[i]))
1700 pill = &req->rq_pill;
1701 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1703 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1704 niocount * sizeof(*niobuf));
/* total transfer size; also reset the crypto diffs for plain I/O */
1706 for (i = 0; i < page_count; i++) {
1707 short_io_size += pga[i]->bp_count;
1708 if (!inode || !IS_ENCRYPTED(inode) ||
1709 !llcrypt_has_encryption_key(inode)) {
1710 pga[i]->bp_count_diff = 0;
1711 pga[i]->bp_off_diff = 0;
/* RDMA-only pages (e.g. GPU direct) skip client checksumming */
1715 if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
1716 enable_checksum = false;
1721 /* Check if read/write is small enough to be a short io. */
1722 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1723 !imp_connect_shortio(cli->cl_import))
1726 /* If this is an empty RPC to old server, just ignore it */
1727 if (!short_io_size && !pga[0]->bp_page) {
1728 ptlrpc_request_free(req);
1732 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1733 opc == OST_READ ? 0 : short_io_size);
1734 if (opc == OST_READ)
1735 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1738 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1740 ptlrpc_request_free(req);
1743 osc_set_io_portal(req);
1745 ptlrpc_at_set_req_timeout(req);
1746 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1748 req->rq_no_retry_einprogress = 1;
/* short I/O inlines the data in the request itself: no bulk descriptor */
1750 if (short_io_size != 0) {
1752 short_io_buf = NULL;
1756 desc = ptlrpc_prep_bulk_imp(req, page_count,
1757 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1758 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1759 PTLRPC_BULK_PUT_SINK),
1761 &ptlrpc_bulk_kiov_pin_ops);
1764 GOTO(out, rc = -ENOMEM);
1765 /* NB request now owns desc and will free it when it gets freed */
1766 desc->bd_is_rdma = gpu;
1768 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1769 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1770 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1771 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1773 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1775 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1776 * and from_kgid(), because they are asynchronous. Fortunately, variable
1777 * oa contains valid o_uid and o_gid in these two operations.
1778 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1779 * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1780 * other process logic */
1781 body->oa.o_uid = oa->o_uid;
1782 body->oa.o_gid = oa->o_gid;
1784 obdo_to_ioobj(oa, ioobj);
1785 ioobj->ioo_bufcnt = niocount;
1786 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1787 * that might be send for this request. The actual number is decided
1788 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1789 * "max - 1" for old client compatibility sending "0", and also so the
1790 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1792 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1794 ioobj_max_brw_set(ioobj, 0);
/* advertise the encryption flag to the server for encrypted files
 * (unless fault injection asks to hide it for LFSCK testing) */
1796 if (inode && IS_ENCRYPTED(inode) &&
1797 llcrypt_has_encryption_key(inode) &&
1798 !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NO_ENCFLAG)) {
1799 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1800 body->oa.o_valid |= OBD_MD_FLFLAGS;
1801 body->oa.o_flags = 0;
1803 body->oa.o_flags |= LUSTRE_ENCRYPT_FL;
1806 if (short_io_size != 0) {
1807 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1808 body->oa.o_valid |= OBD_MD_FLFLAGS;
1809 body->oa.o_flags = 0;
1811 body->oa.o_flags |= OBD_FL_SHORT_IO;
1812 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1814 if (opc == OST_WRITE) {
1815 short_io_buf = req_capsule_client_get(pill,
1817 LASSERT(short_io_buf != NULL);
1821 LASSERT(page_count > 0);
/* fill the niobuf array, merging contiguous compatible pages; for short
 * writes copy data inline, otherwise attach pages to the bulk desc */
1823 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1824 struct brw_page *pg = pga[i];
1825 int poff = pg->bp_off & ~PAGE_MASK;
1827 LASSERT(pg->bp_count > 0);
1828 /* make sure there is no gap in the middle of page array */
1829 LASSERTF(page_count == 1 ||
1830 (ergo(i == 0, poff + pg->bp_count == PAGE_SIZE) &&
1831 ergo(i > 0 && i < page_count - 1,
1832 poff == 0 && pg->bp_count == PAGE_SIZE) &&
1833 ergo(i == page_count - 1, poff == 0)),
1834 "i: %d/%d pg: %px off: %llu, count: %u\n",
1835 i, page_count, pg, pg->bp_off, pg->bp_count);
1836 LASSERTF(i == 0 || pg->bp_off > pg_prev->bp_off,
1837 "i %d p_c %u pg %px [pri %lu ind %lu] off %llu prev_pg %px [pri %lu ind %lu] off %llu\n",
1839 pg->bp_page, page_private(pg->bp_page),
1840 pg->bp_page->index, pg->bp_off,
1841 pg_prev->bp_page, page_private(pg_prev->bp_page),
1842 pg_prev->bp_page->index, pg_prev->bp_off);
1843 LASSERT((pga[0]->bp_flag & OBD_BRW_SRVLOCK) ==
1844 (pg->bp_flag & OBD_BRW_SRVLOCK));
1845 if (short_io_size != 0 && opc == OST_WRITE) {
1846 unsigned char *ptr = kmap_atomic(pg->bp_page);
1848 LASSERT(short_io_size >= requested_nob + pg->bp_count);
1849 memcpy(short_io_buf + requested_nob,
1853 } else if (short_io_size == 0) {
1854 desc->bd_frag_ops->add_kiov_frag(desc, pg->bp_page, poff,
1857 requested_nob += pg->bp_count;
1859 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1861 niobuf->rnb_len += pg->bp_count;
1863 niobuf->rnb_offset = pg->bp_off;
1864 niobuf->rnb_len = pg->bp_count;
1865 niobuf->rnb_flags = pg->bp_flag;
1868 if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MARK_COMPRESSED))
1869 niobuf->rnb_flags |= OBD_BRW_COMPRESSED;
/* the loop must have advanced niobuf exactly niocount entries */
1872 LASSERTF((void *)(niobuf - niocount) ==
1873 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1874 "want %px - real %px\n",
1875 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1876 (void *)(niobuf - niocount));
1878 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* resend: tell the server so it can detect an already-applied write */
1880 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1881 body->oa.o_valid |= OBD_MD_FLFLAGS;
1882 body->oa.o_flags = 0;
1884 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1887 if (osc_should_shrink_grant(cli))
1888 osc_shrink_grant_local(cli, &body->oa);
/* bulk-capable sptlrpc flavors already protect the data in transit */
1890 if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
1891 enable_checksum = false;
1893 /* size[REQ_REC_OFF] still sizeof (*body) */
1894 if (opc == OST_WRITE) {
1895 if (enable_checksum) {
1896 /* store cl_cksum_type in a local variable since
1897 * it can be changed via lprocfs */
1898 enum cksum_types cksum_type = cli->cl_cksum_type;
1900 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1901 body->oa.o_flags = 0;
1903 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1905 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1907 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1908 requested_nob, page_count,
1910 &body->oa.o_cksum, resend);
1912 CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
1916 CDEBUG(D_PAGE | (resend ? D_HA : 0),
1917 "checksum at write origin: %x (%x)\n",
1918 body->oa.o_cksum, cksum_type);
1920 /* save this in 'oa', too, for later checking */
1921 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1922 oa->o_flags |= obd_cksum_type_pack(obd_name,
1925 /* clear out the checksum flag, in case this is a
1926 * resend but cl_checksum is no longer set. b=11238 */
1927 oa->o_valid &= ~OBD_MD_FLCKSUM;
1929 oa->o_cksum = body->oa.o_cksum;
1930 /* 1 RC per niobuf */
1931 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1932 sizeof(__u32) * niocount);
/* OST_READ: only request that the server checksum the reply data */
1934 if (enable_checksum) {
1935 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1936 body->oa.o_flags = 0;
1937 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1938 cli->cl_cksum_type);
1939 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1942 /* Client cksum has been already copied to wire obdo in previous
1943 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1944 * resent due to cksum error, this will allow Server to
1945 * check+dump pages on its side */
1947 ptlrpc_request_set_replen(req);
/* stash per-request state for brw_interpret()/osc_brw_fini_request() */
1949 aa = ptlrpc_req_async_args(aa, req);
1951 aa->aa_requested_nob = requested_nob;
1952 aa->aa_nio_count = niocount;
1953 aa->aa_page_count = page_count;
1957 INIT_LIST_HEAD(&aa->aa_oaps);
1960 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1961 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1962 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1963 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path: drop the request reference */
1967 ptlrpc_req_finished(req);
1971 char dbgcksum_file_name[PATH_MAX];
/*
 * dump_all_bulk_pages() - on checksum mismatch, write the raw content of
 * every bulk page to a debug file (named from FID, extent and the two
 * checksums) so the corruption can be analyzed offline, then trigger a
 * debug-log dump.  Best effort: failures are only logged.
 */
1973 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1974 struct brw_page **pga, __u32 server_cksum,
1982 /* will only keep dump of pages on first error for the same range in
1983 * file/fid, not during the resends/retries. */
1984 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1985 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1986 (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1987 libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1988 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1989 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1990 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1992 pga[page_count-1]->bp_off + pga[page_count-1]->bp_count - 1,
1993 client_cksum, server_cksum);
1994 CWARN("dumping checksum data to %s\n", dbgcksum_file_name);
/* O_EXCL: if the dump file already exists, an earlier error on the same
 * range already dumped it and we keep that copy */
1995 filp = filp_open(dbgcksum_file_name,
1996 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
2000 CDEBUG(D_INFO, "%s: can't open to dump pages with "
2001 "checksum error: rc = %d\n", dbgcksum_file_name,
2004 CERROR("%s: can't open to dump pages with checksum "
2005 "error: rc = %d\n", dbgcksum_file_name, rc);
/* append each page's payload; kmap because pages may be highmem */
2009 for (i = 0; i < page_count; i++) {
2010 len = pga[i]->bp_count;
2011 buf = kmap(pga[i]->bp_page);
2013 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
2015 CERROR("%s: wanted to write %u but got %d "
2016 "error\n", dbgcksum_file_name, len, rc);
2022 kunmap(pga[i]->bp_page);
/* make sure the dump survives a crash shortly after the error */
2025 rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
2027 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
2028 filp_close(filp, NULL);
2030 libcfs_debug_dumplog();
/*
 * check_write_checksum() - diagnose a client/server checksum mismatch on
 * a bulk write reply.
 *
 * Re-computes the checksum over the (still pinned) pages with the type
 * the server actually used, and from the comparison of the three values
 * (original client, server, recomputed) deduces where the data changed,
 * logging a console error with the most likely explanation.
 */
2034 check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
2035 __u32 client_cksum, __u32 server_cksum,
2036 struct osc_brw_async_args *aa)
2038 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
2039 enum cksum_types cksum_type;
2040 obd_dif_csum_fn *fn = NULL;
2041 int sector_size = 0;
/* fast path: checksums agree, nothing to diagnose */
2046 if (server_cksum == client_cksum) {
2047 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2051 if (aa->aa_cli->cl_checksum_dump)
2052 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
2053 server_cksum, client_cksum);
/* use the checksum type the SERVER reported, which may differ from the
 * one we originally requested */
2055 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
2058 switch (cksum_type) {
2059 case OBD_CKSUM_T10IP512:
2063 case OBD_CKSUM_T10IP4K:
2067 case OBD_CKSUM_T10CRC512:
2068 fn = obd_dif_crc_fn;
2071 case OBD_CKSUM_T10CRC4K:
2072 fn = obd_dif_crc_fn;
/* recompute over the same pages with the server's checksum type */
2080 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
2081 aa->aa_page_count, aa->aa_ppga,
2082 OST_WRITE, fn, sector_size,
2085 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
2086 aa->aa_ppga, OST_WRITE, cksum_type,
/* classify the mismatch from how the recomputed value compares */
2090 msg = "failed to calculate the client write checksum";
2091 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
2092 msg = "the server did not use the checksum type specified in "
2093 "the original request - likely a protocol problem";
2094 else if (new_cksum == server_cksum)
2095 msg = "changed on the client after we checksummed it - "
2096 "likely false positive due to mmap IO (bug 11742)";
2097 else if (new_cksum == client_cksum)
2098 msg = "changed in transit before arrival at OST";
2100 msg = "changed in transit AND doesn't match the original - "
2101 "likely false positive due to mmap IO (bug 11742)";
2103 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
2104 DFID " object "DOSTID" extent [%llu-%llu], original "
2105 "client csum %x (type %x), server csum %x (type %x),"
2106 " client csum now %x\n",
2107 obd_name, msg, libcfs_nidstr(&peer->nid),
2108 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
2109 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
2110 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
2111 POSTID(&oa->o_oi), aa->aa_ppga[0]->bp_off,
2112 aa->aa_ppga[aa->aa_page_count - 1]->bp_off +
2113 aa->aa_ppga[aa->aa_page_count-1]->bp_count - 1,
2115 obd_cksum_type_unpack(aa->aa_oa->o_flags),
2116 server_cksum, cksum_type, new_cksum);
2120 /* Note rc enters this function as number of bytes transferred */
/*
 * osc_brw_fini_request() - post-process a completed BRW RPC.
 *
 * Handles quota flag updates and grant accounting from the reply, write
 * checksum verification, short-I/O read data copy-out, read checksum
 * verification (with a "resend" recompute on mismatch for diagnostics),
 * and decryption of read data for encrypted files.
 * @rc enters as the number of bytes transferred (or a negative errno).
 */
2121 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
2123 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
2124 struct client_obd *cli = aa->aa_cli;
2125 const char *obd_name = cli->cl_import->imp_obd->obd_name;
2126 const struct lnet_processid *peer =
2127 &req->rq_import->imp_connection->c_peer;
2128 struct ost_body *body;
2129 u32 client_cksum = 0;
2130 struct inode *inode = NULL;
2131 unsigned int blockbits = 0, blocksize = 0;
2132 struct cl_page *clpage;
/* -EDQUOT still carries a valid reply body (quota flags below) */
2136 if (rc < 0 && rc != -EDQUOT) {
2137 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
2141 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
2142 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
2144 DEBUG_REQ(D_INFO, req, "cannot unpack body");
2148 /* set/clear over quota flag for a uid/gid/projid */
2149 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
2150 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
2151 unsigned qid[LL_MAXQUOTAS] = {
2152 body->oa.o_uid, body->oa.o_gid,
2153 body->oa.o_projid };
2155 "setdq for [%u %u %u] with valid %#llx, flags %x\n",
2156 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
2157 body->oa.o_valid, body->oa.o_flags);
2158 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
2162 osc_update_grant(cli, body);
2167 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
2168 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* OST_WRITE: verify server-reported checksum and per-niobuf RCs, then
 * we are done (no data flows back for writes) */
2170 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2172 CERROR("%s: unexpected positive size %d\n",
2177 if (req->rq_bulk != NULL &&
2178 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
2181 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
2182 check_write_checksum(&body->oa, peer, client_cksum,
2183 body->oa.o_cksum, aa))
2186 rc = check_write_rcs(req, aa->aa_requested_nob,
2187 aa->aa_nio_count, aa->aa_page_count,
2192 /* The rest of this function executes only for OST_READs */
/* short-I/O read: the payload came inline in the reply, not via bulk */
2194 if (req->rq_bulk == NULL) {
2195 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
2197 LASSERT(rc == req->rq_status);
2199 /* if unwrap_bulk failed, return -EAGAIN to retry */
2200 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2203 GOTO(out, rc = -EAGAIN);
2205 if (rc > aa->aa_requested_nob) {
2206 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2207 rc, aa->aa_requested_nob);
2211 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2212 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2213 rc, req->rq_bulk->bd_nob_transferred);
/* copy inline short-I/O data out into the destination pages */
2217 if (req->rq_bulk == NULL) {
2219 int nob, pg_count, i = 0;
2222 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2223 pg_count = aa->aa_page_count;
2224 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2227 while (nob > 0 && pg_count > 0) {
2229 int count = aa->aa_ppga[i]->bp_count > nob ?
2230 nob : aa->aa_ppga[i]->bp_count;
2232 CDEBUG(D_CACHE, "page %p count %d\n",
2233 aa->aa_ppga[i]->bp_page, count);
2234 ptr = kmap_atomic(aa->aa_ppga[i]->bp_page);
2235 memcpy(ptr + (aa->aa_ppga[i]->bp_off & ~PAGE_MASK), buf,
2237 kunmap_atomic((void *) ptr);
/* server returned less than requested: zero-fill the remainder */
2246 if (rc < aa->aa_requested_nob)
2247 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* verify the server's read checksum over the received bytes */
2249 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2250 static int cksum_counter;
2251 u32 server_cksum = body->oa.o_cksum;
2255 enum cksum_types cksum_type;
2256 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2257 body->oa.o_flags : 0;
2259 cksum_type = obd_cksum_type_unpack(o_flags);
2260 rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2261 aa->aa_page_count, aa->aa_ppga,
2262 OST_READ, &client_cksum, false);
/* note an intermediate router if the bulk did not come directly
 * from the server */
2266 if (req->rq_bulk != NULL &&
2267 !nid_same(&peer->nid, &req->rq_bulk->bd_sender)) {
2269 router = libcfs_nidstr(&req->rq_bulk->bd_sender);
2272 if (server_cksum != client_cksum) {
2273 struct ost_body *clbody;
2274 __u32 client_cksum2;
2275 u32 page_count = aa->aa_page_count;
/* recompute with resend tracing enabled for diagnostics */
2277 osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2278 page_count, aa->aa_ppga,
2279 OST_READ, &client_cksum2, true);
2280 clbody = req_capsule_client_get(&req->rq_pill,
2282 if (cli->cl_checksum_dump)
2283 dump_all_bulk_pages(&clbody->oa, page_count,
2284 aa->aa_ppga, server_cksum,
2287 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2288 "%s%s%s inode "DFID" object "DOSTID
2289 " extent [%llu-%llu], client %x/%x, "
2290 "server %x, cksum_type %x\n",
2292 libcfs_nidstr(&peer->nid),
2294 clbody->oa.o_valid & OBD_MD_FLFID ?
2295 clbody->oa.o_parent_seq : 0ULL,
2296 clbody->oa.o_valid & OBD_MD_FLFID ?
2297 clbody->oa.o_parent_oid : 0,
2298 clbody->oa.o_valid & OBD_MD_FLFID ?
2299 clbody->oa.o_parent_ver : 0,
2300 POSTID(&body->oa.o_oi),
2301 aa->aa_ppga[0]->bp_off,
2302 aa->aa_ppga[page_count-1]->bp_off +
2303 aa->aa_ppga[page_count-1]->bp_count - 1,
2304 client_cksum, client_cksum2,
2305 server_cksum, cksum_type);
2307 aa->aa_oa->o_cksum = client_cksum;
2311 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* we asked for a checksum but the server did not send one;
 * rate-limit the complaint (log only at powers of two) */
2314 } else if (unlikely(client_cksum)) {
2315 static int cksum_missed;
2318 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2319 CERROR("%s: checksum %u requested from %s but not sent\n",
2320 obd_name, cksum_missed,
2321 libcfs_nidstr(&peer->nid));
2326 /* get the inode from the first cl_page */
2327 clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0]));
2328 inode = clpage->cp_inode;
2329 if (clpage->cp_type == CPT_TRANSIENT && inode) {
2330 blockbits = inode->i_blkbits;
2331 blocksize = 1 << blockbits;
/* decrypt read data for encrypted files (per encryption unit) */
2333 if (inode && IS_ENCRYPTED(inode)) {
2336 if (!llcrypt_has_encryption_key(inode)) {
2337 CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2340 for (idx = 0; idx < aa->aa_page_count; idx++) {
2341 struct brw_page *brwpg = aa->aa_ppga[idx];
2342 unsigned int offs = 0;
2344 while (offs < PAGE_SIZE) {
2345 /* do not decrypt if page is all 0s */
2346 if (memchr_inv(page_address(brwpg->bp_page) + offs,
2347 0, LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2348 /* if page is empty forward info to
2349 * upper layers (ll_io_zero_page) by
2350 * clearing PagePrivate2
2353 ClearPagePrivate2(brwpg->bp_page);
2358 /* This is direct IO case. Directly call
2359 * decrypt function that takes inode as
2360 * input parameter. Page does not need
2367 oap2cl_page(brw_page2oap(brwpg));
/* logical block number of this encryption unit */
2369 ((u64)(clpage->cp_page_index) <<
2370 (PAGE_SHIFT - blockbits)) +
2371 (offs >> blockbits);
2374 LUSTRE_ENCRYPTION_UNIT_SIZE;
2375 i += blocksize, lblk_num++) {
2377 llcrypt_decrypt_block_inplace(
2378 inode, brwpg->bp_page,
/* buffered read path: let llcrypt handle the pagecache page */
2385 rc = llcrypt_decrypt_pagecache_blocks(
2387 LUSTRE_ENCRYPTION_UNIT_SIZE,
2393 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
/* copy the server-updated obdo fields back for the upper layers */
2400 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2401 aa->aa_oa, &body->oa);
/*
 * osc_brw_redo_request() - rebuild and resend a BRW RPC after a
 * recoverable error (e.g. -EINPROGRESS or a checksum mismatch).
 *
 * A brand-new request is prepared from the async args of the failed one;
 * the page array, oap/extent lists and interpret/commit callbacks are
 * transferred to it, a resend delay is applied, and the new request is
 * handed to ptlrpcd.
 */
2406 static int osc_brw_redo_request(struct ptlrpc_request *request,
2407 struct osc_brw_async_args *aa, int rc)
2409 struct ptlrpc_request *new_req;
2410 struct osc_brw_async_args *new_aa;
2413 /* The below message is checked in replay-ost-single.sh test_8ae*/
2414 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2415 "redo for recoverable error %d", rc);
/* rebuild the RPC with resend=1 so the wire flags mark it as a resend */
2417 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2418 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2419 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2420 aa->aa_ppga, &new_req, 1);
2425 LASSERTF(request == aa->aa_request,
2426 "request %p != aa_request %p\n",
2427 request, aa->aa_request);
2429 * New request takes over pga and oaps from old request.
2430 * Note that copying a list_head doesn't work, need to move it...
2433 new_req->rq_interpret_reply = request->rq_interpret_reply;
2434 new_req->rq_async_args = request->rq_async_args;
2435 new_req->rq_commit_cb = request->rq_commit_cb;
2436 /* cap resend delay to the current request timeout, this is similar to
2437 * what ptlrpc does (see after_reply()) */
2438 if (aa->aa_resends > new_req->rq_timeout)
2439 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2441 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
/* pin the import generation so eviction during the delay is detected */
2442 new_req->rq_generation_set = 1;
2443 new_req->rq_import_generation = request->rq_import_generation;
2445 new_aa = ptlrpc_req_async_args(new_aa, new_req);
/* move (not copy) the oap and extent lists onto the new request */
2447 INIT_LIST_HEAD(&new_aa->aa_oaps);
2448 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2449 INIT_LIST_HEAD(&new_aa->aa_exts);
2450 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2451 new_aa->aa_resends = aa->aa_resends;
/* swap the self-reference: drop the old request, pin the new one */
2453 if (aa->aa_request) {
2454 ptlrpc_req_finished(aa->aa_request);
2455 new_aa->aa_request = ptlrpc_request_addref(new_req);
2458 /* XXX: This code will run into problem if we're going to support
2459 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
2460 * and wait for all of them to be finished. We should inherit request
2461 * set from old request. */
2462 ptlrpcd_add_req(new_req);
2464 DEBUG_REQ(D_INFO, new_req, "new request");
2469 * ugh, we want disk allocation on the target to happen in offset order. we'll
2470 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
2471 * fine for our small page arrays and doesn't require allocation. its an
2472 * insertion sort that swaps elements that are strides apart, shrinking the
2473 * stride down until its '1' and the array is sorted.
/*
 * sort_brw_pages() - shellsort @array of @num brw_pages into ascending
 * bp_off order (see the rationale comment above: simple, in-place, no
 * allocation, fine for small arrays).
 */
2475 static void sort_brw_pages(struct brw_page **array, int num)
2478 struct brw_page *tmp;
/* grow the stride through the 3h+1 sequence (1, 4, 13, 40, ...) */
2482 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
/* insertion sort among elements @stride apart */
2487 for (i = stride ; i < num ; i++) {
2490 while (j >= stride && array[j - stride]->bp_off > tmp->bp_off) {
2491 array[j] = array[j - stride];
/* shrink the stride back down; done once it reaches 1 */
2496 } while (stride > 1);
/* Free a brw_page pointer array of @count entries allocated with
 * OBD_ALLOC_PTR_ARRAY_LARGE (the pages themselves are not touched). */
2499 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2501 LASSERT(ppga != NULL);
2502 OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2505 /* this is trying to propagate async writeback errors back up to the
2506 * application. When an async write fails we record the error code for later if
2507 * the app does an fsync. As long as errors persist we force future rpcs to be
2508 * sync so that the app can get a sync error and break the cycle of queueing
2509 * pages for which writeback will fail.
/* Record/clear the forced-sync state for async write errors (see the
 * comment block above).  On error (elided branch) force future RPCs to
 * be synchronous until an RPC with xid >= ar_min_xid succeeds.
 * NOTE(review): the error-path lines and part of the parameter list are
 * elided in this excerpt; presumably an "int rc" parameter follows. */
2511 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2518 ar->ar_force_sync = 1;
/* sync mode stays on until all currently-issued xids have completed */
2519 ar->ar_min_xid = ptlrpc_sample_next_xid();
2524 if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2525 ar->ar_force_sync = 0;
/* Completion (interpret) callback for an async BRW RPC.
 *
 * Finishes the bulk request, retries recoverable errors (resending via
 * osc_brw_redo_request() unless the import generation changed, i.e. the
 * client was evicted), pushes the reply's size/blocks/[mac]time attributes
 * into the cl_object, records async write errors for fsync propagation,
 * finishes all extents attached to the RPC and updates the in-flight RPC
 * accounting.  NOTE(review): many interior lines (else branches, closing
 * braces, RETURN paths) are elided in this excerpt. */
2528 static int brw_interpret(const struct lu_env *env,
2529 struct ptlrpc_request *req, void *args, int rc)
2531 struct osc_brw_async_args *aa = args;
2532 struct client_obd *cli = aa->aa_cli;
2533 unsigned long transferred = 0;
2534 struct cl_object *obj = NULL;
2535 struct osc_async_page *last;
2536 struct osc_extent *ext;
2537 struct osc_extent *tmp;
2538 struct lov_oinfo *loi;
2542 ext = list_first_entry(&aa->aa_exts, struct osc_extent, oe_link);
2544 rc = osc_brw_fini_request(req, rc);
2545 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2547 /* restore clear text pages */
2548 osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2551 * When server returns -EINPROGRESS, client should always retry
2552 * regardless of the number of times the bulk was resent already.
2554 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
/* import generation changed => client was evicted; don't resend */
2555 if (req->rq_import_generation !=
2556 req->rq_import->imp_generation) {
2557 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2558 ""DOSTID", rc = %d.\n",
2559 req->rq_import->imp_obd->obd_name,
2560 POSTID(&aa->aa_oa->o_oi), rc);
2561 } else if (rc == -EINPROGRESS ||
2562 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2563 rc = osc_brw_redo_request(req, aa, rc);
2565 CERROR("%s: too many resent retries for object: "
2566 "%llu:%llu, rc = %d.\n",
2567 req->rq_import->imp_obd->obd_name,
2568 POSTID(&aa->aa_oa->o_oi), rc);
2573 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2577 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2578 obj = osc2cl(ext->oe_obj);
2579 loi = cl2osc(obj)->oo_oinfo;
/* mirror the attributes the server returned into the cl_object */
2582 struct obdo *oa = aa->aa_oa;
2583 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2584 unsigned long valid = 0;
2586 cl_object_attr_lock(obj);
2587 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2588 attr->cat_blocks = oa->o_blocks;
2589 valid |= CAT_BLOCKS;
2591 if (oa->o_valid & OBD_MD_FLMTIME) {
2592 attr->cat_mtime = oa->o_mtime;
2595 if (oa->o_valid & OBD_MD_FLATIME) {
2596 attr->cat_atime = oa->o_atime;
2599 if (oa->o_valid & OBD_MD_FLCTIME) {
2600 attr->cat_ctime = oa->o_ctime;
2604 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2605 loff_t last_off = last->oap_count + last->oap_obj_off +
2608 /* Change file size if this is an out of quota or
2609 * direct IO write and it extends the file size */
2610 if (loi->loi_lvb.lvb_size < last_off) {
2611 attr->cat_size = last_off;
2614 /* Extend KMS if it's not a lockless write */
2615 if (loi->loi_kms < last_off &&
2616 oap2osc_page(last)->ops_srvlock == 0) {
2617 attr->cat_kms = last_off;
2623 cl_object_attr_update(env, obj, attr, valid);
2624 cl_object_attr_unlock(obj);
2626 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2629 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0) {
2630 osc_inc_unstable_pages(req);
2632 * If req->rq_committed is set, it means that the dirty pages
2633 * have already committed into the stable storage on OSTs
2634 * (i.e. Direct I/O).
2636 if (!req->rq_committed)
2637 cl_object_dirty_for_sync(env, cl_object_top(obj));
/* NOTE(review): aa_request holds an extra ref taken at build time, so
 * dropping one ref here presumably leaves req valid below — confirm. */
2640 if (aa->aa_request) {
2641 __u64 xid = ptlrpc_req_xid(req);
2643 ptlrpc_req_finished(req);
2644 if (xid && lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2645 spin_lock(&cli->cl_loi_list_lock);
2646 osc_process_ar(&cli->cl_ar, xid, rc);
2647 osc_process_ar(&loi->loi_ar, xid, rc);
2648 spin_unlock(&cli->cl_loi_list_lock);
/* complete every extent carried by this RPC */
2651 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2652 list_del_init(&ext->oe_link);
2653 osc_extent_finish(env, ext, 1,
2654 rc && req->rq_no_delay ? -EAGAIN : rc);
2656 LASSERT(list_empty(&aa->aa_exts));
2657 LASSERT(list_empty(&aa->aa_oaps));
2659 transferred = (req->rq_bulk == NULL ? /* short io */
2660 aa->aa_requested_nob :
2661 req->rq_bulk->bd_nob_transferred);
2663 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2664 ptlrpc_lprocfs_brw(req, transferred);
2666 spin_lock(&cli->cl_loi_list_lock);
2667 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2668 * is called so we know whether to go to sync BRWs or wait for more
2669 * RPCs to complete */
2670 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2671 cli->cl_w_in_flight--;
2673 cli->cl_r_in_flight--;
2674 osc_wake_cache_waiters(cli);
2675 spin_unlock(&cli->cl_loi_list_lock);
2677 osc_io_unplug(env, cli, NULL);
/* rq_commit_cb for BRW write requests: mark the request committed and,
 * if the unstable-page accounting was already charged, release it. */
2681 static void brw_commit(struct ptlrpc_request *req)
2683 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2684 * this called via the rq_commit_cb, I need to ensure
2685 * osc_dec_unstable_pages is still called. Otherwise unstable
2686 * pages may be leaked. */
2687 spin_lock(&req->rq_lock);
2688 if (likely(req->rq_unstable)) {
2689 req->rq_unstable = 0;
/* drop the lock before the (potentially heavier) accounting call */
2690 spin_unlock(&req->rq_lock);
2692 osc_dec_unstable_pages(req);
/* racing path (elided else): flag committed so the other side cleans up */
2694 req->rq_committed = 1;
2695 spin_unlock(&req->rq_lock);
2700 * Build an RPC from the list of extents @ext_list. The caller must ensure
2701 * that the total pages in this list are NOT over the max pages per RPC.
2702 * Extents in the list must be in OES_RPC state.
/* Build and dispatch one BRW RPC covering all extents on @ext_list.
 *
 * Collects every osc_async_page of every extent into a brw_page array,
 * sorts it by offset, prepares the request, fills the request attributes
 * (timestamps after packing to win races against setattr — bug 10150),
 * attaches the async args, bumps the in-flight accounting and hands the
 * request to ptlrpcd.  @cmd is OBD_BRW_READ or OBD_BRW_WRITE.
 * Returns 0 on success; on error all extents are finished with @rc.
 * NOTE(review): many interior lines (declarations of i/rc/grant/
 * page_count/mem_tight, braces, else branches, RETURN paths) are elided
 * in this excerpt. */
2704 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2705 struct list_head *ext_list, int cmd)
2707 struct ptlrpc_request *req = NULL;
2708 struct osc_extent *ext;
2709 struct brw_page **pga = NULL;
2710 struct osc_brw_async_args *aa = NULL;
2711 struct obdo *oa = NULL;
2712 struct osc_async_page *oap;
2713 struct osc_object *obj = NULL;
2714 struct cl_req_attr *crattr = NULL;
2715 loff_t starting_offset = OBD_OBJECT_EOF;
2716 loff_t ending_offset = 0;
2717 /* '1' for consistency with code that checks !mpflag to restore */
2721 bool soft_sync = false;
2722 bool ndelay = false;
2726 __u32 layout_version = 0;
2727 LIST_HEAD(rpc_list);
2728 struct ost_body *body;
2730 LASSERT(!list_empty(ext_list));
2732 /* add pages into rpc_list to build BRW rpc */
2733 list_for_each_entry(ext, ext_list, oe_link) {
2734 LASSERT(ext->oe_state == OES_RPC);
2735 mem_tight |= ext->oe_memalloc;
2736 grant += ext->oe_grants;
2737 page_count += ext->oe_nr_pages;
2738 layout_version = max(layout_version, ext->oe_layout_version);
2743 soft_sync = osc_over_unstable_soft_limit(cli);
/* avoid reclaim recursion while allocating for writeback */
2745 mpflag = memalloc_noreclaim_save();
2747 OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2749 GOTO(out, rc = -ENOMEM);
2751 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2753 GOTO(out, rc = -ENOMEM);
/* flatten all pages of all extents into pga[] and track the span */
2756 list_for_each_entry(ext, ext_list, oe_link) {
2757 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2759 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2761 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2762 pga[i] = &oap->oap_brw_page;
2763 pga[i]->bp_off = oap->oap_obj_off + oap->oap_page_off;
2766 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2767 if (starting_offset == OBD_OBJECT_EOF ||
2768 starting_offset > oap->oap_obj_off) {
2769 starting_offset = oap->oap_obj_off;
2771 CDEBUG(D_CACHE, "page i:%d, oap->oap_obj_off %llu, oap->oap_page_off %u\n",
2772 i, oap->oap_obj_off, oap->oap_page_off);
2773 LASSERT(oap->oap_page_off == 0);
2775 if (ending_offset < oap->oap_obj_off + oap->oap_count) {
2776 ending_offset = oap->oap_obj_off +
2779 LASSERT(oap->oap_page_off + oap->oap_count ==
2787 /* first page in the list */
2788 oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
2790 crattr = &osc_env_info(env)->oti_req_attr;
2791 memset(crattr, 0, sizeof(*crattr));
2792 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2793 crattr->cra_flags = ~0ULL;
2794 crattr->cra_page = oap2cl_page(oap);
2795 crattr->cra_oa = oa;
2796 cl_req_attr_set(env, osc2cl(obj), crattr);
2798 if (cmd == OBD_BRW_WRITE) {
2799 oa->o_grant_used = grant;
2800 if (layout_version > 0) {
2801 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2802 PFID(&oa->o_oi.oi_fid), layout_version);
2804 oa->o_layout_version = layout_version;
2805 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2809 sort_brw_pages(pga, page_count);
2810 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2812 CERROR("prep_req failed: %d\n", rc);
2816 req->rq_commit_cb = brw_commit;
2817 req->rq_interpret_reply = brw_interpret;
2818 req->rq_memalloc = mem_tight != 0;
2820 req->rq_no_resend = req->rq_no_delay = 1;
2821 /* probably set a shorter timeout value.
2822 * to handle ETIMEDOUT in brw_interpret() correctly. */
2823 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2826 /* Need to update the timestamps after the request is built in case
2827 * we race with setattr (locally or in queue at OST). If OST gets
2828 * later setattr before earlier BRW (as determined by the request xid),
2829 * the OST will not use BRW timestamps. Sadly, there is no obvious
2830 * way to do this in a single call. bug 10150 */
2831 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2832 crattr->cra_oa = &body->oa;
2833 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2834 cl_req_attr_set(env, osc2cl(obj), crattr);
2835 lustre_msg_set_uid_gid(req->rq_reqmsg, &crattr->cra_uid,
2837 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* transfer ownership of oaps/extents to the request's async args */
2839 aa = ptlrpc_req_async_args(aa, req);
2840 INIT_LIST_HEAD(&aa->aa_oaps);
2841 list_splice_init(&rpc_list, &aa->aa_oaps);
2842 INIT_LIST_HEAD(&aa->aa_exts);
2843 list_splice_init(ext_list, &aa->aa_exts);
2844 aa->aa_request = ptlrpc_request_addref(req);
2846 spin_lock(&cli->cl_loi_list_lock);
2847 starting_offset >>= PAGE_SHIFT;
2848 ending_offset >>= PAGE_SHIFT;
2849 if (cmd == OBD_BRW_READ) {
2850 cli->cl_r_in_flight++;
2851 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2852 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2853 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2854 starting_offset + 1);
2856 cli->cl_w_in_flight++;
2857 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2858 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2859 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2860 starting_offset + 1);
2862 spin_unlock(&cli->cl_loi_list_lock);
2864 DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2865 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2866 if (libcfs_debug & D_IOTRACE) {
2869 fid.f_seq = crattr->cra_oa->o_parent_seq;
2870 fid.f_oid = crattr->cra_oa->o_parent_oid;
2871 fid.f_ver = crattr->cra_oa->o_parent_ver;
2873 DFID": %d %s pages, start %lld, end %lld, now %ur/%uw in flight\n",
2874 PFID(&fid), page_count,
2875 cmd == OBD_BRW_READ ? "read" : "write", starting_offset,
2876 ending_offset, cli->cl_r_in_flight, cli->cl_w_in_flight);
2878 CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2880 ptlrpcd_add_req(req);
2886 memalloc_noreclaim_restore(mpflag);
/* error path: undo partial allocations and fail every extent */
2889 LASSERT(req == NULL);
2892 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2894 osc_release_bounce_pages(pga, page_count);
2895 osc_release_ppga(pga, page_count);
2897 /* this should happen rarely and is pretty bad, it makes the
2898 * pending list not follow the dirty order
2900 while ((ext = list_first_entry_or_null(ext_list,
2902 oe_link)) != NULL) {
2903 list_del_init(&ext->oe_link);
2904 osc_extent_finish(env, ext, 0, rc);
2910 /* This is to refresh our lock in face of no RPCs. */
/* Send a one-page no-op read RPC (OBD_FL_NORPC) just to refresh the
 * DLM lock on the server when no real I/O is flowing.  Fire-and-forget:
 * no interpret or commit callbacks, no resends.
 * NOTE(review): the declarations of oa/rc and some braces are elided. */
2911 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2913 struct ptlrpc_request *req;
2915 struct brw_page bpg = { .bp_off = start, .bp_count = 1};
2916 struct brw_page *pga = &bpg;
2919 memset(&oa, 0, sizeof(oa));
2920 oa.o_oi = osc->oo_oinfo->loi_oi;
2921 oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2922 /* For updated servers - don't do a read */
2923 oa.o_flags = OBD_FL_NORPC;
2925 rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2928 /* If we succeeded we ship it off, if not there's no point in doing
2929 * anything. Also no resends.
2930 * No interpret callback, no commit callback.
2933 req->rq_no_resend = 1;
2934 ptlrpcd_add_req(req);
/* Attach @data (an osc_object) to @lock's l_ast_data if it is unset,
 * under the resource lock.  Succeeds when l_ast_data ends up == @data
 * (the elided branch presumably sets the return value). */
2938 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2942 LASSERT(lock != NULL);
2944 lock_res_and_lock(lock);
2946 if (lock->l_ast_data == NULL)
2947 lock->l_ast_data = data;
2948 if (lock->l_ast_data == data)
2951 unlock_res_and_lock(lock);
/* Finish an OSC lock enqueue: translate an intent-ABORTED reply into its
 * real error code, mark the LVB ready where appropriate, invoke the
 * caller's @upcall with the final @errcode, and drop the enqueue
 * reference on the lock handle. */
2956 static int osc_enqueue_fini(struct ptlrpc_request *req,
2957 osc_enqueue_upcall_f upcall,
2958 void *cookie, struct lustre_handle *lockh,
2959 enum ldlm_mode mode, __u64 *flags,
2960 bool speculative, int errcode)
2962 bool intent = *flags & LDLM_FL_HAS_INTENT;
2966 /* The request was created before ldlm_cli_enqueue call. */
2967 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2968 struct ldlm_reply *rep;
2970 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2971 LASSERT(rep != NULL);
/* server encodes the real result in lock_policy_res1 for intent locks */
2973 rep->lock_policy_res1 =
2974 ptlrpc_status_ntoh(rep->lock_policy_res1);
2975 if (rep->lock_policy_res1)
2976 errcode = rep->lock_policy_res1;
2978 *flags |= LDLM_FL_LVB_READY;
2979 } else if (errcode == ELDLM_OK) {
2980 *flags |= LDLM_FL_LVB_READY;
2983 /* Call the update callback. */
2984 rc = (*upcall)(cookie, lockh, errcode);
2986 /* release the reference taken in ldlm_cli_enqueue() */
2987 if (errcode == ELDLM_LOCK_MATCHED)
2989 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2990 ldlm_lock_decref(lockh, mode);
/* Interpret callback for an async lock enqueue: completes the ldlm
 * enqueue (ldlm_cli_enqueue_fini) and then the OSC-level bookkeeping
 * (osc_enqueue_fini).  Holds an extra lock reference across the upcall
 * so a blocking AST cannot race ahead of it.
 * NOTE(review): declarations of rc/flags and some braces are elided. */
2995 static int osc_enqueue_interpret(const struct lu_env *env,
2996 struct ptlrpc_request *req,
2999 struct osc_enqueue_args *aa = args;
3000 struct ldlm_lock *lock;
3001 struct lustre_handle *lockh = &aa->oa_lockh;
3002 enum ldlm_mode mode = aa->oa_mode;
3003 struct ost_lvb *lvb = aa->oa_lvb;
3004 __u32 lvb_len = sizeof(*lvb);
3006 struct ldlm_enqueue_info einfo = {
3007 .ei_type = aa->oa_type,
3013 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3015 lock = ldlm_handle2lock(lockh);
3016 LASSERTF(lock != NULL,
3017 "lockh %#llx, req %px, aa %px - client evicted?\n",
3018 lockh->cookie, req, aa);
3020 /* Take an additional reference so that a blocking AST that
3021 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3022 * to arrive after an upcall has been executed by
3023 * osc_enqueue_fini(). */
3024 ldlm_lock_addref(lockh, mode);
3026 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
3027 CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
3029 /* Let CP AST to grant the lock first. */
3030 CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3032 if (aa->oa_speculative) {
/* speculative enqueues carry no LVB/flags; use a local flags word */
3033 LASSERT(aa->oa_lvb == NULL);
3034 LASSERT(aa->oa_flags == NULL);
3035 aa->oa_flags = &flags;
3038 /* Complete obtaining the lock procedure. */
3039 rc = ldlm_cli_enqueue_fini(aa->oa_exp, &req->rq_pill, &einfo, 1,
3040 aa->oa_flags, lvb, lvb_len, lockh, rc,
3042 /* Complete osc stuff. */
3043 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
3044 aa->oa_flags, aa->oa_speculative, rc);
3046 CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
3048 ldlm_lock_decref(lockh, mode);
3049 LDLM_LOCK_PUT(lock);
3053 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3054 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3055 * other synchronous requests, however keeping some locks and trying to obtain
3056 * others may take a considerable amount of time in a case of ost failure; and
3057 * when other sync requests do not get a released lock from a client, the client
3058 * is evicted from the cluster -- such scenarios make life difficult, so
3059 * release locks just after they are obtained. */
/* Enqueue an extent lock on @res_id, first trying to match an existing
 * compatible lock (reads may reuse a PW lock).  On a match the @upcall
 * is invoked with ELDLM_LOCK_MATCHED; otherwise a new enqueue is issued,
 * asynchronously via @rqset (interpreted by osc_enqueue_interpret) or
 * synchronously finished by osc_enqueue_fini.
 * NOTE(review): trailing parameters (e.g. the speculative flag), some
 * declarations and RETURN paths are elided in this excerpt. */
3060 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3061 __u64 *flags, union ldlm_policy_data *policy,
3062 struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
3063 void *cookie, struct ldlm_enqueue_info *einfo,
3064 struct ptlrpc_request_set *rqset, int async,
3067 struct obd_device *obd = exp->exp_obd;
3068 struct lustre_handle lockh = { 0 };
3069 struct ptlrpc_request *req = NULL;
3070 int intent = *flags & LDLM_FL_HAS_INTENT;
3071 __u64 search_flags = *flags;
3072 __u64 match_flags = 0;
3073 enum ldlm_mode mode;
3077 /* Filesystem lock extents are extended to page boundaries so that
3078 * dealing with the page cache is a little smoother. */
3079 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3080 policy->l_extent.end |= ~PAGE_MASK;
3082 /* Next, search for already existing extent locks that will cover us */
3083 /* If we're trying to read, we also search for an existing PW lock. The
3084 * VFS and page cache already protect us locally, so lots of readers/
3085 * writers can share a single PW lock.
3087 * There are problems with conversion deadlocks, so instead of
3088 * converting a read lock to a write lock, we'll just enqueue a new
3091 * At some point we should cancel the read lock instead of making them
3092 * send us a blocking callback, but there are problems with canceling
3093 * locks out from other users right now, too. */
3094 mode = einfo->ei_mode;
3095 if (einfo->ei_mode == LCK_PR)
3097 /* Normal lock requests must wait for the LVB to be ready before
3098 * matching a lock; speculative lock requests do not need to,
3099 * because they will not actually use the lock. */
3101 search_flags |= LDLM_FL_LVB_READY;
3103 search_flags |= LDLM_FL_BLOCK_GRANTED;
3104 if (mode == LCK_GROUP)
3105 match_flags = LDLM_MATCH_GROUP;
3106 mode = ldlm_lock_match_with_skip(obd->obd_namespace, search_flags, 0,
3107 res_id, einfo->ei_type, policy, mode,
3108 &lockh, match_flags);
3110 struct ldlm_lock *matched;
3112 if (*flags & LDLM_FL_TEST_LOCK)
3115 matched = ldlm_handle2lock(&lockh);
3117 /* This DLM lock request is speculative, and does not
3118 * have an associated IO request. Therefore if there
3119 * is already a DLM lock, it will just inform the
3120 * caller to cancel the request for this stripe.*/
3121 lock_res_and_lock(matched);
3122 if (ldlm_extent_equal(&policy->l_extent,
3123 &matched->l_policy_data.l_extent))
3127 unlock_res_and_lock(matched);
3129 ldlm_lock_decref(&lockh, mode);
3130 LDLM_LOCK_PUT(matched);
3132 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
3133 *flags |= LDLM_FL_LVB_READY;
3135 /* We already have a lock, and it's referenced. */
3136 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
3138 ldlm_lock_decref(&lockh, mode);
3139 LDLM_LOCK_PUT(matched);
3142 ldlm_lock_decref(&lockh, mode);
3143 LDLM_LOCK_PUT(matched);
3147 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
3150 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3151 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3153 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3154 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* async path: stash the enqueue context for osc_enqueue_interpret */
3157 struct osc_enqueue_args *aa;
3158 aa = ptlrpc_req_async_args(aa, req);
3160 aa->oa_mode = einfo->ei_mode;
3161 aa->oa_type = einfo->ei_type;
3162 lustre_handle_copy(&aa->oa_lockh, &lockh);
3163 aa->oa_upcall = upcall;
3164 aa->oa_cookie = cookie;
3165 aa->oa_speculative = speculative;
3167 aa->oa_flags = flags;
3170 /* speculative locks are essentially to enqueue
3171 * a DLM lock in advance, so we don't care
3172 * about the result of the enqueue. */
3174 aa->oa_flags = NULL;
3177 req->rq_interpret_reply = osc_enqueue_interpret;
3178 ptlrpc_set_add_req(rqset, req);
/* sync path: finish the enqueue inline */
3183 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
3184 flags, speculative, rc);
/* Match an existing extent lock covering @policy on @res_id (extents are
 * first rounded to page boundaries).  On a match, bind @obj to the lock
 * and refresh the cached LVB if needed.  Returns the matched mode
 * (0 on no match); LDLM_FL_TEST_LOCK returns without taking references. */
3189 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
3190 struct ldlm_res_id *res_id, enum ldlm_type type,
3191 union ldlm_policy_data *policy, enum ldlm_mode mode,
3192 __u64 *flags, struct osc_object *obj,
3193 struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
3195 struct obd_device *obd = exp->exp_obd;
3196 __u64 lflags = *flags;
3200 if (CFS_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3203 /* Filesystem lock extents are extended to page boundaries so that
3204 * dealing with the page cache is a little smoother */
3205 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3206 policy->l_extent.end |= ~PAGE_MASK;
3208 /* Next, search for already existing extent locks that will cover us */
3209 rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
3210 res_id, type, policy, mode, lockh,
3212 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
3216 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3218 LASSERT(lock != NULL);
3219 if (osc_set_lock_data(lock, obj)) {
3220 lock_res_and_lock(lock);
3221 if (!ldlm_is_lvb_cached(lock)) {
3222 LASSERT(lock->l_ast_data == obj);
3223 osc_lock_lvb_update(env, obj, lock, NULL);
3224 ldlm_set_lvb_cached(lock);
3226 unlock_res_and_lock(lock);
/* elided branch: lock already owned by another object; drop it */
3228 ldlm_lock_decref(lockh, rc);
3231 LDLM_LOCK_PUT(lock);
/* Interpret callback for an async OST_STATFS: copy the reply's
 * obd_statfs into the caller's buffer and invoke the oi_cb_up callback.
 * A request that was never sent (NODELAY on a down import) exits early. */
3236 static int osc_statfs_interpret(const struct lu_env *env,
3237 struct ptlrpc_request *req, void *args, int rc)
3239 struct osc_async_args *aa = args;
3240 struct obd_statfs *msfs;
3245 * The request has in fact never been sent due to issues at
3246 * a higher level (LOV). Exit immediately since the caller
3247 * is aware of the problem and takes care of the clean up.
3251 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3252 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3258 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3260 GOTO(out, rc = -EPROTO);
3262 *aa->aa_oi->oi_osfs = *msfs;
3264 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/* Asynchronous statfs: serve from the cached obd_osfs when it is newer
 * than @max_age, otherwise queue an OST_STATFS request on @rqset whose
 * reply is handled by osc_statfs_interpret. */
3269 static int osc_statfs_async(struct obd_export *exp,
3270 struct obd_info *oinfo, time64_t max_age,
3271 struct ptlrpc_request_set *rqset)
3273 struct obd_device *obd = class_exp2obd(exp);
3274 struct ptlrpc_request *req;
3275 struct osc_async_args *aa;
3279 if (obd->obd_osfs_age >= max_age) {
3281 "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3282 obd->obd_name, &obd->obd_osfs,
3283 obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3284 obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3285 spin_lock(&obd->obd_osfs_lock);
3286 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3287 spin_unlock(&obd->obd_osfs_lock);
3288 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3289 if (oinfo->oi_cb_up)
3290 oinfo->oi_cb_up(oinfo, 0);
3295 /* We could possibly pass max_age in the request (as an absolute
3296 * timestamp or a "seconds.usec ago") so the target can avoid doing
3297 * extra calls into the filesystem if that isn't necessary (e.g.
3298 * during mount that would help a bit). Having relative timestamps
3299 * is not so great if request processing is slow, while absolute
3300 * timestamps are not ideal because they need time synchronization. */
3301 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3305 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3307 ptlrpc_request_free(req);
3310 ptlrpc_request_set_replen(req);
3311 req->rq_request_portal = OST_CREATE_PORTAL;
3312 ptlrpc_at_set_req_timeout(req);
3314 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3315 /* procfs requests must not wait for a stuck OST, to avoid deadlock */
3316 req->rq_no_resend = 1;
3317 req->rq_no_delay = 1;
3320 req->rq_interpret_reply = osc_statfs_interpret;
3321 aa = ptlrpc_req_async_args(aa, req);
3324 ptlrpc_set_add_req(rqset, req);
/* Synchronous statfs against the OST, taking an import reference under
 * the obd lock to serialize with client_disconnect_export (bug 15684).
 * NOTE(review): the copy of *msfs into *osfs and the import release on
 * the success path are elided in this excerpt. */
3328 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3329 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3331 struct obd_device *obd = class_exp2obd(exp);
3332 struct obd_statfs *msfs;
3333 struct ptlrpc_request *req;
3334 struct obd_import *imp, *imp0;
3338 /*Since the request might also come from lprocfs, so we need
3339 *sync this with client_disconnect_export Bug15684
3341 with_imp_locked(obd, imp0, rc)
3342 imp = class_import_get(imp0);
3346 /* We could possibly pass max_age in the request (as an absolute
3347 * timestamp or a "seconds.usec ago") so the target can avoid doing
3348 * extra calls into the filesystem if that isn't necessary (e.g.
3349 * during mount that would help a bit). Having relative timestamps
3350 * is not so great if request processing is slow, while absolute
3351 * timestamps are not ideal because they need time synchronization. */
3352 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3354 class_import_put(imp);
3359 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3361 ptlrpc_request_free(req);
3364 ptlrpc_request_set_replen(req);
3365 req->rq_request_portal = OST_CREATE_PORTAL;
3366 ptlrpc_at_set_req_timeout(req);
3368 if (flags & OBD_STATFS_NODELAY) {
3369 /* procfs requests must not wait for a stuck OST, to avoid deadlock */
3370 req->rq_no_resend = 1;
3371 req->rq_no_delay = 1;
3374 rc = ptlrpc_queue_wait(req);
3378 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3380 GOTO(out, rc = -EPROTO);
3386 ptlrpc_req_finished(req);
/* OSC ioctl dispatcher: holds a module reference across the call and
 * handles CLIENT_RECOVER, GETATTR and SET_ACTIVE; anything else is
 * rejected as unrecognized.  NOTE(review): the switch statement opener,
 * data = karg assignments, break statements and the -EINVAL GOTOs are
 * elided in this excerpt. */
3390 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3391 void *karg, void __user *uarg)
3393 struct obd_device *obd = exp->exp_obd;
3394 struct obd_ioctl_data *data;
3398 CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
3399 obd->obd_name, cmd, len, karg, uarg);
/* pin the module so it cannot unload while we service the ioctl */
3401 if (!try_module_get(THIS_MODULE)) {
3402 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3403 module_name(THIS_MODULE));
3408 case OBD_IOC_CLIENT_RECOVER:
3409 if (unlikely(karg == NULL)) {
3410 OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3415 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3416 data->ioc_inlbuf1, 0);
3420 case OBD_IOC_GETATTR:
3421 if (unlikely(karg == NULL)) {
3422 OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3427 rc = obd_getattr(NULL, exp, &data->ioc_obdo1);
3429 #ifdef IOC_OSC_SET_ACTIVE
3430 case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
3432 case OBD_IOC_SET_ACTIVE:
3433 if (unlikely(karg == NULL)) {
3434 OBD_IOC_ERROR(obd->obd_name, cmd, "karg=NULL",
3439 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3443 rc = OBD_IOC_DEBUG(D_IOCTL, obd->obd_name, cmd, "unrecognized",
3448 module_put(THIS_MODULE);
/* Handle set_info(key, val) for the OSC: checksum/sptlrpc/flush-ctx/LRU
 * shrink keys are handled locally; everything else is forwarded to the
 * OST via OST_SET_INFO.  KEY_GRANT_SHRINK requests run on ptlrpcd with
 * osc_shrink_grant_interpret; all other forwarded keys require @set. */
3452 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3453 u32 keylen, void *key, u32 vallen, void *val,
3454 struct ptlrpc_request_set *set)
3456 struct ptlrpc_request *req;
3457 struct obd_device *obd = exp->exp_obd;
3458 struct obd_import *imp = class_exp2cliimp(exp);
3463 CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3465 if (KEY_IS(KEY_CHECKSUM)) {
3466 if (vallen != sizeof(int))
3468 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3472 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3473 sptlrpc_conf_client_adapt(obd);
3477 if (KEY_IS(KEY_FLUSH_CTX)) {
3478 sptlrpc_import_flush_my_ctx(imp);
3482 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3483 struct client_obd *cli = &obd->u.cli;
/* shrink at most half of the LRU, capped by the requested target */
3484 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3485 long target = *(long *)val;
3487 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3492 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3496 * We pass all other commands directly to OST. Since nobody calls osc
3497 * methods directly and everybody is supposed to go through LOV, we
3498 * assume lov checked invalid values for us.
3499 * The only recognised values so far are evict_by_nid and mds_conn.
3500 * Even if something bad goes through, we'd get a -EINVAL from OST
3504 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3505 &RQF_OST_SET_GRANT_INFO :
3510 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3511 RCL_CLIENT, keylen);
3512 if (!KEY_IS(KEY_GRANT_SHRINK))
3513 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3514 RCL_CLIENT, vallen);
3515 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3517 ptlrpc_request_free(req);
3521 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3522 memcpy(tmp, key, keylen);
3523 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3526 memcpy(tmp, val, vallen);
3528 if (KEY_IS(KEY_GRANT_SHRINK)) {
3529 struct osc_grant_args *aa;
3532 aa = ptlrpc_req_async_args(aa, req);
3533 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3535 ptlrpc_req_finished(req);
3538 *oa = ((struct ost_body *)val)->oa;
3540 req->rq_interpret_reply = osc_shrink_grant_interpret;
3543 ptlrpc_request_set_replen(req);
3544 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3545 LASSERT(set != NULL);
3546 ptlrpc_set_add_req(set, req);
3547 ptlrpc_check_set(NULL, set);
3549 ptlrpcd_add_req(req);
/* Reconnect hook: report the client's current grant consumption back to
 * the server in @data->ocd_grant (available + reserved + dirty), and
 * reset the lost-grant counter under the loi list lock. */
3556 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3557 struct obd_device *obd, struct obd_uuid *cluuid,
3558 struct obd_connect_data *data, void *localdata)
3560 struct client_obd *cli = &obd->u.cli;
3562 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3566 spin_lock(&cli->cl_loi_list_lock);
3567 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3568 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3569 /* restore ocd_grant_blkbits as client page bits */
3570 data->ocd_grant_blkbits = PAGE_SHIFT;
3571 grant += cli->cl_dirty_grant;
3573 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* never request zero grant; fall back to twice the BRW size */
3575 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3576 lost_grant = cli->cl_lost_grant;
3577 cli->cl_lost_grant = 0;
3578 spin_unlock(&cli->cl_loi_list_lock);
3580 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3581 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3582 data->ocd_version, data->ocd_grant, lost_grant);
/* Disconnect from the OST.  The grant-shrink list removal deliberately
 * happens AFTER client_disconnect_export to avoid the setup/cleanup
 * race described below (bug 18662). */
3589 int osc_disconnect(struct obd_export *exp)
3591 struct obd_device *obd = class_exp2obd(exp);
3594 rc = client_disconnect_export(exp);
3596 * Initially we put del_shrink_grant before disconnect_export, but it
3597 * causes the following problem if setup (connect) and cleanup
3598 * (disconnect) are tangled together.
3599 * connect p1 disconnect p2
3600 * ptlrpc_connect_import
3601 * ............... class_manual_cleanup
3604 * ptlrpc_connect_interrupt
3606 * add this client to shrink list
3608 * Bang! grant shrink thread trigger the shrink. BUG18662
3610 osc_del_grant_list(&obd->u.cli);
/* cfs_hash iterator callback: invalidate the osc_object attached to the
 * granted locks of one LDLM resource (during import invalidation) and
 * clear LDLM_FL_CLEANED so the second namespace-clean pass cancels them.
 * @arg is the lu_env.  Always returns 0 (continue iteration). */
3615 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3616 struct hlist_node *hnode, void *arg)
3618 struct lu_env *env = arg;
3619 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3620 struct ldlm_lock *lock;
3621 struct osc_object *osc = NULL;
3625 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
/* all locks of one resource share the same object; grab it once */
3626 if (lock->l_ast_data != NULL && osc == NULL) {
3627 osc = lock->l_ast_data;
3628 cl_object_get(osc2cl(osc));
3631 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3632 * by the 2nd round of ldlm_namespace_clean() call in
3633 * osc_import_event(). */
3634 ldlm_clear_cleaned(lock);
3639 osc_object_invalidate(env, osc);
3640 cl_object_put(env, osc2cl(osc));
/* Import state-change observer: reset grants on disconnect, purge locks
 * and cached objects on invalidation, re-initialize grants and the
 * request portal when connect data arrives (OCD), and forward the
 * active/inactive notifications to the obd observer.
 * NOTE(review): break statements, closing braces and the cli assignment
 * are elided in this excerpt. */
3647 static int osc_import_event(struct obd_device *obd, struct obd_import *imp,
3648 enum obd_import_event event)
3650 struct client_obd *cli;
3654 if (WARN_ON_ONCE(!obd || !imp || imp->imp_obd != obd))
3658 case IMP_EVENT_DISCON: {
/* grants are server state; they are void once the connection drops */
3662 spin_lock(&cli->cl_loi_list_lock);
3663 cli->cl_avail_grant = 0;
3664 cli->cl_lost_grant = 0;
3665 spin_unlock(&cli->cl_loi_list_lock);
3668 case IMP_EVENT_INACTIVE: {
3669 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3672 case IMP_EVENT_INVALIDATE: {
3673 struct ldlm_namespace *ns = obd->obd_namespace;
3677 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3679 env = cl_env_get(&refcheck);
3681 osc_io_unplug(env, &obd->u.cli, NULL);
/* drop per-object state, then a 2nd clean pass cancels the rest */
3683 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3684 osc_ldlm_resource_invalidate,
3686 cl_env_put(env, &refcheck);
3688 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3694 case IMP_EVENT_ACTIVE: {
3695 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3698 case IMP_EVENT_OCD: {
3699 struct obd_connect_data *ocd = &imp->imp_connect_data;
3701 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3702 osc_init_grant(&obd->u.cli, ocd);
3705 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3706 imp->imp_client->cli_request_portal =
3709 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3712 case IMP_EVENT_DEACTIVATE: {
3713 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3716 case IMP_EVENT_ACTIVATE: {
3717 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3721 CERROR("%s: Unknown import event %d: rc = %d\n",
3722 obd->obd_name, event, -EINVAL);
3729 * Determine whether the lock can be canceled before replaying the lock
3730 * during recovery, see bug16774 for detailed information.
3732 * \retval zero the lock can't be canceled
3733 * \retval other ok to cancel
/*
 * Registered via ns_register_cancel() in osc_setup(); only unused,
 * granted extent locks (weight 0 per osc_ldlm_weigh_ast()) are deemed
 * cancelable.  NOTE(review): the return statements are elided in this
 * excerpt - comments only, code untouched.
 */
3735 static int osc_cancel_weight(struct ldlm_lock *lock)
3738 * Cancel all unused and granted extent lock.
3740 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3741 ldlm_is_granted(lock) &&
3742 osc_ldlm_weigh_ast(lock) == 0)
/*
 * brw_queue_work() - ptlrpcd work callback (registered through
 * ptlrpcd_alloc_work() in osc_setup_common()) that kicks writeback for
 * a client obd by unplugging its queued I/O.
 * 'data' is the struct client_obd * passed at registration time.
 */
3748 static int brw_queue_work(const struct lu_env *env, void *data)
3750 struct client_obd *cli = data;
3752 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3754 osc_io_unplug(env, cli, NULL);
/*
 * osc_setup_common() - setup shared by OSC and derived clients: take a
 * ptlrpcd reference, do generic client obd setup, allocate the
 * writeback and LRU ptlrpcd work items, initialise quota and grant
 * shrink state.  On failure the acquired resources are unwound via the
 * goto-cleanup ladder at the bottom (labels elided in this excerpt).
 *
 * Returns 0 on success, negative errno on failure.
 */
3758 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3760 struct client_obd *cli = &obd->u.cli;
3766 rc = ptlrpcd_addref();
3770 rc = client_obd_setup(obd, lcfg);
3772 GOTO(out_ptlrpcd, rc);
/* work item that flushes dirty pages (see brw_queue_work() above) */
3775 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3776 if (IS_ERR(handler))
3777 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3778 cli->cl_writeback_work = handler;
/* work item that shrinks the client LRU page cache */
3780 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3781 if (IS_ERR(handler))
3782 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3783 cli->cl_lru_work = handler;
3785 rc = osc_quota_setup(obd);
3787 GOTO(out_ptlrpcd_work, rc);
3789 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3790 cli->cl_root_squash = 0;
3791 osc_update_next_shrink(cli);
/* error path: destroy whichever work items were created, then undo
 * client_obd_setup(); cl_*_work are NULLed to avoid double free */
3796 if (cli->cl_writeback_work != NULL) {
3797 ptlrpcd_destroy_work(cli->cl_writeback_work);
3798 cli->cl_writeback_work = NULL;
3800 if (cli->cl_lru_work != NULL) {
3801 ptlrpcd_destroy_work(cli->cl_lru_work);
3802 cli->cl_lru_work = NULL;
3804 client_obd_cleanup(obd);
3809 EXPORT_SYMBOL(osc_setup_common);
/*
 * osc_setup() - full OSC device setup (.o_setup in osc_obd_ops): runs
 * the common setup, registers tunables, tops up the shared request
 * pool, installs the lock-cancel weight callback, links this client
 * into the global grant-shrink list and configures import idling.
 */
3811 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3813 struct client_obd *cli = &obd->u.cli;
3821 rc = osc_setup_common(obd, lcfg);
3825 rc = osc_tunables_init(obd);
3830 * We try to control the total number of requests with a upper limit
3831 * osc_reqpool_maxreqcount. There might be some race which will cause
3832 * over-limit allocation, but it is fine.
3834 req_count = atomic_read(&osc_pool_req_count);
3835 if (req_count < osc_reqpool_maxreqcount) {
/* contribute cl_max_rpcs_in_flight + 2 requests, clamped so the
 * pool-wide cap osc_reqpool_maxreqcount is not exceeded */
3836 adding = cli->cl_max_rpcs_in_flight + 2;
3837 if (req_count + adding > osc_reqpool_maxreqcount)
3838 adding = osc_reqpool_maxreqcount - req_count;
/* the pool may allocate fewer than requested; count what stuck */
3840 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3841 atomic_add(added, &osc_pool_req_count);
/* extent locks with zero weight may be cancelled instead of replayed */
3844 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3846 spin_lock(&osc_shrink_lock);
3847 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3848 spin_unlock(&osc_shrink_lock);
/* disconnect the import after osc_idle_timeout seconds of idleness */
3849 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3850 cli->cl_import->imp_idle_debug = D_HA;
/*
 * osc_precleanup_common() - teardown shared by OSC and derived clients:
 * wait out zombie exports, destroy the writeback/LRU work items created
 * in osc_setup_common(), then clean up the client import.
 */
3855 int osc_precleanup_common(struct obd_device *obd)
3857 struct client_obd *cli = &obd->u.cli;
3861 * for echo client, export may be on zombie list, wait for
3862 * zombie thread to cull it, because cli.cl_import will be
3863 * cleared in client_disconnect_export():
3864 * class_export_destroy() -> obd_cleanup() ->
3865 * echo_device_free() -> echo_client_cleanup() ->
3866 * obd_disconnect() -> osc_disconnect() ->
3867 * client_disconnect_export()
3869 obd_zombie_barrier();
/* NULL after destroy so a second precleanup call is harmless */
3870 if (cli->cl_writeback_work) {
3871 ptlrpcd_destroy_work(cli->cl_writeback_work);
3872 cli->cl_writeback_work = NULL;
3875 if (cli->cl_lru_work) {
3876 ptlrpcd_destroy_work(cli->cl_lru_work);
3877 cli->cl_lru_work = NULL;
3880 obd_cleanup_client_import(obd);
3883 EXPORT_SYMBOL(osc_precleanup_common);
/*
 * osc_precleanup() - .o_precleanup handler: common teardown plus
 * removal of this obd's lprocfs entries.
 */
3885 static int osc_precleanup(struct obd_device *obd)
3889 osc_precleanup_common(obd);
3891 ptlrpc_lprocfs_unregister_obd(obd);
/*
 * osc_cleanup_common() - final cleanup (.o_cleanup): unlink from the
 * global grant-shrink list, detach from the shared LRU cache if
 * attached, release quota state and finish generic client teardown.
 *
 * Returns the result of client_obd_cleanup().
 */
3895 int osc_cleanup_common(struct obd_device *obd)
3897 struct client_obd *cli = &obd->u.cli;
/* undo the list_add_tail() done in osc_setup() */
3902 spin_lock(&osc_shrink_lock);
3903 list_del(&cli->cl_shrink_list);
3904 spin_unlock(&osc_shrink_lock);
/* drop our reference on the shared page cache, if one was attached */
3907 if (cli->cl_cache != NULL) {
3908 LASSERT(refcount_read(&cli->cl_cache->ccc_users) > 0);
3909 spin_lock(&cli->cl_cache->ccc_lru_lock);
3910 list_del_init(&cli->cl_lru_osc);
3911 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3912 cli->cl_lru_left = NULL;
3913 cl_cache_decref(cli->cl_cache);
3914 cli->cl_cache = NULL;
3917 /* free memory of osc quota cache */
3918 osc_quota_cleanup(obd);
3920 rc = client_obd_cleanup(obd);
3925 EXPORT_SYMBOL(osc_cleanup_common);
/*
 * Method table registered for the OSC device type in osc_init() below.
 * Connection management is delegated to the generic client_* helpers;
 * the remaining operations are OSC-specific.
 */
3927 static const struct obd_ops osc_obd_ops = {
3928 .o_owner = THIS_MODULE,
3929 .o_setup = osc_setup,
3930 .o_precleanup = osc_precleanup,
3931 .o_cleanup = osc_cleanup_common,
3932 .o_add_conn = client_import_add_conn,
3933 .o_del_conn = client_import_del_conn,
3934 .o_connect = client_connect_import,
3935 .o_reconnect = osc_reconnect,
3936 .o_disconnect = osc_disconnect,
3937 .o_statfs = osc_statfs,
3938 .o_statfs_async = osc_statfs_async,
3939 .o_create = osc_create,
3940 .o_destroy = osc_destroy,
3941 .o_getattr = osc_getattr,
3942 .o_setattr = osc_setattr,
3943 .o_iocontrol = osc_iocontrol,
3944 .o_set_info_async = osc_set_info_async,
3945 .o_import_event = osc_import_event,
3946 .o_quotactl = osc_quotactl,
/* Global list of client_obds eligible for page-cache shrinking
 * (clients join in osc_setup(), leave in osc_cleanup_common()),
 * protected by osc_shrink_lock. */
3949 LIST_HEAD(osc_shrink_list);
3950 DEFINE_SPINLOCK(osc_shrink_lock);
/* runtime switch for the shrinker; exported as a tunable elsewhere -
 * TODO(review): confirm where this flag is toggled */
3951 bool osc_page_cache_shrink_enabled = true;
/*
 * Shrinker wiring: modern kernels take separate count/scan callbacks
 * (HAVE_SHRINKER_COUNT); older kernels take a single .shrink callback,
 * emulated below by scanning then returning the remaining count.
 */
3953 #ifdef HAVE_SHRINKER_COUNT
3954 static struct ll_shrinker_ops osc_cache_sh_ops = {
3955 .count_objects = osc_cache_shrink_count,
3956 .scan_objects = osc_cache_shrink_scan,
3957 .seeks = DEFAULT_SEEKS,
/* legacy single-callback shrinker API: do one scan pass, then report
 * how many objects remain reclaimable */
3960 static int osc_cache_shrink(struct shrinker *shrinker,
3961 struct shrink_control *sc)
3963 (void)osc_cache_shrink_scan(shrinker, sc);
3965 return osc_cache_shrink_count(shrinker, sc);
3968 static struct ll_shrinker_ops osc_cache_sh_ops = {
3969 .shrink = osc_cache_shrink,
3970 .seeks = DEFAULT_SEEKS,
/* handle returned by ll_shrinker_create(); freed in osc_exit() */
3974 static struct shrinker *osc_cache_shrinker;
/*
 * osc_init() - module entry point: initialise kmem caches, register the
 * page-cache shrinker, size and create the shared request pool, start
 * the grant-shrink worker, and finally register the OSC obd type.
 * Failures unwind in reverse order through the error labels at the
 * bottom (labels elided in this excerpt).
 */
3976 static int __init osc_init(void)
3978 unsigned int reqpool_size;
3979 unsigned int reqsize;
3983 /* print an address of _any_ initialized kernel symbol from this
3984 * module, to allow debugging with gdb that doesn't support data
3985 * symbols from modules.*/
3986 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3988 rc = libcfs_setup();
3992 rc = lu_kmem_init(osc_caches);
3996 osc_cache_shrinker = ll_shrinker_create(&osc_cache_sh_ops, 0,
3998 if (IS_ERR(osc_cache_shrinker))
3999 GOTO(out_kmem, rc = PTR_ERR(osc_cache_shrinker))
4001 /* This is obviously too much memory, only prevent overflow here */
4002 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
4003 GOTO(out_shrinker, rc = -EINVAL);
/* convert the module parameter from MB to bytes */
4005 reqpool_size = osc_reqpool_mem_max << 20;
/* round the per-request size up to the next power of two */
4008 while (reqsize < OST_IO_MAXREQSIZE)
4009 reqsize = reqsize << 1;
4012 * We don't enlarge the request count in OSC pool according to
4013 * cl_max_rpcs_in_flight. The allocation from the pool will only be
4014 * tried after normal allocation failed. So a small OSC pool won't
4015 * cause much performance degression in most of cases.
4017 osc_reqpool_maxreqcount = reqpool_size / reqsize;
/* pool starts empty; osc_setup() tops it up per-client */
4019 atomic_set(&osc_pool_req_count, 0);
4020 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
4021 ptlrpc_add_rqs_to_pool);
4023 if (osc_rq_pool == NULL)
4024 GOTO(out_shrinker, rc = -ENOMEM);
4026 rc = osc_start_grant_work();
4028 GOTO(out_req_pool, rc);
4030 rc = class_register_type(&osc_obd_ops, NULL, true,
4031 LUSTRE_OSC_NAME, &osc_device_type);
4033 GOTO(out_stop_grant, rc);
/* error unwind: reverse order of the acquisitions above */
4038 osc_stop_grant_work();
4040 ptlrpc_free_rq_pool(osc_rq_pool);
4042 shrinker_free(osc_cache_shrinker);
4044 lu_kmem_fini(osc_caches);
/*
 * osc_exit() - module unload: tear down everything osc_init() set up,
 * in reverse order of registration.
 */
4049 static void __exit osc_exit(void)
4051 class_unregister_type(LUSTRE_OSC_NAME);
4052 ptlrpc_free_rq_pool(osc_rq_pool);
4053 osc_stop_grant_work();
4054 shrinker_free(osc_cache_shrinker);
4055 lu_kmem_fini(osc_caches);
/* Standard kernel module metadata and init/exit registration. */
4058 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
4059 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4060 MODULE_VERSION(LUSTRE_VERSION_STRING);
4061 MODULE_LICENSE("GPL");
4063 module_init(osc_init);
4064 module_exit(osc_exit);