4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
32 #define DEBUG_SUBSYSTEM S_OSC
34 #include <linux/workqueue.h>
35 #include <libcfs/libcfs.h>
36 #include <linux/falloc.h>
37 #include <lprocfs_status.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
50 #include "osc_internal.h"
51 #include <lnet/lnet_rdma.h>
53 atomic_t osc_pool_req_count;
54 unsigned int osc_reqpool_maxreqcount;
55 struct ptlrpc_request_pool *osc_rq_pool;
57 /* max memory used for request pool, unit is MB */
58 static unsigned int osc_reqpool_mem_max = 5;
59 module_param(osc_reqpool_mem_max, uint, 0444);
61 static int osc_idle_timeout = 20;
62 module_param(osc_idle_timeout, uint, 0644);
64 #define osc_grant_args osc_brw_async_args
66 struct osc_setattr_args {
68 obd_enqueue_update_f sa_upcall;
72 struct osc_fsync_args {
73 struct osc_object *fa_obj;
75 obd_enqueue_update_f fa_upcall;
79 struct osc_ladvise_args {
81 obd_enqueue_update_f la_upcall;
85 static void osc_release_ppga(struct brw_page **ppga, size_t count);
86 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
89 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
91 struct ost_body *body;
93 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
96 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
99 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
102 struct ptlrpc_request *req;
103 struct ost_body *body;
107 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
111 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
113 ptlrpc_request_free(req);
117 osc_pack_req_body(req, oa);
119 ptlrpc_request_set_replen(req);
121 rc = ptlrpc_queue_wait(req);
125 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
127 GOTO(out, rc = -EPROTO);
129 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
130 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
132 oa->o_blksize = cli_brw_size(exp->exp_obd);
133 oa->o_valid |= OBD_MD_FLBLKSZ;
137 ptlrpc_req_finished(req);
142 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
145 struct ptlrpc_request *req;
146 struct ost_body *body;
150 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
152 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
156 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
158 ptlrpc_request_free(req);
162 osc_pack_req_body(req, oa);
164 ptlrpc_request_set_replen(req);
166 rc = ptlrpc_queue_wait(req);
170 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
172 GOTO(out, rc = -EPROTO);
174 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
178 ptlrpc_req_finished(req);
183 static int osc_setattr_interpret(const struct lu_env *env,
184 struct ptlrpc_request *req, void *args, int rc)
186 struct osc_setattr_args *sa = args;
187 struct ost_body *body;
194 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
196 GOTO(out, rc = -EPROTO);
198 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
201 rc = sa->sa_upcall(sa->sa_cookie, rc);
205 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
206 obd_enqueue_update_f upcall, void *cookie,
207 struct ptlrpc_request_set *rqset)
209 struct ptlrpc_request *req;
210 struct osc_setattr_args *sa;
215 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
219 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
221 ptlrpc_request_free(req);
225 osc_pack_req_body(req, oa);
227 ptlrpc_request_set_replen(req);
229 /* do mds to ost setattr asynchronously */
231 /* Do not wait for response. */
232 ptlrpcd_add_req(req);
234 req->rq_interpret_reply = osc_setattr_interpret;
236 sa = ptlrpc_req_async_args(sa, req);
238 sa->sa_upcall = upcall;
239 sa->sa_cookie = cookie;
241 ptlrpc_set_add_req(rqset, req);
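/*
 * Usage sketch (editorial illustration; "my_setattr_done" is a hypothetical
 * callback matching obd_enqueue_update_f, not part of this file):
 *
 *	static int my_setattr_done(void *cookie, int rc)
 *	{
 *		CDEBUG(D_INODE, "setattr finished: rc = %d\n", rc);
 *		return rc;
 *	}
 *
 *	oa->o_mode = S_IFREG | 0644;
 *	oa->o_valid |= OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLGROUP;
 *	rc = osc_setattr_async(exp, oa, my_setattr_done, NULL, rqset);
 */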
247 static int osc_ladvise_interpret(const struct lu_env *env,
248 struct ptlrpc_request *req,
251 struct osc_ladvise_args *la = arg;
252 struct ost_body *body;
258 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
260 GOTO(out, rc = -EPROTO);
262 *la->la_oa = body->oa;
264 rc = la->la_upcall(la->la_cookie, rc);
269 * If rqset is NULL, do not wait for response. Upcall and cookie could also
270 * be NULL in this case.
272 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
273 struct ladvise_hdr *ladvise_hdr,
274 obd_enqueue_update_f upcall, void *cookie,
275 struct ptlrpc_request_set *rqset)
277 struct ptlrpc_request *req;
278 struct ost_body *body;
279 struct osc_ladvise_args *la;
281 struct lu_ladvise *req_ladvise;
282 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
283 int num_advise = ladvise_hdr->lah_count;
284 struct ladvise_hdr *req_ladvise_hdr;
287 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
291 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
292 num_advise * sizeof(*ladvise));
293 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
295 ptlrpc_request_free(req);
298 req->rq_request_portal = OST_IO_PORTAL;
299 ptlrpc_at_set_req_timeout(req);
301 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
303 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
306 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
307 &RMF_OST_LADVISE_HDR);
308 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
310 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
311 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
312 ptlrpc_request_set_replen(req);
315 /* Do not wait for response. */
316 ptlrpcd_add_req(req);
320 req->rq_interpret_reply = osc_ladvise_interpret;
321 la = ptlrpc_req_async_args(la, req);
323 la->la_upcall = upcall;
324 la->la_cookie = cookie;
326 ptlrpc_set_add_req(rqset, req);
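/*
 * Usage sketch (editorial illustration, assuming a header allocated with
 * room for one trailing lu_ladvise entry): advise the OST that an extent
 * will be read soon. With a NULL rqset the RPC is fired asynchronously and
 * upcall/cookie may be NULL as well, per the comment above osc_ladvise_base:
 *
 *	hdr->lah_count = 1;
 *	hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLREAD;
 *	hdr->lah_advise[0].lla_start = 0;
 *	hdr->lah_advise[0].lla_end = length;
 *	rc = osc_ladvise_base(exp, oa, hdr, NULL, NULL, NULL);
 */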
331 static int osc_create(const struct lu_env *env, struct obd_export *exp,
334 struct ptlrpc_request *req;
335 struct ost_body *body;
340 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
341 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
343 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
345 GOTO(out, rc = -ENOMEM);
347 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
349 ptlrpc_request_free(req);
353 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
356 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
358 ptlrpc_request_set_replen(req);
360 rc = ptlrpc_queue_wait(req);
364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366 GOTO(out_req, rc = -EPROTO);
368 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
369 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
371 oa->o_blksize = cli_brw_size(exp->exp_obd);
372 oa->o_valid |= OBD_MD_FLBLKSZ;
374 CDEBUG(D_HA, "transno: %lld\n",
375 lustre_msg_get_transno(req->rq_repmsg));
377 ptlrpc_req_finished(req);
382 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
383 obd_enqueue_update_f upcall, void *cookie)
385 struct ptlrpc_request *req;
386 struct osc_setattr_args *sa;
387 struct obd_import *imp = class_exp2cliimp(exp);
388 struct ost_body *body;
393 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
397 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
399 ptlrpc_request_free(req);
403 osc_set_io_portal(req);
405 ptlrpc_at_set_req_timeout(req);
407 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
409 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
411 ptlrpc_request_set_replen(req);
413 req->rq_interpret_reply = osc_setattr_interpret;
414 sa = ptlrpc_req_async_args(sa, req);
416 sa->sa_upcall = upcall;
417 sa->sa_cookie = cookie;
419 ptlrpcd_add_req(req);
423 EXPORT_SYMBOL(osc_punch_send);
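/*
 * Usage sketch (editorial illustration; "my_punch_done" is a hypothetical
 * obd_enqueue_update_f callback): truncate an object to @size. As with
 * OST_SYNC, the o_size/o_blocks fields carry the punched range, here
 * [size, EOF]:
 *
 *	oa->o_size = size;
 *	oa->o_blocks = OBD_OBJECT_EOF;
 *	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLGROUP;
 *	rc = osc_punch_send(exp, oa, my_punch_done, cookie);
 */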
426 * osc_fallocate_base() - Handles fallocate requests.
428 * @exp: Export structure
429 * @oa: Attributes passed to OSS from client (obdo structure)
430 * @upcall: Completion callback invoked when the request finishes
431 * @cookie: Opaque caller context passed back to @upcall
433 * @mode: Operation done on given range.
435 * Only block allocation, i.e. the standard preallocate operation, is
436 * supported currently. Other mode flags are not supported yet.
437 * ftruncate(2) or truncate(2) is supported via a SETATTR request.
440 * Return: Non-zero on failure and 0 on success.
442 int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
443 obd_enqueue_update_f upcall, void *cookie, int mode)
445 struct ptlrpc_request *req;
446 struct osc_setattr_args *sa;
447 struct ost_body *body;
448 struct obd_import *imp = class_exp2cliimp(exp);
452 oa->o_falloc_mode = mode;
453 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
458 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
460 ptlrpc_request_free(req);
464 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
467 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
469 ptlrpc_request_set_replen(req);
471 req->rq_interpret_reply = osc_setattr_interpret;
472 BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
473 sa = ptlrpc_req_async_args(sa, req);
475 sa->sa_upcall = upcall;
476 sa->sa_cookie = cookie;
478 ptlrpcd_add_req(req);
482 EXPORT_SYMBOL(osc_fallocate_base);
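/*
 * Usage sketch (editorial illustration; "my_falloc_done" is a hypothetical
 * obd_enqueue_update_f callback): preallocate [start, end) without changing
 * the visible file size, as the VFS does for fallocate(2) with
 * FALLOC_FL_KEEP_SIZE. o_size/o_blocks carry the range:
 *
 *	oa->o_size = start;
 *	oa->o_blocks = end;
 *	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLGROUP;
 *	rc = osc_fallocate_base(exp, oa, my_falloc_done, cookie,
 *				FALLOC_FL_KEEP_SIZE);
 */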
484 static int osc_sync_interpret(const struct lu_env *env,
485 struct ptlrpc_request *req, void *args, int rc)
487 struct osc_fsync_args *fa = args;
488 struct ost_body *body;
489 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
490 unsigned long valid = 0;
491 struct cl_object *obj;
497 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
499 CERROR("can't unpack ost_body\n");
500 GOTO(out, rc = -EPROTO);
503 *fa->fa_oa = body->oa;
504 obj = osc2cl(fa->fa_obj);
506 /* Update osc object's blocks attribute */
507 cl_object_attr_lock(obj);
508 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
509 attr->cat_blocks = body->oa.o_blocks;
514 cl_object_attr_update(env, obj, attr, valid);
515 cl_object_attr_unlock(obj);
518 rc = fa->fa_upcall(fa->fa_cookie, rc);
522 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
523 obd_enqueue_update_f upcall, void *cookie,
524 struct ptlrpc_request_set *rqset)
526 struct obd_export *exp = osc_export(obj);
527 struct ptlrpc_request *req;
528 struct ost_body *body;
529 struct osc_fsync_args *fa;
533 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
537 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
539 ptlrpc_request_free(req);
543 /* overload the size and blocks fields in the oa with start/end */
544 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
546 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
548 ptlrpc_request_set_replen(req);
549 req->rq_interpret_reply = osc_sync_interpret;
551 fa = ptlrpc_req_async_args(fa, req);
554 fa->fa_upcall = upcall;
555 fa->fa_cookie = cookie;
557 ptlrpc_set_add_req(rqset, req);
562 /* Find and cancel locally the locks matched by @mode in the resource found
563 * by @objid. Found locks are added into the @cancels list. Returns the
564 * number of locks added to the @cancels list. */
565 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
566 struct list_head *cancels,
567 enum ldlm_mode mode, __u64 lock_flags)
569 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
570 struct ldlm_res_id res_id;
571 struct ldlm_resource *res;
575 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
576 * export) but disabled through procfs (flag in NS).
578 * This distinguishes it from the case when ELC is not supported originally,
579 * when we still want to cancel locks in advance and just cancel them
580 * locally, without sending any RPC. */
581 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
584 ostid_build_res_name(&oa->o_oi, &res_id);
585 res = ldlm_resource_get(ns, &res_id, 0, 0);
589 LDLM_RESOURCE_ADDREF(res);
590 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
591 lock_flags, 0, NULL);
592 LDLM_RESOURCE_DELREF(res);
593 ldlm_resource_putref(res);
597 static int osc_destroy_interpret(const struct lu_env *env,
598 struct ptlrpc_request *req, void *args, int rc)
600 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
602 atomic_dec(&cli->cl_destroy_in_flight);
603 wake_up(&cli->cl_destroy_waitq);
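/*
 * Throttle helper for osc_destroy(): atomically claim one of the
 * cl_max_rpcs_in_flight destroy slots. If no slot is free, drop the
 * counter again and, if a slot opened up between the two atomic
 * operations, wake a waiter so the wakeup is not lost; the caller then
 * sleeps on cl_destroy_waitq and retries.
 */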
608 static int osc_can_send_destroy(struct client_obd *cli)
610 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
611 cli->cl_max_rpcs_in_flight) {
612 /* The destroy request can be sent */
615 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
616 cli->cl_max_rpcs_in_flight) {
618 * The counter has been modified between the two atomic
621 wake_up(&cli->cl_destroy_waitq);
626 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
629 struct client_obd *cli = &exp->exp_obd->u.cli;
630 struct ptlrpc_request *req;
631 struct ost_body *body;
637 CDEBUG(D_INFO, "oa NULL\n");
641 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
642 LDLM_FL_DISCARD_DATA);
644 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
646 ldlm_lock_list_put(&cancels, l_bl_ast, count);
650 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
653 ptlrpc_request_free(req);
657 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
658 ptlrpc_at_set_req_timeout(req);
660 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
662 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
664 ptlrpc_request_set_replen(req);
666 req->rq_interpret_reply = osc_destroy_interpret;
667 if (!osc_can_send_destroy(cli)) {
669 * Wait until the number of on-going destroy RPCs drops
670 * under max_rpcs_in_flight
672 rc = l_wait_event_abortable_exclusive(
673 cli->cl_destroy_waitq,
674 osc_can_send_destroy(cli));
676 ptlrpc_req_finished(req);
681 /* Do not wait for response */
682 ptlrpcd_add_req(req);
686 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
689 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
691 LASSERT(!(oa->o_valid & bits));
694 spin_lock(&cli->cl_loi_list_lock);
695 if (cli->cl_ocd_grant_param)
696 oa->o_dirty = cli->cl_dirty_grant;
698 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
699 if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
700 CERROR("dirty %lu > dirty_max %lu\n",
702 cli->cl_dirty_max_pages);
704 } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
705 (long)(obd_max_dirty_pages + 1))) {
706 /* The atomic_read() and the atomic_inc() are
707 * not covered by a lock, thus they may safely race and trip
708 * this CERROR() unless we add in a small fudge factor (+1). */
709 CERROR("%s: dirty %ld > system dirty_max %ld\n",
710 cli_name(cli), atomic_long_read(&obd_dirty_pages),
711 obd_max_dirty_pages);
713 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
715 CERROR("dirty %lu - dirty_max %lu too big???\n",
716 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
719 unsigned long nrpages;
720 unsigned long undirty;
722 nrpages = cli->cl_max_pages_per_rpc;
723 nrpages *= cli->cl_max_rpcs_in_flight + 1;
724 nrpages = max(nrpages, cli->cl_dirty_max_pages);
725 undirty = nrpages << PAGE_SHIFT;
726 if (cli->cl_ocd_grant_param) {
729 /* take extent tax into account when asking for more
731 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
732 cli->cl_max_extent_pages;
733 undirty += nrextents * cli->cl_grant_extent_tax;
735 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
736 * to add extent tax, etc.
738 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
739 ~(PTLRPC_MAX_BRW_SIZE * 4UL));
741 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
742 /* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
743 if (cli->cl_lost_grant > INT_MAX) {
745 "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
746 cli_name(cli), cli->cl_lost_grant);
747 oa->o_dropped = INT_MAX;
749 oa->o_dropped = cli->cl_lost_grant;
751 cli->cl_lost_grant -= oa->o_dropped;
752 spin_unlock(&cli->cl_loi_list_lock);
753 CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu"
754 " cl_lost_grant %lu\n", cli_name(cli), oa->o_dirty,
755 oa->o_undirty, oa->o_dropped, oa->o_grant, cli->cl_lost_grant);
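/*
 * Worked example (editorial, assuming 4KiB pages and cl_dirty_max_pages
 * below the result): with cl_max_pages_per_rpc = 256 (1MiB BRWs) and
 * cl_max_rpcs_in_flight = 8, nrpages = 256 * (8 + 1) = 2304, so the client
 * asks to keep o_undirty = 2304 << 12 = 9MiB of grant, plus the per-extent
 * tax when GRANT_PARAM is negotiated, capped well under OBD_MAX_GRANT so
 * the server can add its own overhead.
 */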
758 void osc_update_next_shrink(struct client_obd *cli)
760 cli->cl_next_shrink_grant = ktime_get_seconds() +
761 cli->cl_grant_shrink_interval;
763 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
764 cli->cl_next_shrink_grant);
766 EXPORT_SYMBOL(osc_update_next_shrink);
768 static void __osc_update_grant(struct client_obd *cli, u64 grant)
770 spin_lock(&cli->cl_loi_list_lock);
771 cli->cl_avail_grant += grant;
772 spin_unlock(&cli->cl_loi_list_lock);
775 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
777 if (body->oa.o_valid & OBD_MD_FLGRANT) {
778 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
779 __osc_update_grant(cli, body->oa.o_grant);
784 * grant thread data for shrinking space.
786 struct grant_thread_data {
787 struct list_head gtd_clients;
788 struct mutex gtd_mutex;
789 unsigned long gtd_stopped:1;
791 static struct grant_thread_data client_gtd;
793 static int osc_shrink_grant_interpret(const struct lu_env *env,
794 struct ptlrpc_request *req,
797 struct osc_grant_args *aa = args;
798 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
799 struct ost_body *body;
802 __osc_update_grant(cli, aa->aa_oa->o_grant);
806 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
808 osc_update_grant(cli, body);
810 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
816 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
818 spin_lock(&cli->cl_loi_list_lock);
819 oa->o_grant = cli->cl_avail_grant / 4;
820 cli->cl_avail_grant -= oa->o_grant;
821 spin_unlock(&cli->cl_loi_list_lock);
822 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
823 oa->o_valid |= OBD_MD_FLFLAGS;
826 oa->o_flags |= OBD_FL_SHRINK_GRANT;
827 osc_update_next_shrink(cli);
830 /* Shrink the current grant, either from some large amount to enough for a
831 * full set of in-flight RPCs, or if we have already shrunk to that limit
832 * then to enough for a single RPC. This avoids keeping more grant than
833 * needed, and avoids shrinking the grant piecemeal. */
834 static int osc_shrink_grant(struct client_obd *cli)
836 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
837 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
839 spin_lock(&cli->cl_loi_list_lock);
840 if (cli->cl_avail_grant <= target_bytes)
841 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
842 spin_unlock(&cli->cl_loi_list_lock);
844 return osc_shrink_grant_to_target(cli, target_bytes);
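/*
 * Worked example (editorial): with 8 RPCs in flight of 1MiB each,
 * target_bytes = (8 + 1) * 1MiB = 9MiB. Once cl_avail_grant is already at
 * or below that, the target drops to a single RPC's worth (1MiB), so
 * repeated calls shrink in two coarse steps rather than piecemeal.
 */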
847 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
850 struct ost_body *body;
853 spin_lock(&cli->cl_loi_list_lock);
854 /* Don't shrink if we are already above or below the desired limit.
855 * We don't want to shrink below a single RPC, as that will negatively
856 * impact block allocation and long-term performance. */
857 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
858 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
860 if (target_bytes >= cli->cl_avail_grant) {
861 spin_unlock(&cli->cl_loi_list_lock);
864 spin_unlock(&cli->cl_loi_list_lock);
870 osc_announce_cached(cli, &body->oa, 0);
872 spin_lock(&cli->cl_loi_list_lock);
873 if (target_bytes >= cli->cl_avail_grant) {
874 /* available grant has changed since target calculation */
875 spin_unlock(&cli->cl_loi_list_lock);
876 GOTO(out_free, rc = 0);
878 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
879 cli->cl_avail_grant = target_bytes;
880 spin_unlock(&cli->cl_loi_list_lock);
881 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
882 body->oa.o_valid |= OBD_MD_FLFLAGS;
883 body->oa.o_flags = 0;
885 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
886 osc_update_next_shrink(cli);
888 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
889 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
890 sizeof(*body), body, NULL);
892 __osc_update_grant(cli, body->oa.o_grant);
898 static int osc_should_shrink_grant(struct client_obd *client)
900 time64_t next_shrink = client->cl_next_shrink_grant;
902 if (client->cl_import == NULL)
905 if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
906 client->cl_import->imp_grant_shrink_disabled) {
907 osc_update_next_shrink(client);
911 if (ktime_get_seconds() >= next_shrink - 5) {
912 /* Get the current RPC size directly, instead of going via:
913 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
914 * Keep comment here so that it can be found by searching. */
915 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
917 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
918 client->cl_avail_grant > brw_size)
921 osc_update_next_shrink(client);
926 #define GRANT_SHRINK_RPC_BATCH 100
928 static struct delayed_work work;
930 static void osc_grant_work_handler(struct work_struct *data)
932 struct client_obd *cli;
934 bool init_next_shrink = true;
935 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
938 mutex_lock(&client_gtd.gtd_mutex);
939 list_for_each_entry(cli, &client_gtd.gtd_clients,
941 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
942 osc_should_shrink_grant(cli)) {
943 osc_shrink_grant(cli);
947 if (!init_next_shrink) {
948 if (cli->cl_next_shrink_grant < next_shrink &&
949 cli->cl_next_shrink_grant > ktime_get_seconds())
950 next_shrink = cli->cl_next_shrink_grant;
952 init_next_shrink = false;
953 next_shrink = cli->cl_next_shrink_grant;
956 mutex_unlock(&client_gtd.gtd_mutex);
958 if (client_gtd.gtd_stopped == 1)
961 if (next_shrink > ktime_get_seconds()) {
962 time64_t delay = next_shrink - ktime_get_seconds();
964 schedule_delayed_work(&work, cfs_time_seconds(delay));
966 schedule_work(&work.work);
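/*
 * Force an immediate pass of the grant shrink worker: flush any pending
 * delayed run, then queue the handler right away. The handler reschedules
 * itself afterwards, so this only advances the next run.
 */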
970 void osc_schedule_grant_work(void)
972 cancel_delayed_work_sync(&work);
973 schedule_work(&work.work);
975 EXPORT_SYMBOL(osc_schedule_grant_work);
978 * Start grant thread for returning grant to the server for idle clients.
980 static int osc_start_grant_work(void)
982 client_gtd.gtd_stopped = 0;
983 mutex_init(&client_gtd.gtd_mutex);
984 INIT_LIST_HEAD(&client_gtd.gtd_clients);
986 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
987 schedule_work(&work.work);
992 static void osc_stop_grant_work(void)
994 client_gtd.gtd_stopped = 1;
995 cancel_delayed_work_sync(&work);
998 static void osc_add_grant_list(struct client_obd *client)
1000 mutex_lock(&client_gtd.gtd_mutex);
1001 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
1002 mutex_unlock(&client_gtd.gtd_mutex);
1005 static void osc_del_grant_list(struct client_obd *client)
1007 if (list_empty(&client->cl_grant_chain))
1010 mutex_lock(&client_gtd.gtd_mutex);
1011 list_del_init(&client->cl_grant_chain);
1012 mutex_unlock(&client_gtd.gtd_mutex);
1015 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1018 * ocd_grant is the total grant amount we're expected to hold: if we've
1019 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
1020 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
1023 * race is tolerable here: if we're evicted, but imp_state already
1024 * left EVICTED state, then cl_dirty_pages must be 0 already.
1026 spin_lock(&cli->cl_loi_list_lock);
1027 cli->cl_avail_grant = ocd->ocd_grant;
1028 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
1029 unsigned long consumed = cli->cl_reserved_grant;
1031 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
1032 consumed += cli->cl_dirty_grant;
1034 consumed += cli->cl_dirty_pages << PAGE_SHIFT;
1035 if (cli->cl_avail_grant < consumed) {
1036 CERROR("%s: granted %ld but already consumed %ld\n",
1037 cli_name(cli), cli->cl_avail_grant, consumed);
1038 cli->cl_avail_grant = 0;
1040 cli->cl_avail_grant -= consumed;
1044 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
1048 /* overhead for each extent insertion */
1049 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
1050 /* determine the appropriate chunk size used by osc_extent. */
1051 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
1052 ocd->ocd_grant_blkbits);
1053 /* max_pages_per_rpc must be chunk aligned */
1054 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
1055 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
1056 ~chunk_mask) & chunk_mask;
1057 /* determine maximum extent size, in #pages */
1058 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
1059 cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
1060 cli->cl_ocd_grant_param = 1;
1062 cli->cl_ocd_grant_param = 0;
1063 cli->cl_grant_extent_tax = 0;
1064 cli->cl_chunkbits = PAGE_SHIFT;
1065 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1067 spin_unlock(&cli->cl_loi_list_lock);
1070 "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
1072 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1073 cli->cl_max_extent_pages);
1075 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1076 osc_add_grant_list(cli);
1078 EXPORT_SYMBOL(osc_init_grant);
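/*
 * Worked example (editorial, assuming 4KiB pages): a server using 64KiB
 * grant blocks reports ocd_grant_blkbits = 16, so cl_chunkbits = 16 and a
 * chunk spans 1 << (16 - 12) = 16 pages. Then chunk_mask = ~15 and
 * cl_max_pages_per_rpc is rounded up to a chunk multiple:
 * (1000 + 15) & ~15 = 1008 pages.
 */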
1080 /* We assume that the reason this OSC got a short read is that it read
1081 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1082 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1083 * this stripe never got written at or beyond this stripe offset yet. */
1084 static void handle_short_read(int nob_read, size_t page_count,
1085 struct brw_page **pga)
1090 /* skip bytes read OK */
1091 while (nob_read > 0) {
1092 LASSERT (page_count > 0);
1094 if (pga[i]->count > nob_read) {
1095 /* EOF inside this page */
1096 ptr = kmap(pga[i]->pg) +
1097 (pga[i]->off & ~PAGE_MASK);
1098 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1105 nob_read -= pga[i]->count;
1110 /* zero remaining pages */
1111 while (page_count-- > 0) {
1112 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1113 memset(ptr, 0, pga[i]->count);
1119 static int check_write_rcs(struct ptlrpc_request *req,
1120 int requested_nob, int niocount,
1121 size_t page_count, struct brw_page **pga)
1126 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1127 sizeof(*remote_rcs) *
1129 if (remote_rcs == NULL) {
1130 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1134 /* return error if any niobuf was in error */
1135 for (i = 0; i < niocount; i++) {
1136 if ((int)remote_rcs[i] < 0) {
1137 CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1138 i, remote_rcs[i], req);
1139 return remote_rcs[i];
1142 if (remote_rcs[i] != 0) {
1143 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1144 i, remote_rcs[i], req);
1148 if (req->rq_bulk != NULL &&
1149 req->rq_bulk->bd_nob_transferred != requested_nob) {
1150 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1151 req->rq_bulk->bd_nob_transferred, requested_nob);
1158 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1160 if (p1->flag != p2->flag) {
1161 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1162 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1163 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC |
1164 OBD_BRW_SYS_RESOURCE);
1166 /* warn if we try to combine flags that we don't know to be
1167 * safe to combine */
1168 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1169 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1170 "report this at https://jira.whamcloud.com/\n",
1171 p1->flag, p2->flag);
1176 return (p1->off + p1->count == p2->off);
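/*
 * Example (editorial): two 4KiB pages at offsets 0 and 4096 with
 * compatible flags merge into one 8KiB niobuf_remote when
 * osc_brw_prep_request() builds the RMF_NIOBUF_REMOTE array; a hole or a
 * flag mismatch outside the ignorable mask starts a new niobuf instead.
 */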
1179 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1180 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1181 size_t pg_count, struct brw_page **pga,
1182 int opc, obd_dif_csum_fn *fn,
1184 u32 *check_sum, bool resend)
1186 struct ahash_request *req;
1187 /* Use Adler as the default checksum type on top of DIF tags */
1188 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1189 struct page *__page;
1190 unsigned char *buffer;
1192 unsigned int bufsize;
1194 int used_number = 0;
1200 LASSERT(pg_count > 0);
1202 __page = alloc_page(GFP_KERNEL);
1206 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1209 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1210 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1214 buffer = kmap(__page);
1215 guard_start = (__u16 *)buffer;
1216 guard_number = PAGE_SIZE / sizeof(*guard_start);
1217 CDEBUG(D_PAGE | (resend ? D_HA : 0),
1218 "GRD tags per page=%u, resend=%u, bytes=%u, pages=%zu\n",
1219 guard_number, resend, nob, pg_count);
1221 while (nob > 0 && pg_count > 0) {
1222 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1224 /* corrupt the data before we compute the checksum, to
1225 * simulate an OST->client data error */
1226 if (unlikely(i == 0 && opc == OST_READ &&
1227 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1228 unsigned char *ptr = kmap(pga[i]->pg);
1229 int off = pga[i]->off & ~PAGE_MASK;
1231 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1236 * The left guard number should be able to hold checksums of a
1239 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1240 pga[i]->off & ~PAGE_MASK,
1242 guard_start + used_number,
1243 guard_number - used_number,
1246 if (unlikely(resend))
1247 CDEBUG(D_PAGE | D_HA,
1248 "pga[%u]: used %u off %llu+%u gen checksum: %*phN\n",
1249 i, used, pga[i]->off & ~PAGE_MASK, count,
1250 (int)(used * sizeof(*guard_start)),
1251 guard_start + used_number);
1255 used_number += used;
1256 if (used_number == guard_number) {
1257 cfs_crypto_hash_update_page(req, __page, 0,
1258 used_number * sizeof(*guard_start));
1262 nob -= pga[i]->count;
1270 if (used_number != 0)
1271 cfs_crypto_hash_update_page(req, __page, 0,
1272 used_number * sizeof(*guard_start));
1274 bufsize = sizeof(cksum);
1275 cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1277 /* For sending we only compute the wrong checksum instead
1278 * of corrupting the data so it is still correct on a redo */
1279 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1284 __free_page(__page);
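/*
 * Worked example (editorial, assuming 4KiB pages and 512-byte sectors):
 * each full page yields 8 16-bit guard tags, and the bounce page batches
 * guard_number = 4096 / 2 = 2048 tags, i.e. up to 256 pages per
 * cfs_crypto_hash_update_page() call before the tag buffer is drained.
 */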
1287 #else /* !CONFIG_CRC_T10DIF */
1288 #define obd_dif_ip_fn NULL
1289 #define obd_dif_crc_fn NULL
1290 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum, re) \
1292 #endif /* CONFIG_CRC_T10DIF */
1294 static int osc_checksum_bulk(int nob, size_t pg_count,
1295 struct brw_page **pga, int opc,
1296 enum cksum_types cksum_type,
1300 struct ahash_request *req;
1301 unsigned int bufsize;
1302 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1304 LASSERT(pg_count > 0);
1306 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1308 CERROR("Unable to initialize checksum hash %s\n",
1309 cfs_crypto_hash_name(cfs_alg));
1310 return PTR_ERR(req);
1313 while (nob > 0 && pg_count > 0) {
1314 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1316 /* corrupt the data before we compute the checksum, to
1317 * simulate an OST->client data error */
1318 if (i == 0 && opc == OST_READ &&
1319 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1320 unsigned char *ptr = kmap(pga[i]->pg);
1321 int off = pga[i]->off & ~PAGE_MASK;
1323 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1326 cfs_crypto_hash_update_page(req, pga[i]->pg,
1327 pga[i]->off & ~PAGE_MASK,
1329 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1330 (int)(pga[i]->off & ~PAGE_MASK));
1332 nob -= pga[i]->count;
1337 bufsize = sizeof(*cksum);
1338 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1340 /* For sending we only compute the wrong checksum instead
1341 * of corrupting the data so it is still correct on a redo */
1342 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1348 static int osc_checksum_bulk_rw(const char *obd_name,
1349 enum cksum_types cksum_type,
1350 int nob, size_t pg_count,
1351 struct brw_page **pga, int opc,
1352 u32 *check_sum, bool resend)
1354 obd_dif_csum_fn *fn = NULL;
1355 int sector_size = 0;
1359 obd_t10_cksum2dif(cksum_type, &fn, §or_size);
1362 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1363 opc, fn, sector_size, check_sum,
1366 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1372 #ifdef CONFIG_LL_ENCRYPTION
1374 * osc_encrypt_pagecache_blocks() - overlay to llcrypt_encrypt_pagecache_blocks
1375 * @srcpage: The locked pagecache page containing the block(s) to encrypt
1376 * @dstpage: The page to put encryption result
1377 * @len: Total size of the block(s) to encrypt. Must be a nonzero
1378 * multiple of the filesystem's block size.
1379 * @offs: Byte offset within @page of the first block to encrypt. Must be
1380 * a multiple of the filesystem's block size.
1381 * @gfp_flags: Memory allocation flags
1383 * This overlay function is necessary to be able to provide our own bounce page.
1385 static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
1386 struct page *dstpage,
1392 const struct inode *inode = srcpage->mapping->host;
1393 const unsigned int blockbits = inode->i_blkbits;
1394 const unsigned int blocksize = 1 << blockbits;
1395 u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
1396 (offs >> blockbits);
1400 if (unlikely(!dstpage))
1401 return llcrypt_encrypt_pagecache_blocks(srcpage, len, offs,
1404 if (WARN_ON_ONCE(!PageLocked(srcpage)))
1405 return ERR_PTR(-EINVAL);
1407 if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
1408 return ERR_PTR(-EINVAL);
1410 /* Set PagePrivate2 for disambiguation in
1411 * osc_finalize_bounce_page().
1412 * It means the cipher page was not allocated by llcrypt.
1414 SetPagePrivate2(dstpage);
1416 for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
1417 err = llcrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
1418 i, lblk_num, gfp_flags);
1420 return ERR_PTR(err);
1422 SetPagePrivate(dstpage);
1423 set_page_private(dstpage, (unsigned long)srcpage);
1428 * osc_finalize_bounce_page() - overlay to llcrypt_finalize_bounce_page
1430 * This overlay function is necessary to handle bounce pages
1431 * allocated by ourselves.
1433 static inline void osc_finalize_bounce_page(struct page **pagep)
1435 struct page *page = *pagep;
1437 /* PagePrivate2 was set in osc_encrypt_pagecache_blocks
1438 * to indicate the cipher page was allocated by ourselves.
1439 * So we must not free it via llcrypt.
1441 if (unlikely(!page || !PagePrivate2(page)))
1442 return llcrypt_finalize_bounce_page(pagep);
1444 if (llcrypt_is_bounce_page(page)) {
1445 *pagep = llcrypt_pagecache_page(page);
1446 ClearPagePrivate2(page);
1447 set_page_private(page, (unsigned long)NULL);
1448 ClearPagePrivate(page);
1451 #else /* !CONFIG_LL_ENCRYPTION */
1452 #define osc_encrypt_pagecache_blocks(srcpage, dstpage, len, offs, gfp_flags) \
1453 llcrypt_encrypt_pagecache_blocks(srcpage, len, offs, gfp_flags)
1454 #define osc_finalize_bounce_page(page) llcrypt_finalize_bounce_page(page)
1457 static inline void osc_release_bounce_pages(struct brw_page **pga,
1460 #ifdef HAVE_LUSTRE_CRYPTO
1461 struct page **pa = NULL;
1464 #ifdef CONFIG_LL_ENCRYPTION
1465 if (PageChecked(pga[0]->pg)) {
1466 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1472 for (i = 0; i < page_count; i++) {
1473 /* Bounce pages used by osc_encrypt_pagecache_blocks()
1474 * called from osc_brw_prep_request()
1475 * are identified thanks to the PageChecked flag.
1477 if (PageChecked(pga[i]->pg)) {
1479 pa[j++] = pga[i]->pg;
1480 osc_finalize_bounce_page(&pga[i]->pg);
1482 pga[i]->count -= pga[i]->bp_count_diff;
1483 pga[i]->off += pga[i]->bp_off_diff;
1487 sptlrpc_enc_pool_put_pages_array(pa, j);
1488 OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
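/*
 * Build a BRW (bulk read/write) RPC for @page_count pages in @pga:
 * allocate the request (from osc_rq_pool for writes), run pages through
 * encryption bounce buffering when the object is encrypted, merge
 * contiguous pages into niobufs, then either inline the data as a short
 * I/O or attach a bulk descriptor. The prepared request is returned via
 * @reqp.
 */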
1494 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1495 u32 page_count, struct brw_page **pga,
1496 struct ptlrpc_request **reqp, int resend)
1498 struct ptlrpc_request *req;
1499 struct ptlrpc_bulk_desc *desc;
1500 struct ost_body *body;
1501 struct obd_ioobj *ioobj;
1502 struct niobuf_remote *niobuf;
1503 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1504 struct osc_brw_async_args *aa;
1505 struct req_capsule *pill;
1506 struct brw_page *pg_prev;
1508 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1509 struct inode *inode = NULL;
1510 bool directio = false;
1512 bool enable_checksum = true;
1513 struct cl_page *clpage;
1517 clpage = oap2cl_page(brw_page2oap(pga[0]));
1518 inode = clpage->cp_inode;
1519 if (clpage->cp_type == CPT_TRANSIENT)
1522 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1523 RETURN(-ENOMEM); /* Recoverable */
1524 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1525 RETURN(-EINVAL); /* Fatal */
1527 if ((cmd & OBD_BRW_WRITE) != 0) {
1529 req = ptlrpc_request_alloc_pool(cli->cl_import,
1531 &RQF_OST_BRW_WRITE);
1534 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1539 if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
1540 llcrypt_has_encryption_key(inode)) {
1541 struct page **pa = NULL;
1543 #ifdef CONFIG_LL_ENCRYPTION
1544 OBD_ALLOC_PTR_ARRAY_LARGE(pa, page_count);
1546 ptlrpc_request_free(req);
1550 rc = sptlrpc_enc_pool_get_pages_array(pa, page_count);
1552 CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
1554 ptlrpc_request_free(req);
1559 for (i = 0; i < page_count; i++) {
1560 struct brw_page *brwpg = pga[i];
1561 struct page *data_page = NULL;
1562 bool retried = false;
1563 bool lockedbymyself;
1564 u32 nunits = (brwpg->off & ~PAGE_MASK) + brwpg->count;
1565 struct address_space *map_orig = NULL;
1569 nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1570 /* The page can already be locked when we arrive here.
1571 * This is possible when cl_page_assume/vvp_page_assume
1572 * is stuck on wait_on_page_writeback with page lock
1573 * held. In this case there is no risk for the lock to
1574 * be released while we are doing our encryption
1575 * processing, because writeback against that page will
1576 * end in vvp_page_completion_write/cl_page_completion,
1577 * which happens only once the page is fully processed.
1579 lockedbymyself = trylock_page(brwpg->pg);
1581 map_orig = brwpg->pg->mapping;
1582 brwpg->pg->mapping = inode->i_mapping;
1583 index_orig = brwpg->pg->index;
1584 clpage = oap2cl_page(brw_page2oap(brwpg));
1585 brwpg->pg->index = clpage->cp_page_index;
1588 osc_encrypt_pagecache_blocks(brwpg->pg,
1593 brwpg->pg->mapping = map_orig;
1594 brwpg->pg->index = index_orig;
1597 unlock_page(brwpg->pg);
1598 if (IS_ERR(data_page)) {
1599 rc = PTR_ERR(data_page);
1600 if (rc == -ENOMEM && !retried) {
1606 sptlrpc_enc_pool_put_pages_array(pa + i,
1608 OBD_FREE_PTR_ARRAY_LARGE(pa,
1611 ptlrpc_request_free(req);
1614 /* Set PageChecked flag on bounce page for
1615 * disambiguation in osc_release_bounce_pages().
1617 SetPageChecked(data_page);
1618 brwpg->pg = data_page;
1619 /* there should be no gap in the middle of page array */
1620 if (i == page_count - 1) {
1621 struct osc_async_page *oap =
1622 brw_page2oap(brwpg);
1624 oa->o_size = oap->oap_count +
1625 oap->oap_obj_off + oap->oap_page_off;
1627 /* len is forced to nunits, and relative offset to 0
1628 * so store the old, clear text info
1630 brwpg->bp_count_diff = nunits - brwpg->count;
1631 brwpg->count = nunits;
1632 brwpg->bp_off_diff = brwpg->off & ~PAGE_MASK;
1633 brwpg->off = brwpg->off & PAGE_MASK;
1637 OBD_FREE_PTR_ARRAY_LARGE(pa, page_count);
1638 } else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
1639 struct osc_async_page *oap = brw_page2oap(pga[0]);
1640 struct cl_page *clpage = oap2cl_page(oap);
1641 struct cl_object *clobj = clpage->cp_obj;
1642 struct cl_attr attr = { 0 };
1646 env = cl_env_get(&refcheck);
1649 ptlrpc_request_free(req);
1653 cl_object_attr_lock(clobj);
1654 rc = cl_object_attr_get(env, clobj, &attr);
1655 cl_object_attr_unlock(clobj);
1656 cl_env_put(env, &refcheck);
1658 ptlrpc_request_free(req);
1662 oa->o_size = attr.cat_size;
1663 } else if (opc == OST_READ && inode && IS_ENCRYPTED(inode) &&
1664 llcrypt_has_encryption_key(inode)) {
1665 for (i = 0; i < page_count; i++) {
1666 struct brw_page *pg = pga[i];
1667 u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
1669 nunits = round_up(nunits, LUSTRE_ENCRYPTION_UNIT_SIZE);
1670 /* count/off are forced to cover the whole encryption
1671 * unit size so that all encrypted data is stored on the
1672 * OST, so adjust bp_{count,off}_diff for the size of
1675 pg->bp_count_diff = nunits - pg->count;
1677 pg->bp_off_diff = pg->off & ~PAGE_MASK;
1678 pg->off = pg->off & PAGE_MASK;
1682 for (niocount = i = 1; i < page_count; i++) {
1683 if (!can_merge_pages(pga[i - 1], pga[i]))
1687 pill = &req->rq_pill;
1688 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1690 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1691 niocount * sizeof(*niobuf));
1693 for (i = 0; i < page_count; i++) {
1694 short_io_size += pga[i]->count;
1695 if (!inode || !IS_ENCRYPTED(inode) ||
1696 !llcrypt_has_encryption_key(inode)) {
1697 pga[i]->bp_count_diff = 0;
1698 pga[i]->bp_off_diff = 0;
1702 if (brw_page2oap(pga[0])->oap_brw_flags & OBD_BRW_RDMA_ONLY) {
1703 enable_checksum = false;
1708 /* Check if read/write is small enough to be a short io. */
1709 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1710 !imp_connect_shortio(cli->cl_import))
1713 /* If this is an empty RPC to old server, just ignore it */
1714 if (!short_io_size && !pga[0]->pg) {
1715 ptlrpc_request_free(req);
1719 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1720 opc == OST_READ ? 0 : short_io_size);
1721 if (opc == OST_READ)
1722 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1725 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1727 ptlrpc_request_free(req);
1730 osc_set_io_portal(req);
1732 ptlrpc_at_set_req_timeout(req);
1733 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1735 req->rq_no_retry_einprogress = 1;
1737 if (short_io_size != 0) {
1739 short_io_buf = NULL;
1743 desc = ptlrpc_prep_bulk_imp(req, page_count,
1744 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1745 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1746 PTLRPC_BULK_PUT_SINK),
1748 &ptlrpc_bulk_kiov_pin_ops);
1751 GOTO(out, rc = -ENOMEM);
1752 /* NB request now owns desc and will free it when it gets freed */
1753 desc->bd_is_rdma = gpu;
1755 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1756 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1757 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1758 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1760 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1762 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1763 * and from_kgid(), because they are asynchronous. Fortunately, variable
1764 * oa contains valid o_uid and o_gid in these two operations.
1765 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1766 * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1767 * other process logic */
1768 body->oa.o_uid = oa->o_uid;
1769 body->oa.o_gid = oa->o_gid;
1771 obdo_to_ioobj(oa, ioobj);
1772 ioobj->ioo_bufcnt = niocount;
1773 /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1774 * bulks that might be sent for this request. The actual number is decided
1775 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1776 * "max - 1" for compatibility with old clients sending "0", and also so
1777 * that the actual maximum is a power-of-two number, not one less. LU-1431 */
1779 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1781 ioobj_max_brw_set(ioobj, 0);
1783 if (short_io_size != 0) {
1784 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1785 body->oa.o_valid |= OBD_MD_FLFLAGS;
1786 body->oa.o_flags = 0;
1788 body->oa.o_flags |= OBD_FL_SHORT_IO;
1789 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1791 if (opc == OST_WRITE) {
1792 short_io_buf = req_capsule_client_get(pill,
1794 LASSERT(short_io_buf != NULL);
1798 LASSERT(page_count > 0);
1800 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1801 struct brw_page *pg = pga[i];
1802 int poff = pg->off & ~PAGE_MASK;
1804 LASSERT(pg->count > 0);
1805 /* make sure there is no gap in the middle of page array */
1806 LASSERTF(page_count == 1 ||
1807 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1808 ergo(i > 0 && i < page_count - 1,
1809 poff == 0 && pg->count == PAGE_SIZE) &&
1810 ergo(i == page_count - 1, poff == 0)),
1811 "i: %d/%d pg: %p off: %llu, count: %u\n",
1812 i, page_count, pg, pg->off, pg->count);
1813 LASSERTF(i == 0 || pg->off > pg_prev->off,
1814 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1815 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1817 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1818 pg_prev->pg, page_private(pg_prev->pg),
1819 pg_prev->pg->index, pg_prev->off);
1820 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1821 (pg->flag & OBD_BRW_SRVLOCK));
1822 if (short_io_size != 0 && opc == OST_WRITE) {
1823 unsigned char *ptr = kmap_atomic(pg->pg);
1825 LASSERT(short_io_size >= requested_nob + pg->count);
1826 memcpy(short_io_buf + requested_nob,
1830 } else if (short_io_size == 0) {
1831 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1834 requested_nob += pg->count;
1836 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1838 niobuf->rnb_len += pg->count;
1840 niobuf->rnb_offset = pg->off;
1841 niobuf->rnb_len = pg->count;
1842 niobuf->rnb_flags = pg->flag;
1847 LASSERTF((void *)(niobuf - niocount) ==
1848 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1849 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1850 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1852 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1854 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1855 body->oa.o_valid |= OBD_MD_FLFLAGS;
1856 body->oa.o_flags = 0;
1858 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1861 if (osc_should_shrink_grant(cli))
1862 osc_shrink_grant_local(cli, &body->oa);
1864 if (!cli->cl_checksum || sptlrpc_flavor_has_bulk(&req->rq_flvr))
1865 enable_checksum = false;
1867 /* size[REQ_REC_OFF] still sizeof (*body) */
1868 if (opc == OST_WRITE) {
1869 if (enable_checksum) {
1870 /* store cl_cksum_type in a local variable since
1871 * it can be changed via lprocfs */
1872 enum cksum_types cksum_type = cli->cl_cksum_type;
1874 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1875 body->oa.o_flags = 0;
1877 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1879 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1881 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1882 requested_nob, page_count,
1884 &body->oa.o_cksum, resend);
1886 CDEBUG(D_PAGE, "failed to checksum: rc = %d\n",
1890 CDEBUG(D_PAGE | (resend ? D_HA : 0),
1891 "checksum at write origin: %x (%x)\n",
1892 body->oa.o_cksum, cksum_type);
1894 /* save this in 'oa', too, for later checking */
1895 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1896 oa->o_flags |= obd_cksum_type_pack(obd_name,
1899 /* clear out the checksum flag, in case this is a
1900 * resend but cl_checksum is no longer set. b=11238 */
1901 oa->o_valid &= ~OBD_MD_FLCKSUM;
1903 oa->o_cksum = body->oa.o_cksum;
1904 /* 1 RC per niobuf */
1905 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1906 sizeof(__u32) * niocount);
1908 if (enable_checksum) {
1909 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1910 body->oa.o_flags = 0;
1911 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1912 cli->cl_cksum_type);
1913 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1916 /* The client cksum has already been copied to the wire obdo in the
1917 * previous lustre_set_wire_obdo(), and in case a bulk read is being
1918 * resent due to a cksum error, this will allow the server to
1919 * check+dump pages on its side */
1921 ptlrpc_request_set_replen(req);
1923 aa = ptlrpc_req_async_args(aa, req);
1925 aa->aa_requested_nob = requested_nob;
1926 aa->aa_nio_count = niocount;
1927 aa->aa_page_count = page_count;
1931 INIT_LIST_HEAD(&aa->aa_oaps);
1934 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1935 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1936 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1937 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1941 ptlrpc_req_finished(req);
1945 char dbgcksum_file_name[PATH_MAX];
1947 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1948 struct brw_page **pga, __u32 server_cksum,
1956 /* will only keep a dump of pages on the first error for the same range
1957 * in file/fid, not during the resends/retries. */
1958 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1959 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1960 (strncmp(libcfs_debug_file_path, "NONE", 4) != 0 ?
1961 libcfs_debug_file_path : LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1962 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1963 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1964 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1966 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1967 client_cksum, server_cksum);
1968 CWARN("dumping checksum data to %s\n", dbgcksum_file_name);
1969 filp = filp_open(dbgcksum_file_name,
1970 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1974 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1975 "checksum error: rc = %d\n", dbgcksum_file_name,
1978 CERROR("%s: can't open to dump pages with checksum "
1979 "error: rc = %d\n", dbgcksum_file_name, rc);
1983 for (i = 0; i < page_count; i++) {
1984 len = pga[i]->count;
1985 buf = kmap(pga[i]->pg);
1987 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1989 CERROR("%s: wanted to write %u but got %d "
1990 "error\n", dbgcksum_file_name, len, rc);
1999 rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
2001 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
2002 filp_close(filp, NULL);
2004 libcfs_debug_dumplog();
2008 check_write_checksum(struct obdo *oa, const struct lnet_processid *peer,
2009 __u32 client_cksum, __u32 server_cksum,
2010 struct osc_brw_async_args *aa)
2012 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
2013 enum cksum_types cksum_type;
2014 obd_dif_csum_fn *fn = NULL;
2015 int sector_size = 0;
2020 if (server_cksum == client_cksum) {
2021 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2025 if (aa->aa_cli->cl_checksum_dump)
2026 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
2027 server_cksum, client_cksum);
2029 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
2032 switch (cksum_type) {
2033 case OBD_CKSUM_T10IP512:
2037 case OBD_CKSUM_T10IP4K:
2041 case OBD_CKSUM_T10CRC512:
2042 fn = obd_dif_crc_fn;
2045 case OBD_CKSUM_T10CRC4K:
2046 fn = obd_dif_crc_fn;
2054 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
2055 aa->aa_page_count, aa->aa_ppga,
2056 OST_WRITE, fn, sector_size,
2059 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
2060 aa->aa_ppga, OST_WRITE, cksum_type,
2064 msg = "failed to calculate the client write checksum";
2065 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
2066 msg = "the server did not use the checksum type specified in "
2067 "the original request - likely a protocol problem";
2068 else if (new_cksum == server_cksum)
2069 msg = "changed on the client after we checksummed it - "
2070 "likely false positive due to mmap IO (bug 11742)";
2071 else if (new_cksum == client_cksum)
2072 msg = "changed in transit before arrival at OST";
2074 msg = "changed in transit AND doesn't match the original - "
2075 "likely false positive due to mmap IO (bug 11742)";
2077 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
2078 DFID " object "DOSTID" extent [%llu-%llu], original "
2079 "client csum %x (type %x), server csum %x (type %x),"
2080 " client csum now %x\n",
2081 obd_name, msg, libcfs_nidstr(&peer->nid),
2082 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
2083 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
2084 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
2085 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
2086 aa->aa_ppga[aa->aa_page_count - 1]->off +
2087 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
2089 obd_cksum_type_unpack(aa->aa_oa->o_flags),
2090 server_cksum, cksum_type, new_cksum);
2094 /* Note rc enters this function as number of bytes transferred */
2095 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
2097 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
2098 struct client_obd *cli = aa->aa_cli;
2099 const char *obd_name = cli->cl_import->imp_obd->obd_name;
2100 const struct lnet_processid *peer =
2101 &req->rq_import->imp_connection->c_peer;
2102 struct ost_body *body;
2103 u32 client_cksum = 0;
2104 struct inode *inode = NULL;
2105 unsigned int blockbits = 0, blocksize = 0;
2106 struct cl_page *clpage;
2110 if (rc < 0 && rc != -EDQUOT) {
2111 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
2115 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
2116 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
2118 DEBUG_REQ(D_INFO, req, "cannot unpack body");
2122 /* set/clear over quota flag for a uid/gid/projid */
2123 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
2124 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
2125 unsigned qid[LL_MAXQUOTAS] = {
2126 body->oa.o_uid, body->oa.o_gid,
2127 body->oa.o_projid };
2129 "setdq for [%u %u %u] with valid %#llx, flags %x\n",
2130 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
2131 body->oa.o_valid, body->oa.o_flags);
2132 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
2136 osc_update_grant(cli, body);
2141 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
2142 client_cksum = aa->aa_oa->o_cksum; /* save for later */
2144 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2146 CERROR("%s: unexpected positive size %d\n",
2151 if (req->rq_bulk != NULL &&
2152 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
2155 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
2156 check_write_checksum(&body->oa, peer, client_cksum,
2157 body->oa.o_cksum, aa))
2160 rc = check_write_rcs(req, aa->aa_requested_nob,
2161 aa->aa_nio_count, aa->aa_page_count,
2166 /* The rest of this function executes only for OST_READs */
2168 if (req->rq_bulk == NULL) {
2169 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
2171 LASSERT(rc == req->rq_status);
2173 /* if unwrap_bulk failed, return -EAGAIN to retry */
2174 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
2177 GOTO(out, rc = -EAGAIN);
2179 if (rc > aa->aa_requested_nob) {
2180 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
2181 rc, aa->aa_requested_nob);
2185 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
2186 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
2187 rc, req->rq_bulk->bd_nob_transferred);
2191 if (req->rq_bulk == NULL) {
2193 int nob, pg_count, i = 0;
2196 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
2197 pg_count = aa->aa_page_count;
2198 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
2201 while (nob > 0 && pg_count > 0) {
2203 int count = aa->aa_ppga[i]->count > nob ?
2204 nob : aa->aa_ppga[i]->count;
2206 CDEBUG(D_CACHE, "page %p count %d\n",
2207 aa->aa_ppga[i]->pg, count);
2208 ptr = kmap_atomic(aa->aa_ppga[i]->pg);
2209 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
2211 kunmap_atomic((void *) ptr);
2220 if (rc < aa->aa_requested_nob)
2221 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
2223 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
2224 static int cksum_counter;
2225 u32 server_cksum = body->oa.o_cksum;
2229 enum cksum_types cksum_type;
2230 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
2231 body->oa.o_flags : 0;
2233 cksum_type = obd_cksum_type_unpack(o_flags);
2234 rc = osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2235 aa->aa_page_count, aa->aa_ppga,
2236 OST_READ, &client_cksum, false);
2240 if (req->rq_bulk != NULL &&
2241 !nid_same(&peer->nid, &req->rq_bulk->bd_sender)) {
2243 router = libcfs_nidstr(&req->rq_bulk->bd_sender);
2246 if (server_cksum != client_cksum) {
2247 struct ost_body *clbody;
2248 __u32 client_cksum2;
2249 u32 page_count = aa->aa_page_count;
2251 osc_checksum_bulk_rw(obd_name, cksum_type, nob,
2252 page_count, aa->aa_ppga,
2253 OST_READ, &client_cksum2, true);
2254 clbody = req_capsule_client_get(&req->rq_pill,
2256 if (cli->cl_checksum_dump)
2257 dump_all_bulk_pages(&clbody->oa, page_count,
2258 aa->aa_ppga, server_cksum,
2261 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
2262 "%s%s%s inode "DFID" object "DOSTID
2263 " extent [%llu-%llu], client %x/%x, "
2264 "server %x, cksum_type %x\n",
2266 libcfs_nidstr(&peer->nid),
2268 clbody->oa.o_valid & OBD_MD_FLFID ?
2269 clbody->oa.o_parent_seq : 0ULL,
2270 clbody->oa.o_valid & OBD_MD_FLFID ?
2271 clbody->oa.o_parent_oid : 0,
2272 clbody->oa.o_valid & OBD_MD_FLFID ?
2273 clbody->oa.o_parent_ver : 0,
2274 POSTID(&body->oa.o_oi),
2275 aa->aa_ppga[0]->off,
2276 aa->aa_ppga[page_count-1]->off +
2277 aa->aa_ppga[page_count-1]->count - 1,
2278 client_cksum, client_cksum2,
2279 server_cksum, cksum_type);
2281 aa->aa_oa->o_cksum = client_cksum;
2285 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
2288 } else if (unlikely(client_cksum)) {
2289 static int cksum_missed;
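/* (x & -x) isolates the lowest set bit of x, so the test below holds
 * only when cksum_missed is a power of two; this rate-limits the
 * message logarithmically.
 */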
2292 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
2293 CERROR("%s: checksum %u requested from %s but not sent\n",
2294 obd_name, cksum_missed,
2295 libcfs_nidstr(&peer->nid));
2300 /* get the inode from the first cl_page */
2301 clpage = oap2cl_page(brw_page2oap(aa->aa_ppga[0]));
2302 inode = clpage->cp_inode;
2303 if (clpage->cp_type == CPT_TRANSIENT && inode) {
2304 blockbits = inode->i_blkbits;
2305 blocksize = 1 << blockbits;
2307 if (inode && IS_ENCRYPTED(inode)) {
2310 if (!llcrypt_has_encryption_key(inode)) {
2311 CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
2314 for (idx = 0; idx < aa->aa_page_count; idx++) {
2315 struct brw_page *brwpg = aa->aa_ppga[idx];
2316 unsigned int offs = 0;
2318 while (offs < PAGE_SIZE) {
2319 /* do not decrypt if page is all 0s */
2320 if (memchr_inv(page_address(brwpg->pg) + offs,
2321 0, LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
2322 /* if the page is empty, forward this
2323 * info to upper layers (ll_io_zero_page)
2324 * by clearing PagePrivate2
2327 ClearPagePrivate2(brwpg->pg);
2332 /* This is the direct IO case. Directly
2333 * call the decrypt function that takes the
2334 * inode as an input parameter. The page does not need
2341 oap2cl_page(brw_page2oap(brwpg));
2343 ((u64)(clpage->cp_page_index) <<
2344 (PAGE_SHIFT - blockbits)) +
2345 (offs >> blockbits);
2348 LUSTRE_ENCRYPTION_UNIT_SIZE;
2349 i += blocksize, lblk_num++) {
2351 llcrypt_decrypt_block_inplace(
2359 rc = llcrypt_decrypt_pagecache_blocks(
2361 LUSTRE_ENCRYPTION_UNIT_SIZE,
2367 offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
2374 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
2375 aa->aa_oa, &body->oa);
2380 static int osc_brw_redo_request(struct ptlrpc_request *request,
2381 struct osc_brw_async_args *aa, int rc)
2383 struct ptlrpc_request *new_req;
2384 struct osc_brw_async_args *new_aa;
2385 struct osc_async_page *oap;
2388 /* The below message is checked in replay-ost-single.sh test_8ae */
2389 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
2390 "redo for recoverable error %d", rc);
2392 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
2393 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
2394 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
2395 aa->aa_ppga, &new_req, 1);
2399 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2400 if (oap->oap_request != NULL) {
2401 LASSERTF(request == oap->oap_request,
2402 "request %p != oap_request %p\n",
2403 request, oap->oap_request);
2407 * The new request takes over the pga and oaps from the old request.
2408 * Note that copying a list_head doesn't work; we need to move it...
2411 new_req->rq_interpret_reply = request->rq_interpret_reply;
2412 new_req->rq_async_args = request->rq_async_args;
2413 new_req->rq_commit_cb = request->rq_commit_cb;
2414 /* cap the resend delay to the current request timeout; this is similar
2415 * to what ptlrpc does (see after_reply()) */
2416 if (aa->aa_resends > new_req->rq_timeout)
2417 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
2419 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
2420 new_req->rq_generation_set = 1;
2421 new_req->rq_import_generation = request->rq_import_generation;
2423 new_aa = ptlrpc_req_async_args(new_aa, new_req);
2425 INIT_LIST_HEAD(&new_aa->aa_oaps);
2426 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
2427 INIT_LIST_HEAD(&new_aa->aa_exts);
2428 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
2429 new_aa->aa_resends = aa->aa_resends;
2431 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
2432 if (oap->oap_request) {
2433 ptlrpc_req_finished(oap->oap_request);
2434 oap->oap_request = ptlrpc_request_addref(new_req);
2438 /* XXX: This code will run into problems if we ever support adding
2439 * a series of BRW RPCs to a self-defined ptlrpc_request_set and
2440 * waiting for all of them to finish. We should inherit the request
2441 * set from the old request. */
2442 ptlrpcd_add_req(new_req);
2444 DEBUG_REQ(D_INFO, new_req, "new request");
2449 * Ugh, we want disk allocation on the target to happen in offset order. We'll
2450 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
2451 * fine for our small page arrays and doesn't require allocation. It's an
2452 * insertion sort that swaps elements that are strides apart, shrinking the
2453 * stride down until it's '1' and the array is sorted.
2455 static void sort_brw_pages(struct brw_page **array, int num)
2458 struct brw_page *tmp;
2462 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2467 for (i = stride ; i < num ; i++) {
2470 while (j >= stride && array[j - stride]->off > tmp->off) {
2471 array[j] = array[j - stride];
2476 } while (stride > 1);
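/*
 * Illustration only (kept out of the build with #if 0): a minimal,
 * self-contained userspace sketch of the same shellsort technique on
 * plain offsets, using the 3*h + 1 gap sequence that sort_brw_pages()
 * builds above. Names here are hypothetical, not part of the driver.
 */
#if 0
#include <stdio.h>

static void shellsort_offsets(unsigned long long *off, int num)
{
	int stride, i, j;

	/* grow the stride: 1, 4, 13, 40, ... until it passes num */
	for (stride = 1; stride < num; stride = stride * 3 + 1)
		;
	do {
		stride /= 3;	/* shrink back down through the sequence */
		for (i = stride; i < num; i++) {
			unsigned long long tmp = off[i];

			j = i;
			/* insertion sort over elements one stride apart */
			while (j >= stride && off[j - stride] > tmp) {
				off[j] = off[j - stride];
				j -= stride;
			}
			off[j] = tmp;
		}
	} while (stride > 1);
}

int main(void)
{
	unsigned long long off[] = { 40960, 0, 8192, 4096, 12288 };
	int i;

	shellsort_offsets(off, 5);
	for (i = 0; i < 5; i++)
		printf("%llu\n", off[i]);	/* 0 4096 8192 12288 40960 */
	return 0;
}
#endif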
2479 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2481 LASSERT(ppga != NULL);
2482 OBD_FREE_PTR_ARRAY_LARGE(ppga, count);
2485 static int brw_interpret(const struct lu_env *env,
2486 struct ptlrpc_request *req, void *args, int rc)
2488 struct osc_brw_async_args *aa = args;
2489 struct osc_extent *ext;
2490 struct osc_extent *tmp;
2491 struct client_obd *cli = aa->aa_cli;
2492 unsigned long transferred = 0;
2496 rc = osc_brw_fini_request(req, rc);
2497 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2499 /* restore clear text pages */
2500 osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);
2503 * When the server returns -EINPROGRESS, the client should always retry
2504 * regardless of how many times the bulk has already been resent.
2506 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2507 if (req->rq_import_generation !=
2508 req->rq_import->imp_generation) {
2509 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2510 ""DOSTID", rc = %d.\n",
2511 req->rq_import->imp_obd->obd_name,
2512 POSTID(&aa->aa_oa->o_oi), rc);
2513 } else if (rc == -EINPROGRESS ||
2514 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2515 rc = osc_brw_redo_request(req, aa, rc);
2517 CERROR("%s: too many resent retries for object: "
2518 "%llu:%llu, rc = %d.\n",
2519 req->rq_import->imp_obd->obd_name,
2520 POSTID(&aa->aa_oa->o_oi), rc);
2525 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2530 struct obdo *oa = aa->aa_oa;
2531 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2532 unsigned long valid = 0;
2533 struct cl_object *obj;
2534 struct osc_async_page *last;
2536 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2537 obj = osc2cl(last->oap_obj);
2539 cl_object_attr_lock(obj);
2540 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2541 attr->cat_blocks = oa->o_blocks;
2542 valid |= CAT_BLOCKS;
2544 if (oa->o_valid & OBD_MD_FLMTIME) {
2545 attr->cat_mtime = oa->o_mtime;
2548 if (oa->o_valid & OBD_MD_FLATIME) {
2549 attr->cat_atime = oa->o_atime;
2552 if (oa->o_valid & OBD_MD_FLCTIME) {
2553 attr->cat_ctime = oa->o_ctime;
2557 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2558 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2559 loff_t last_off = last->oap_count + last->oap_obj_off +
2562 /* Change the file size if this is an out-of-quota or
2563 * direct IO write and it extends the file size */
2564 if (loi->loi_lvb.lvb_size < last_off) {
2565 attr->cat_size = last_off;
2568 /* Extend KMS if it's not a lockless write */
2569 if (loi->loi_kms < last_off &&
2570 oap2osc_page(last)->ops_srvlock == 0) {
2571 attr->cat_kms = last_off;
2577 cl_object_attr_update(env, obj, attr, valid);
2578 cl_object_attr_unlock(obj);
2580 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2583 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2584 osc_inc_unstable_pages(req);
2586 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2587 list_del_init(&ext->oe_link);
2588 osc_extent_finish(env, ext, 1,
2589 rc && req->rq_no_delay ? -EAGAIN : rc);
2591 LASSERT(list_empty(&aa->aa_exts));
2592 LASSERT(list_empty(&aa->aa_oaps));
2594 transferred = (req->rq_bulk == NULL ? /* short io */
2595 aa->aa_requested_nob :
2596 req->rq_bulk->bd_nob_transferred);
2598 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2599 ptlrpc_lprocfs_brw(req, transferred);
2601 spin_lock(&cli->cl_loi_list_lock);
2602 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2603 * is called so we know whether to go to sync BRWs or wait for more
2604 * RPCs to complete */
2605 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2606 cli->cl_w_in_flight--;
2608 cli->cl_r_in_flight--;
2609 osc_wake_cache_waiters(cli);
2610 spin_unlock(&cli->cl_loi_list_lock);
2612 osc_io_unplug(env, cli, NULL);
2616 static void brw_commit(struct ptlrpc_request *req)
2618 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2619 * this function being called via the rq_commit_cb, we need to
2620 * ensure osc_dec_unstable_pages is still called. Otherwise
2621 * unstable pages may be leaked. */
2622 spin_lock(&req->rq_lock);
2623 if (likely(req->rq_unstable)) {
2624 req->rq_unstable = 0;
2625 spin_unlock(&req->rq_lock);
2627 osc_dec_unstable_pages(req);
2629 req->rq_committed = 1;
2630 spin_unlock(&req->rq_lock);
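/*
 * Illustration only (kept out of the build with #if 0): a userspace
 * analogue of the rq_unstable handoff above, with hypothetical names.
 * Whichever side observes the flag still set clears it under the lock
 * and becomes responsible for the cleanup; the other side merely
 * records completion for the racing path to notice.
 */
#if 0
#include <pthread.h>
#include <stdbool.h>

struct fake_req {
	pthread_mutex_t	lock;
	bool		unstable;	/* pages still count as unstable */
	bool		committed;
};

static void dec_unstable(struct fake_req *req)
{
	/* stands in for osc_dec_unstable_pages() */
}

static void fake_commit_cb(struct fake_req *req)
{
	pthread_mutex_lock(&req->lock);
	if (req->unstable) {
		/* we won the race: clear the flag and do the cleanup */
		req->unstable = false;
		pthread_mutex_unlock(&req->lock);
		dec_unstable(req);
	} else {
		/* the other side will (or did) clean up; just mark state */
		req->committed = true;
		pthread_mutex_unlock(&req->lock);
	}
}

int main(void)
{
	struct fake_req req = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.unstable = true,
	};

	fake_commit_cb(&req);	/* the commit side wins; cleanup runs here */
	return 0;
}
#endif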
2635 * Build an RPC from the list of extents @ext_list. The caller must ensure
2636 * that the total number of pages in this list does NOT exceed the max pages per RPC.
2637 * Extents in the list must be in OES_RPC state.
2639 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2640 struct list_head *ext_list, int cmd)
2642 struct ptlrpc_request *req = NULL;
2643 struct osc_extent *ext;
2644 struct brw_page **pga = NULL;
2645 struct osc_brw_async_args *aa = NULL;
2646 struct obdo *oa = NULL;
2647 struct osc_async_page *oap;
2648 struct osc_object *obj = NULL;
2649 struct cl_req_attr *crattr = NULL;
2650 loff_t starting_offset = OBD_OBJECT_EOF;
2651 loff_t ending_offset = 0;
2652 /* '1' for consistency with code that checks !mpflag to restore */
2656 bool soft_sync = false;
2657 bool ndelay = false;
2661 __u32 layout_version = 0;
2662 LIST_HEAD(rpc_list);
2663 struct ost_body *body;
2665 LASSERT(!list_empty(ext_list));
2667 /* add pages into rpc_list to build BRW rpc */
2668 list_for_each_entry(ext, ext_list, oe_link) {
2669 LASSERT(ext->oe_state == OES_RPC);
2670 mem_tight |= ext->oe_memalloc;
2671 grant += ext->oe_grants;
2672 page_count += ext->oe_nr_pages;
2673 layout_version = max(layout_version, ext->oe_layout_version);
2678 soft_sync = osc_over_unstable_soft_limit(cli);
2680 mpflag = memalloc_noreclaim_save();
2682 OBD_ALLOC_PTR_ARRAY_LARGE(pga, page_count);
2684 GOTO(out, rc = -ENOMEM);
2686 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2688 GOTO(out, rc = -ENOMEM);
2691 list_for_each_entry(ext, ext_list, oe_link) {
2692 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2694 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2696 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2697 pga[i] = &oap->oap_brw_page;
2698 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2701 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2702 if (starting_offset == OBD_OBJECT_EOF ||
2703 starting_offset > oap->oap_obj_off)
2704 starting_offset = oap->oap_obj_off;
2706 LASSERT(oap->oap_page_off == 0);
2707 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2708 ending_offset = oap->oap_obj_off +
2711 LASSERT(oap->oap_page_off + oap->oap_count ==
2718 /* first page in the list */
2719 oap = list_first_entry(&rpc_list, typeof(*oap), oap_rpc_item);
2721 crattr = &osc_env_info(env)->oti_req_attr;
2722 memset(crattr, 0, sizeof(*crattr));
2723 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2724 crattr->cra_flags = ~0ULL;
2725 crattr->cra_page = oap2cl_page(oap);
2726 crattr->cra_oa = oa;
2727 cl_req_attr_set(env, osc2cl(obj), crattr);
2729 if (cmd == OBD_BRW_WRITE) {
2730 oa->o_grant_used = grant;
2731 if (layout_version > 0) {
2732 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2733 PFID(&oa->o_oi.oi_fid), layout_version);
2735 oa->o_layout_version = layout_version;
2736 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2740 sort_brw_pages(pga, page_count);
2741 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2743 CERROR("prep_req failed: %d\n", rc);
2747 req->rq_commit_cb = brw_commit;
2748 req->rq_interpret_reply = brw_interpret;
2749 req->rq_memalloc = mem_tight != 0;
2750 oap->oap_request = ptlrpc_request_addref(req);
2752 req->rq_no_resend = req->rq_no_delay = 1;
2753 /* We should probably set a shorter timeout value
2754 * to handle ETIMEDOUT in brw_interpret() correctly. */
2755 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2758 /* Need to update the timestamps after the request is built in case
2759 * we race with setattr (locally or in the queue at the OST). If the
2760 * OST gets a later setattr before an earlier BRW (as determined by
2761 * the request xid), the OST will not use the BRW timestamps.
2762 * Sadly, there is no obvious way to do this in a single call. bug 10150 */
2763 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2764 crattr->cra_oa = &body->oa;
2765 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2766 cl_req_attr_set(env, osc2cl(obj), crattr);
2767 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2769 aa = ptlrpc_req_async_args(aa, req);
2770 INIT_LIST_HEAD(&aa->aa_oaps);
2771 list_splice_init(&rpc_list, &aa->aa_oaps);
2772 INIT_LIST_HEAD(&aa->aa_exts);
2773 list_splice_init(ext_list, &aa->aa_exts);
2775 spin_lock(&cli->cl_loi_list_lock);
2776 starting_offset >>= PAGE_SHIFT;
2777 ending_offset >>= PAGE_SHIFT;
2778 if (cmd == OBD_BRW_READ) {
2779 cli->cl_r_in_flight++;
2780 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2781 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2782 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2783 starting_offset + 1);
2785 cli->cl_w_in_flight++;
2786 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2787 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2788 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2789 starting_offset + 1);
2791 spin_unlock(&cli->cl_loi_list_lock);
2793 DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2794 page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2795 if (libcfs_debug & D_IOTRACE) {
2798 fid.f_seq = crattr->cra_oa->o_parent_seq;
2799 fid.f_oid = crattr->cra_oa->o_parent_oid;
2800 fid.f_ver = crattr->cra_oa->o_parent_ver;
2802 DFID": %d %s pages, start %lld, end %lld, now %ur/%uw in flight\n",
2803 PFID(&fid), page_count,
2804 cmd == OBD_BRW_READ ? "read" : "write", starting_offset,
2805 ending_offset, cli->cl_r_in_flight, cli->cl_w_in_flight);
2807 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2809 ptlrpcd_add_req(req);
2815 memalloc_noreclaim_restore(mpflag);
2818 LASSERT(req == NULL);
2821 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2823 osc_release_bounce_pages(pga, page_count);
2824 osc_release_ppga(pga, page_count);
2826 /* this should happen rarely and is pretty bad: it makes the
2827 * pending list not follow the dirty order
2829 while ((ext = list_first_entry_or_null(ext_list,
2831 oe_link)) != NULL) {
2832 list_del_init(&ext->oe_link);
2833 osc_extent_finish(env, ext, 0, rc);
2839 /* This is to refresh our lock in the face of no RPCs. */
2840 void osc_send_empty_rpc(struct osc_object *osc, pgoff_t start)
2842 struct ptlrpc_request *req;
2844 struct brw_page bpg = { .off = start, .count = 1};
2845 struct brw_page *pga = &bpg;
2848 memset(&oa, 0, sizeof(oa));
2849 oa.o_oi = osc->oo_oinfo->loi_oi;
2850 oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
2851 /* For updated servers - don't do a read */
2852 oa.o_flags = OBD_FL_NORPC;
2854 rc = osc_brw_prep_request(OBD_BRW_READ, osc_cli(osc), &oa, 1, &pga,
2857 /* If we succeeded, we ship it off; if not, there's no point in doing
2858 * anything. Also no resends.
2859 * No interpret callback, no commit callback.
2862 req->rq_no_resend = 1;
2863 ptlrpcd_add_req(req);
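/*
 * osc_set_lock_data() binds @data to @lock's l_ast_data with
 * compare-and-set semantics under the resource lock: set it if unset,
 * then succeed only if it matches @data afterwards, so a lock already
 * bound to another object is left untouched.
 */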
2867 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2871 LASSERT(lock != NULL);
2873 lock_res_and_lock(lock);
2875 if (lock->l_ast_data == NULL)
2876 lock->l_ast_data = data;
2877 if (lock->l_ast_data == data)
2880 unlock_res_and_lock(lock);
2885 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2886 void *cookie, struct lustre_handle *lockh,
2887 enum ldlm_mode mode, __u64 *flags, bool speculative,
2890 bool intent = *flags & LDLM_FL_HAS_INTENT;
2894 /* The request was created before the ldlm_cli_enqueue() call. */
2895 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2896 struct ldlm_reply *rep;
2898 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2899 LASSERT(rep != NULL);
2901 rep->lock_policy_res1 =
2902 ptlrpc_status_ntoh(rep->lock_policy_res1);
2903 if (rep->lock_policy_res1)
2904 errcode = rep->lock_policy_res1;
2906 *flags |= LDLM_FL_LVB_READY;
2907 } else if (errcode == ELDLM_OK) {
2908 *flags |= LDLM_FL_LVB_READY;
2911 /* Call the update callback. */
2912 rc = (*upcall)(cookie, lockh, errcode);
2914 /* release the reference taken in ldlm_cli_enqueue() */
2915 if (errcode == ELDLM_LOCK_MATCHED)
2917 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2918 ldlm_lock_decref(lockh, mode);
2923 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2926 struct osc_enqueue_args *aa = args;
2927 struct ldlm_lock *lock;
2928 struct lustre_handle *lockh = &aa->oa_lockh;
2929 enum ldlm_mode mode = aa->oa_mode;
2930 struct ost_lvb *lvb = aa->oa_lvb;
2931 __u32 lvb_len = sizeof(*lvb);
2933 struct ldlm_enqueue_info einfo = {
2934 .ei_type = aa->oa_type,
2940 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2942 lock = ldlm_handle2lock(lockh);
2943 LASSERTF(lock != NULL,
2944 "lockh %#llx, req %p, aa %p - client evicted?\n",
2945 lockh->cookie, req, aa);
2947 /* Take an additional reference so that a blocking AST that
2948 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2949 * to arrive after an upcall has been executed by
2950 * osc_enqueue_fini(). */
2951 ldlm_lock_addref(lockh, mode);
2953 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2954 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2956 /* Let the CP AST grant the lock first. */
2957 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2959 if (aa->oa_speculative) {
2960 LASSERT(aa->oa_lvb == NULL);
2961 LASSERT(aa->oa_flags == NULL);
2962 aa->oa_flags = &flags;
2965 /* Complete the lock acquisition procedure. */
2966 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
2967 lvb, lvb_len, lockh, rc, false);
2968 /* Complete osc stuff. */
2969 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2970 aa->oa_flags, aa->oa_speculative, rc);
2972 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2974 ldlm_lock_decref(lockh, mode);
2975 LDLM_LOCK_PUT(lock);
2979 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2980 * from the 2nd OSC before a lock from the 1st one. This does not deadlock
2981 * with other synchronous requests; however, keeping some locks while trying
2982 * to obtain others may take a considerable amount of time in the case of an
2983 * OST failure, and when other sync requests do not get the released lock
2984 * from a client, the client is evicted from the cluster -- such scenarios
2985 * make life difficult, so release locks just after they are obtained. */
2986 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2987 __u64 *flags, union ldlm_policy_data *policy,
2988 struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2989 void *cookie, struct ldlm_enqueue_info *einfo,
2990 struct ptlrpc_request_set *rqset, int async,
2993 struct obd_device *obd = exp->exp_obd;
2994 struct lustre_handle lockh = { 0 };
2995 struct ptlrpc_request *req = NULL;
2996 int intent = *flags & LDLM_FL_HAS_INTENT;
2997 __u64 match_flags = *flags;
2998 enum ldlm_mode mode;
3002 /* Filesystem lock extents are extended to page boundaries so that
3003 * dealing with the page cache is a little smoother. */
3004 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3005 policy->l_extent.end |= ~PAGE_MASK;
3007 /* Next, search for already existing extent locks that will cover us */
3008 /* If we're trying to read, we also search for an existing PW lock. The
3009 * VFS and page cache already protect us locally, so lots of readers/
3010 * writers can share a single PW lock.
3012 * There are problems with conversion deadlocks, so instead of
3013 * converting a read lock to a write lock, we'll just enqueue a new
3016 * At some point we should cancel the read lock instead of making them
3017 * send us a blocking callback, but there are problems with canceling
3018 * locks out from other users right now, too. */
3019 mode = einfo->ei_mode;
3020 if (einfo->ei_mode == LCK_PR)
3022 /* Normal lock requests must wait for the LVB to be ready before
3023 * matching a lock; speculative lock requests do not need to,
3024 * because they will not actually use the lock. */
3026 match_flags |= LDLM_FL_LVB_READY;
3028 match_flags |= LDLM_FL_BLOCK_GRANTED;
3029 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
3030 einfo->ei_type, policy, mode, &lockh);
3032 struct ldlm_lock *matched;
3034 if (*flags & LDLM_FL_TEST_LOCK)
3037 matched = ldlm_handle2lock(&lockh);
3039 /* This DLM lock request is speculative, and does not
3040 * have an associated IO request. Therefore, if there
3041 * is already a DLM lock, it will just inform the
3042 * caller to cancel the request for this stripe. */
3043 lock_res_and_lock(matched);
3044 if (ldlm_extent_equal(&policy->l_extent,
3045 &matched->l_policy_data.l_extent))
3049 unlock_res_and_lock(matched);
3051 ldlm_lock_decref(&lockh, mode);
3052 LDLM_LOCK_PUT(matched);
3054 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
3055 *flags |= LDLM_FL_LVB_READY;
3057 /* We already have a lock, and it's referenced. */
3058 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
3060 ldlm_lock_decref(&lockh, mode);
3061 LDLM_LOCK_PUT(matched);
3064 ldlm_lock_decref(&lockh, mode);
3065 LDLM_LOCK_PUT(matched);
3069 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
3072 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3073 *flags &= ~LDLM_FL_BLOCK_GRANTED;
3075 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3076 sizeof(*lvb), LVB_T_OST, &lockh, async);
3079 struct osc_enqueue_args *aa;
3080 aa = ptlrpc_req_async_args(aa, req);
3082 aa->oa_mode = einfo->ei_mode;
3083 aa->oa_type = einfo->ei_type;
3084 lustre_handle_copy(&aa->oa_lockh, &lockh);
3085 aa->oa_upcall = upcall;
3086 aa->oa_cookie = cookie;
3087 aa->oa_speculative = speculative;
3089 aa->oa_flags = flags;
3092 /* speculative locks essentially enqueue
3093 * a DLM lock in advance, so we don't care
3094 * about the result of the enqueue. */
3096 aa->oa_flags = NULL;
3099 req->rq_interpret_reply = osc_enqueue_interpret;
3100 ptlrpc_set_add_req(rqset, req);
3105 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
3106 flags, speculative, rc);
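/*
 * Illustration only (kept out of the build with #if 0): the extent
 * rounding used near the top of both osc_enqueue_base() and
 * osc_match_base(). Clearing the sub-page bits of 'start' and setting
 * them in 'end' widens a byte range to full page boundaries.
 * EX_PAGE_SHIFT/EX_PAGE_MASK are local stand-ins for the kernel's
 * PAGE_SHIFT/PAGE_MASK, assuming 4 KiB pages.
 */
#if 0
#include <stdio.h>

#define EX_PAGE_SHIFT	12
#define EX_PAGE_MASK	(~((1ULL << EX_PAGE_SHIFT) - 1))

int main(void)
{
	unsigned long long start = 5000, end = 9000;

	start -= start & ~EX_PAGE_MASK;	/* round down: 4096 */
	end |= ~EX_PAGE_MASK;		/* round up to the last byte: 12287 */
	printf("[%llu, %llu]\n", start, end);
	return 0;
}
#endif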
3111 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
3112 struct ldlm_res_id *res_id, enum ldlm_type type,
3113 union ldlm_policy_data *policy, enum ldlm_mode mode,
3114 __u64 *flags, struct osc_object *obj,
3115 struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
3117 struct obd_device *obd = exp->exp_obd;
3118 __u64 lflags = *flags;
3122 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3125 /* Filesystem lock extents are extended to page boundaries so that
3126 * dealing with the page cache is a little smoother */
3127 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
3128 policy->l_extent.end |= ~PAGE_MASK;
3130 /* Next, search for already existing extent locks that will cover us */
3131 rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
3132 res_id, type, policy, mode, lockh,
3134 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
3138 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3140 LASSERT(lock != NULL);
3141 if (osc_set_lock_data(lock, obj)) {
3142 lock_res_and_lock(lock);
3143 if (!ldlm_is_lvb_cached(lock)) {
3144 LASSERT(lock->l_ast_data == obj);
3145 osc_lock_lvb_update(env, obj, lock, NULL);
3146 ldlm_set_lvb_cached(lock);
3148 unlock_res_and_lock(lock);
3150 ldlm_lock_decref(lockh, rc);
3153 LDLM_LOCK_PUT(lock);
3158 static int osc_statfs_interpret(const struct lu_env *env,
3159 struct ptlrpc_request *req, void *args, int rc)
3161 struct osc_async_args *aa = args;
3162 struct obd_statfs *msfs;
3167 * The request has in fact never been sent due to issues at
3168 * a higher level (LOV). Exit immediately since the caller
3169 * is aware of the problem and takes care of the cleanup.
3173 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3174 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3180 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3182 GOTO(out, rc = -EPROTO);
3184 *aa->aa_oi->oi_osfs = *msfs;
3186 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3191 static int osc_statfs_async(struct obd_export *exp,
3192 struct obd_info *oinfo, time64_t max_age,
3193 struct ptlrpc_request_set *rqset)
3195 struct obd_device *obd = class_exp2obd(exp);
3196 struct ptlrpc_request *req;
3197 struct osc_async_args *aa;
3201 if (obd->obd_osfs_age >= max_age) {
3203 "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
3204 obd->obd_name, &obd->obd_osfs,
3205 obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
3206 obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
3207 spin_lock(&obd->obd_osfs_lock);
3208 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
3209 spin_unlock(&obd->obd_osfs_lock);
3210 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
3211 if (oinfo->oi_cb_up)
3212 oinfo->oi_cb_up(oinfo, 0);
3217 /* We could possibly pass max_age in the request (as an absolute
3218 * timestamp or a "seconds.usec ago") so the target can avoid doing
3219 * extra calls into the filesystem if that isn't necessary (e.g.
3220 * during mount that would help a bit). Having relative timestamps
3221 * is not so great if request processing is slow, while absolute
3222 * timestamps are not ideal because they need time synchronization. */
3223 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3227 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3229 ptlrpc_request_free(req);
3232 ptlrpc_request_set_replen(req);
3233 req->rq_request_portal = OST_CREATE_PORTAL;
3234 ptlrpc_at_set_req_timeout(req);
3236 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3237 /* procfs requests should not wait on statfs, to avoid deadlock */
3238 req->rq_no_resend = 1;
3239 req->rq_no_delay = 1;
3242 req->rq_interpret_reply = osc_statfs_interpret;
3243 aa = ptlrpc_req_async_args(aa, req);
3246 ptlrpc_set_add_req(rqset, req);
3250 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
3251 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
3253 struct obd_device *obd = class_exp2obd(exp);
3254 struct obd_statfs *msfs;
3255 struct ptlrpc_request *req;
3256 struct obd_import *imp, *imp0;
3260 /* Since the request might also come from lprocfs, we need to
3261 * sync this with client_disconnect_export(). Bug15684
3263 with_imp_locked(obd, imp0, rc)
3264 imp = class_import_get(imp0);
3268 /* We could possibly pass max_age in the request (as an absolute
3269 * timestamp or a "seconds.usec ago") so the target can avoid doing
3270 * extra calls into the filesystem if that isn't necessary (e.g.
3271 * during mount that would help a bit). Having relative timestamps
3272 * is not so great if request processing is slow, while absolute
3273 * timestamps are not ideal because they need time synchronization. */
3274 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3276 class_import_put(imp);
3281 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3283 ptlrpc_request_free(req);
3286 ptlrpc_request_set_replen(req);
3287 req->rq_request_portal = OST_CREATE_PORTAL;
3288 ptlrpc_at_set_req_timeout(req);
3290 if (flags & OBD_STATFS_NODELAY) {
3291 /* procfs requests should not wait on statfs, to avoid deadlock */
3292 req->rq_no_resend = 1;
3293 req->rq_no_delay = 1;
3296 rc = ptlrpc_queue_wait(req);
3300 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3302 GOTO(out, rc = -EPROTO);
3308 ptlrpc_req_finished(req);
3312 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3313 void *karg, void __user *uarg)
3315 struct obd_device *obd = exp->exp_obd;
3316 struct obd_ioctl_data *data = karg;
3320 if (!try_module_get(THIS_MODULE)) {
3321 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
3322 module_name(THIS_MODULE));
3326 case OBD_IOC_CLIENT_RECOVER:
3327 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
3328 data->ioc_inlbuf1, 0);
3332 case OBD_IOC_GETATTR:
3333 rc = obd_getattr(NULL, exp, &data->ioc_obdo1);
3335 case IOC_OSC_SET_ACTIVE:
3336 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
3341 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
3342 obd->obd_name, cmd, current->comm, rc);
3346 module_put(THIS_MODULE);
3350 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3351 u32 keylen, void *key, u32 vallen, void *val,
3352 struct ptlrpc_request_set *set)
3354 struct ptlrpc_request *req;
3355 struct obd_device *obd = exp->exp_obd;
3356 struct obd_import *imp = class_exp2cliimp(exp);
3361 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3363 if (KEY_IS(KEY_CHECKSUM)) {
3364 if (vallen != sizeof(int))
3366 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3370 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3371 sptlrpc_conf_client_adapt(obd);
3375 if (KEY_IS(KEY_FLUSH_CTX)) {
3376 sptlrpc_import_flush_my_ctx(imp);
3380 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3381 struct client_obd *cli = &obd->u.cli;
3382 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
3383 long target = *(long *)val;
3385 nr = osc_lru_shrink(env, cli, min(nr, target), true);
3390 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3393 /* We pass all other commands directly to the OST. Since nobody calls
3394 * osc methods directly and everybody is supposed to go through LOV, we
3395 * assume LOV checked invalid values for us.
3396 * The only recognised values so far are evict_by_nid and mds_conn.
3397 * Even if something bad goes through, we'd get a -EINVAL from the OST
3400 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3401 &RQF_OST_SET_GRANT_INFO :
3406 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3407 RCL_CLIENT, keylen);
3408 if (!KEY_IS(KEY_GRANT_SHRINK))
3409 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3410 RCL_CLIENT, vallen);
3411 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3413 ptlrpc_request_free(req);
3417 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3418 memcpy(tmp, key, keylen);
3419 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3422 memcpy(tmp, val, vallen);
3424 if (KEY_IS(KEY_GRANT_SHRINK)) {
3425 struct osc_grant_args *aa;
3428 aa = ptlrpc_req_async_args(aa, req);
3429 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
3431 ptlrpc_req_finished(req);
3434 *oa = ((struct ost_body *)val)->oa;
3436 req->rq_interpret_reply = osc_shrink_grant_interpret;
3439 ptlrpc_request_set_replen(req);
3440 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3441 LASSERT(set != NULL);
3442 ptlrpc_set_add_req(set, req);
3443 ptlrpc_check_set(NULL, set);
3445 ptlrpcd_add_req(req);
3450 EXPORT_SYMBOL(osc_set_info_async);
3452 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
3453 struct obd_device *obd, struct obd_uuid *cluuid,
3454 struct obd_connect_data *data, void *localdata)
3456 struct client_obd *cli = &obd->u.cli;
3458 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3462 spin_lock(&cli->cl_loi_list_lock);
3463 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3464 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3465 /* restore ocd_grant_blkbits as client page bits */
3466 data->ocd_grant_blkbits = PAGE_SHIFT;
3467 grant += cli->cl_dirty_grant;
3469 grant += cli->cl_dirty_pages << PAGE_SHIFT;
3471 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3472 lost_grant = cli->cl_lost_grant;
3473 cli->cl_lost_grant = 0;
3474 spin_unlock(&cli->cl_loi_list_lock);
3476 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3477 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3478 data->ocd_version, data->ocd_grant, lost_grant);
3483 EXPORT_SYMBOL(osc_reconnect);
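/*
 * Illustration only (kept out of the build with #if 0): the grant
 * arithmetic performed in osc_reconnect() above, with stand-in
 * parameters; reconnect_grant() is a hypothetical helper mirroring the
 * non-GRANT_PARAM branch. The reconnect request asks the server for
 * the grant the client still accounts for, falling back to twice the
 * BRW size when nothing is held.
 */
#if 0
static unsigned long reconnect_grant(unsigned long avail,
				     unsigned long reserved,
				     unsigned long dirty_pages,
				     unsigned int page_shift,
				     unsigned long brw_size)
{
	unsigned long grant = avail + reserved;

	grant += dirty_pages << page_shift;	/* grant covering dirty data */
	return grant ? grant : 2 * brw_size;	/* fallback for a fresh client */
}
#endif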
3485 int osc_disconnect(struct obd_export *exp)
3487 struct obd_device *obd = class_exp2obd(exp);
3490 rc = client_disconnect_export(exp);
3492 * Initially we put del_shrink_grant before disconnect_export, but it
3493 * causes the following problem if setup (connect) and cleanup
3494 * (disconnect) are tangled together.
3495 * connect p1 disconnect p2
3496 * ptlrpc_connect_import
3497 * ............... class_manual_cleanup
3500 * ptlrpc_connect_interrupt
3502 * add this client to shrink list
3504 * Bang! grant shrink thread trigger the shrink. BUG18662
3506 osc_del_grant_list(&obd->u.cli);
3509 EXPORT_SYMBOL(osc_disconnect);
3511 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3512 struct hlist_node *hnode, void *arg)
3514 struct lu_env *env = arg;
3515 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3516 struct ldlm_lock *lock;
3517 struct osc_object *osc = NULL;
3521 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3522 if (lock->l_ast_data != NULL && osc == NULL) {
3523 osc = lock->l_ast_data;
3524 cl_object_get(osc2cl(osc));
3527 /* clear the LDLM_FL_CLEANED flag to make sure the lock will be canceled
3528 * by the 2nd round of the ldlm_namespace_clean() call in
3529 * osc_import_event(). */
3530 ldlm_clear_cleaned(lock);
3535 osc_object_invalidate(env, osc);
3536 cl_object_put(env, osc2cl(osc));
3541 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3543 static int osc_import_event(struct obd_device *obd,
3544 struct obd_import *imp,
3545 enum obd_import_event event)
3547 struct client_obd *cli;
3551 LASSERT(imp->imp_obd == obd);
3554 case IMP_EVENT_DISCON: {
3556 spin_lock(&cli->cl_loi_list_lock);
3557 cli->cl_avail_grant = 0;
3558 cli->cl_lost_grant = 0;
3559 spin_unlock(&cli->cl_loi_list_lock);
3562 case IMP_EVENT_INACTIVE: {
3563 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3566 case IMP_EVENT_INVALIDATE: {
3567 struct ldlm_namespace *ns = obd->obd_namespace;
3571 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3573 env = cl_env_get(&refcheck);
3575 osc_io_unplug(env, &obd->u.cli, NULL);
3577 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3578 osc_ldlm_resource_invalidate,
3580 cl_env_put(env, &refcheck);
3582 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3587 case IMP_EVENT_ACTIVE: {
3588 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3591 case IMP_EVENT_OCD: {
3592 struct obd_connect_data *ocd = &imp->imp_connect_data;
3594 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3595 osc_init_grant(&obd->u.cli, ocd);
3598 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3599 imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3601 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3604 case IMP_EVENT_DEACTIVATE: {
3605 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3608 case IMP_EVENT_ACTIVATE: {
3609 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3613 CERROR("Unknown import event %d\n", event);
3620 * Determine whether the lock can be canceled before replaying the lock
3621 * during recovery; see bug16774 for detailed information.
3623 * \retval zero the lock can't be canceled
3624 * \retval other ok to cancel
3626 static int osc_cancel_weight(struct ldlm_lock *lock)
3629 * Cancel all unused and granted extent locks.
3631 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3632 ldlm_is_granted(lock) &&
3633 osc_ldlm_weigh_ast(lock) == 0)
3639 static int brw_queue_work(const struct lu_env *env, void *data)
3641 struct client_obd *cli = data;
3643 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3645 osc_io_unplug(env, cli, NULL);
3649 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3651 struct client_obd *cli = &obd->u.cli;
3657 rc = ptlrpcd_addref();
3661 rc = client_obd_setup(obd, lcfg);
3663 GOTO(out_ptlrpcd, rc);
3666 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3667 if (IS_ERR(handler))
3668 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3669 cli->cl_writeback_work = handler;
3671 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3672 if (IS_ERR(handler))
3673 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3674 cli->cl_lru_work = handler;
3676 rc = osc_quota_setup(obd);
3678 GOTO(out_ptlrpcd_work, rc);
3680 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3681 cli->cl_root_squash = 0;
3682 osc_update_next_shrink(cli);
3687 if (cli->cl_writeback_work != NULL) {
3688 ptlrpcd_destroy_work(cli->cl_writeback_work);
3689 cli->cl_writeback_work = NULL;
3691 if (cli->cl_lru_work != NULL) {
3692 ptlrpcd_destroy_work(cli->cl_lru_work);
3693 cli->cl_lru_work = NULL;
3695 client_obd_cleanup(obd);
3700 EXPORT_SYMBOL(osc_setup_common);
3702 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3704 struct client_obd *cli = &obd->u.cli;
3712 rc = osc_setup_common(obd, lcfg);
3716 rc = osc_tunables_init(obd);
3721 * We try to control the total number of requests with an upper limit,
3722 * osc_reqpool_maxreqcount. There might be some races that cause
3723 * over-limit allocation, but that is fine.
3725 req_count = atomic_read(&osc_pool_req_count);
3726 if (req_count < osc_reqpool_maxreqcount) {
3727 adding = cli->cl_max_rpcs_in_flight + 2;
3728 if (req_count + adding > osc_reqpool_maxreqcount)
3729 adding = osc_reqpool_maxreqcount - req_count;
3731 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3732 atomic_add(added, &osc_pool_req_count);
3735 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3737 spin_lock(&osc_shrink_lock);
3738 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3739 spin_unlock(&osc_shrink_lock);
3740 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3741 cli->cl_import->imp_idle_debug = D_HA;
3746 int osc_precleanup_common(struct obd_device *obd)
3748 struct client_obd *cli = &obd->u.cli;
3752 * for echo client, export may be on zombie list, wait for
3753 * zombie thread to cull it, because cli.cl_import will be
3754 * cleared in client_disconnect_export():
3755 * class_export_destroy() -> obd_cleanup() ->
3756 * echo_device_free() -> echo_client_cleanup() ->
3757 * obd_disconnect() -> osc_disconnect() ->
3758 * client_disconnect_export()
3760 obd_zombie_barrier();
3761 if (cli->cl_writeback_work) {
3762 ptlrpcd_destroy_work(cli->cl_writeback_work);
3763 cli->cl_writeback_work = NULL;
3766 if (cli->cl_lru_work) {
3767 ptlrpcd_destroy_work(cli->cl_lru_work);
3768 cli->cl_lru_work = NULL;
3771 obd_cleanup_client_import(obd);
3774 EXPORT_SYMBOL(osc_precleanup_common);
3776 static int osc_precleanup(struct obd_device *obd)
3780 osc_precleanup_common(obd);
3782 ptlrpc_lprocfs_unregister_obd(obd);
3786 int osc_cleanup_common(struct obd_device *obd)
3788 struct client_obd *cli = &obd->u.cli;
3793 spin_lock(&osc_shrink_lock);
3794 list_del(&cli->cl_shrink_list);
3795 spin_unlock(&osc_shrink_lock);
3798 if (cli->cl_cache != NULL) {
3799 LASSERT(refcount_read(&cli->cl_cache->ccc_users) > 0);
3800 spin_lock(&cli->cl_cache->ccc_lru_lock);
3801 list_del_init(&cli->cl_lru_osc);
3802 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3803 cli->cl_lru_left = NULL;
3804 cl_cache_decref(cli->cl_cache);
3805 cli->cl_cache = NULL;
3808 /* free memory of osc quota cache */
3809 osc_quota_cleanup(obd);
3811 rc = client_obd_cleanup(obd);
3816 EXPORT_SYMBOL(osc_cleanup_common);
3818 static const struct obd_ops osc_obd_ops = {
3819 .o_owner = THIS_MODULE,
3820 .o_setup = osc_setup,
3821 .o_precleanup = osc_precleanup,
3822 .o_cleanup = osc_cleanup_common,
3823 .o_add_conn = client_import_add_conn,
3824 .o_del_conn = client_import_del_conn,
3825 .o_connect = client_connect_import,
3826 .o_reconnect = osc_reconnect,
3827 .o_disconnect = osc_disconnect,
3828 .o_statfs = osc_statfs,
3829 .o_statfs_async = osc_statfs_async,
3830 .o_create = osc_create,
3831 .o_destroy = osc_destroy,
3832 .o_getattr = osc_getattr,
3833 .o_setattr = osc_setattr,
3834 .o_iocontrol = osc_iocontrol,
3835 .o_set_info_async = osc_set_info_async,
3836 .o_import_event = osc_import_event,
3837 .o_quotactl = osc_quotactl,
3840 LIST_HEAD(osc_shrink_list);
3841 DEFINE_SPINLOCK(osc_shrink_lock);
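/*
 * Newer kernels split the shrinker interface into ->count_objects and
 * ->scan_objects; older ones have a single ->shrink callback, which is
 * emulated below in terms of the count/scan pair.
 */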
3843 #ifdef HAVE_SHRINKER_COUNT
3844 static struct shrinker osc_cache_shrinker = {
3845 .count_objects = osc_cache_shrink_count,
3846 .scan_objects = osc_cache_shrink_scan,
3847 .seeks = DEFAULT_SEEKS,
3850 static int osc_cache_shrink(struct shrinker *shrinker,
3851 struct shrink_control *sc)
3853 (void)osc_cache_shrink_scan(shrinker, sc);
3855 return osc_cache_shrink_count(shrinker, sc);
3858 static struct shrinker osc_cache_shrinker = {
3859 .shrink = osc_cache_shrink,
3860 .seeks = DEFAULT_SEEKS,
3864 static int __init osc_init(void)
3866 unsigned int reqpool_size;
3867 unsigned int reqsize;
3871 /* Print the address of _any_ initialized kernel symbol from this
3872 * module, to allow debugging with a gdb that doesn't support data
3873 * symbols from modules. */
3874 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3876 rc = lu_kmem_init(osc_caches);
3880 rc = class_register_type(&osc_obd_ops, NULL, true,
3881 LUSTRE_OSC_NAME, &osc_device_type);
3885 rc = register_shrinker(&osc_cache_shrinker);
3889 /* This is obviously too much memory; we only prevent overflow here */
3890 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3891 GOTO(out_shrinker, rc = -EINVAL);
3893 reqpool_size = osc_reqpool_mem_max << 20;
3896 while (reqsize < OST_IO_MAXREQSIZE)
3897 reqsize = reqsize << 1;
3900 * We don't enlarge the request count in the OSC pool according to
3901 * cl_max_rpcs_in_flight. Allocation from the pool is only
3902 * tried after a normal allocation fails, so a small OSC pool won't
3903 * cause much performance degradation in most cases.
3905 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3907 atomic_set(&osc_pool_req_count, 0);
3908 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3909 ptlrpc_add_rqs_to_pool);
3911 if (osc_rq_pool == NULL)
3912 GOTO(out_shrinker, rc = -ENOMEM);
3914 rc = osc_start_grant_work();
3916 GOTO(out_req_pool, rc);
3921 ptlrpc_free_rq_pool(osc_rq_pool);
3923 unregister_shrinker(&osc_cache_shrinker);
3925 class_unregister_type(LUSTRE_OSC_NAME);
3927 lu_kmem_fini(osc_caches);
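/*
 * Illustration only (kept out of the build with #if 0): the
 * request-pool sizing arithmetic in osc_init() above, as a
 * hypothetical helper. The per-request size is rounded up to the next
 * power of two at or above the maximum I/O request size, and the pool
 * cap is however many such requests fit in osc_reqpool_mem_max MB;
 * the starting reqsize of 1 is a stand-in, since any power of two at
 * or below the target yields the same result.
 */
#if 0
static unsigned int pool_max_reqs(unsigned int mem_max_mb,
				  unsigned int max_req_size)
{
	unsigned int reqsize = 1;

	while (reqsize < max_req_size)
		reqsize <<= 1;		/* round up to a power of two */
	return (mem_max_mb << 20) / reqsize;
}
#endif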
3932 static void __exit osc_exit(void)
3934 osc_stop_grant_work();
3935 unregister_shrinker(&osc_cache_shrinker);
3936 class_unregister_type(LUSTRE_OSC_NAME);
3937 lu_kmem_fini(osc_caches);
3938 ptlrpc_free_rq_pool(osc_rq_pool);
3941 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3942 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3943 MODULE_VERSION(LUSTRE_VERSION_STRING);
3944 MODULE_LICENSE("GPL");
3946 module_init(osc_init);
3947 module_exit(osc_exit);