/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <libcfs/libcfs.h>
#include <linux/falloc.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>
#include "osc_internal.h"
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);
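/*
 * Editor's note (illustrative, not part of the original source): the
 * request pool is sized from osc_reqpool_mem_max at module init time,
 * roughly osc_reqpool_maxreqcount = (osc_reqpool_mem_max << 20) /
 * OST_IO_MAXREQSIZE, so the default 5 MB yields a few hundred
 * pre-allocated BRW requests.  The exact formula lives in the module
 * init code, which is outside this excerpt.
 */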
#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
	struct obdo		*sa_oa;
	obd_enqueue_update_f	 sa_upcall;
	void			*sa_cookie;
};

struct osc_fsync_args {
	struct osc_object	*fa_obj;
	struct obdo		*fa_oa;
	obd_enqueue_update_f	 fa_upcall;
	void			*fa_cookie;
};

struct osc_ladvise_args {
	struct obdo		*la_oa;
	obd_enqueue_update_f	 la_upcall;
	void			*la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
			 void *args, int rc);
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	ENTRY;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
out:
	ptlrpc_req_finished(req);
	return rc;
}
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	ENTRY;
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	EXIT;
out:
	ptlrpc_req_finished(req);
	return rc;
}
static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_setattr_args *sa = args;
	struct ost_body *body;

	ENTRY;
	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
		      obd_enqueue_update_f upcall, void *cookie,
		      struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;

	ENTRY;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* Do MDS-to-OST setattr asynchronously. */
	if (rqset == NULL) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply = osc_setattr_interpret;

		sa = ptlrpc_req_async_args(sa, req);
		sa->sa_oa = oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}
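/*
 * Editor's note (illustrative, not part of the original source): the code
 * above is the standard PtlRPC async idiom used throughout this file.  A
 * hypothetical caller that just wants the attributes applied and a
 * callback fired would look roughly like:
 *
 *	static int my_done(void *cookie, int rc) { return rc; }
 *	...
 *	rc = osc_setattr_async(exp, oa, my_done, my_cookie, rqset);
 *
 * (my_done and my_cookie are invented names.)  The per-request context is
 * carved out of req->rq_async_args by ptlrpc_req_async_args(), so no
 * separate allocation or free is needed.
 */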
static int osc_ladvise_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 void *arg, int rc)
{
	struct osc_ladvise_args *la = arg;
	struct ost_body *body;

	ENTRY;
	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	*la->la_oa = body->oa;
out:
	rc = la->la_upcall(la->la_cookie, rc);
	RETURN(rc);
}
/**
 * If rqset is NULL, do not wait for the response.  Upcall and cookie may
 * also be NULL in that case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
		     struct ladvise_hdr *ladvise_hdr,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_ladvise_args *la;
	int rc;
	struct lu_ladvise *req_ladvise;
	struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
	int num_advise = ladvise_hdr->lah_count;
	struct ladvise_hdr *req_ladvise_hdr;

	ENTRY;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
	if (req == NULL)
		RETURN(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
			     num_advise * sizeof(*ladvise));
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
	if (rc != 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oa);

	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
						 &RMF_OST_LADVISE_HDR);
	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
	ptlrpc_request_set_replen(req);

	if (rqset == NULL) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
		RETURN(0);
	}

	req->rq_interpret_reply = osc_ladvise_interpret;
	la = ptlrpc_req_async_args(la, req);
	la->la_oa = oa;
	la->la_upcall = upcall;
	la->la_cookie = cookie;

	ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}
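/*
 * Editor's note (illustrative sketch, not part of the original source): a
 * caller advising the OST that a byte range will be read soon might build
 * the header roughly like this (hdr and len are invented names, and the
 * flexible lah_advise[] array means the header must be allocated with room
 * for the advice records):
 *
 *	OBD_ALLOC(hdr, offsetof(typeof(*hdr), lah_advise[1]));
 *	hdr->lah_count = 1;
 *	hdr->lah_advise[0].lla_advice = LU_LADVISE_WILLREAD;
 *	hdr->lah_advise[0].lla_start = 0;
 *	hdr->lah_advise[0].lla_end = len;
 *	rc = osc_ladvise_base(exp, oa, hdr, NULL, NULL, NULL);
 *
 * Passing a NULL rqset makes the request fire-and-forget via ptlrpcd, as
 * the comment above osc_ladvise_base() describes.
 */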
static int osc_create(const struct lu_env *env, struct obd_export *exp,
		      struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	ENTRY;
	LASSERT(oa != NULL);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	return rc;
}
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
		   obd_enqueue_update_f upcall, void *cookie)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct obd_import *imp = class_exp2cliimp(exp);
	struct ost_body *body;
	int rc;

	ENTRY;
	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc < 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_setattr_interpret;
	sa = ptlrpc_req_async_args(sa, req);
	sa->sa_oa = oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;

	ptlrpcd_add_req(req);

	RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);
/**
 * osc_fallocate_base() - Handles fallocate requests.
 *
 * @exp:	Export structure
 * @oa:		Attributes passed to OSS from client (obdo structure)
 * @upcall:	Completion callback invoked when the RPC finishes
 * @cookie:	Opaque context passed back to @upcall
 * @mode:	Operation done on given range.
 *
 * osc_fallocate_base() handles fallocate requests only.  Only block
 * allocation (the standard preallocate operation) is supported currently.
 * Other mode flags are not supported yet.  ftruncate(2) and truncate(2)
 * are supported via the SETATTR request.
 *
 * Return: Non-zero on failure and 0 on success.
 */
int osc_fallocate_base(struct obd_export *exp, struct obdo *oa,
		       obd_enqueue_update_f upcall, void *cookie, int mode)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct ost_body *body;
	struct obd_import *imp = class_exp2cliimp(exp);
	int rc;

	ENTRY;
	/*
	 * Only mode == 0 (which is standard prealloc) is supported now.
	 * Punch is not supported yet.
	 */
	if (mode & ~FALLOC_FL_KEEP_SIZE)
		RETURN(-EOPNOTSUPP);
	oa->o_falloc_mode = mode;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_OST_FALLOCATE);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_FALLOCATE);
	if (rc != 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	BUILD_BUG_ON(sizeof(*sa) > sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(sa, req);
	sa->sa_oa = oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;

	ptlrpcd_add_req(req);

	RETURN(0);
}
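/*
 * Editor's note (illustrative, not part of the original source): the mode
 * check above accepts only mode == 0 or mode == FALLOC_FL_KEEP_SIZE.  For
 * example, fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, off,
 * len) fails the "mode & ~FALLOC_FL_KEEP_SIZE" test and is rejected here;
 * hole punching reaches the OST through OST_PUNCH (osc_punch_send())
 * instead.
 */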
static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_fsync_args *fa = args;
	struct ost_body *body;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned long valid = 0;
	struct cl_object *obj;

	ENTRY;
	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oa = body->oa;
	obj = osc2cl(fa->fa_obj);

	/* Update osc object's blocks attribute */
	cl_object_attr_lock(obj);
	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
		attr->cat_blocks = body->oa.o_blocks;
		valid |= CAT_BLOCKS;
	}
	if (valid != 0)
		cl_object_attr_update(env, obj, attr, valid);
	cl_object_attr_unlock(obj);

out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}
int osc_sync_base(struct osc_object *obj, struct obdo *oa,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct obd_export *exp = osc_export(obj);
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;

	ENTRY;
	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	fa = ptlrpc_req_async_args(fa, req);
	fa->fa_obj = obj;
	fa->fa_oa = oa;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}
/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	ENTRY;
	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		RETURN(0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}
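/*
 * Editor's note (illustrative, not part of the original source): "ELC"
 * above stands for Early Lock Cancel.  When the server supports it, locks
 * that conflict with an operation (e.g. OST_DESTROY below) are cancelled
 * locally and their handles are piggybacked on the request itself via
 * ldlm_prep_elc_req(), saving a separate cancel RPC round trip.
 */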
static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *args, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);

	return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}
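/*
 * Editor's note (illustrative, not part of the original source): the
 * inc-then-dec dance above is a lock-free admission check.  With
 * cl_max_rpcs_in_flight == 8 and 8 destroys already in flight,
 * atomic_inc_return() returns 9 and the caller must back off; if a destroy
 * completed in between, atomic_dec_return() sees a value below the limit
 * and re-wakes cl_destroy_waitq so no waiter is lost.
 */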
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	LIST_HEAD(cancels);
	int rc, count;

	ENTRY;
	if (oa == NULL) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * under max_rpc_in_flight.
		 */
		rc = l_wait_event_abortable_exclusive(
			cli->cl_destroy_waitq,
			osc_can_send_destroy(cli));
		if (rc) {
			ptlrpc_req_finished(req);
			RETURN(-EINTR);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	RETURN(0);
}
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_ocd_grant_param)
		oa->o_dirty = cli->cl_dirty_grant;
	else
		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld > system dirty_max %ld\n",
		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		unsigned long nrpages;
		unsigned long undirty;

		nrpages = cli->cl_max_pages_per_rpc;
		nrpages *= cli->cl_max_rpcs_in_flight + 1;
		nrpages = max(nrpages, cli->cl_dirty_max_pages);
		undirty = nrpages << PAGE_SHIFT;
		if (cli->cl_ocd_grant_param) {
			int nrextents;

			/* take extent tax into account when asking for more
			 * grant space */
			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
				    cli->cl_max_extent_pages;
			undirty += nrextents * cli->cl_grant_extent_tax;
		}
		/* Do not ask for more than OBD_MAX_GRANT - a margin for server
		 * to add extent tax, etc.
		 */
		oa->o_undirty = min(undirty, OBD_MAX_GRANT &
				    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	/* o_dropped AKA o_misc is 32 bits, but cl_lost_grant is 64 bits */
	if (cli->cl_lost_grant > INT_MAX) {
		CDEBUG(D_CACHE,
		       "%s: avoided o_dropped overflow: cl_lost_grant %lu\n",
		       cli_name(cli), cli->cl_lost_grant);
		oa->o_dropped = INT_MAX;
	} else {
		oa->o_dropped = cli->cl_lost_grant;
	}
	cli->cl_lost_grant -= oa->o_dropped;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "%s: dirty: %llu undirty: %u dropped %u grant: %llu cl_lost_grant %lu\n",
	       cli_name(cli), oa->o_dirty, oa->o_undirty, oa->o_dropped,
	       oa->o_grant, cli->cl_lost_grant);
}
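/*
 * Editor's note (worked example, not part of the original source): with
 * the common defaults of cl_max_pages_per_rpc = 256 (1 MiB RPCs on 4 KiB
 * pages) and cl_max_rpcs_in_flight = 8, the else-branch above asks for
 * undirty = 256 * (8 + 1) * 4 KiB = 9 MiB of additional grant, plus the
 * per-extent tax when the server advertised GRANT_PARAM.  The request is
 * then clamped so it can never exceed OBD_MAX_GRANT minus a safety margin.
 */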
void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant = ktime_get_seconds() +
				    cli->cl_grant_shrink_interval;

	CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}
/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
	struct list_head	gtd_clients;
	struct mutex		gtd_mutex;
	unsigned long		gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *args, int rc)
{
	struct osc_grant_args *aa = args;
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, aa->aa_oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
	aa->aa_oa = NULL;

	return rc;
}
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}
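/*
 * Editor's note (worked example, not part of the original source): with
 * cl_max_rpcs_in_flight = 8 and 1 MiB RPCs, the first shrink targets
 * (8 + 1) * 1 MiB = 9 MiB.  If available grant is already at or below
 * that, the target drops to a single RPC's worth (1 MiB), matching the
 * two-stage policy described in the comment above osc_shrink_grant().
 */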
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;

	ENTRY;
	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	if (target_bytes >= cli->cl_avail_grant) {
		/* available grant has changed since target calculation */
		spin_unlock(&cli->cl_loi_list_lock);
		GOTO(out_free, rc = 0);
	}
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
out_free:
	OBD_FREE_PTR(body);
	RETURN(rc);
}
static int osc_should_shrink_grant(struct client_obd *client)
{
	time64_t next_shrink = client->cl_next_shrink_grant;

	if (client->cl_import == NULL)
		return 0;

	if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
	    client->cl_import->imp_grant_shrink_disabled) {
		osc_update_next_shrink(client);
		return 0;
	}

	if (ktime_get_seconds() >= next_shrink - 5) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;

		osc_update_next_shrink(client);
	}
	return 0;
}
#define GRANT_SHRINK_RPC_BATCH	100

static struct delayed_work work;

static void osc_grant_work_handler(struct work_struct *data)
{
	struct client_obd *cli;
	int rpc_sent;
	bool init_next_shrink = true;
	time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

	rpc_sent = 0;
	mutex_lock(&client_gtd.gtd_mutex);
	list_for_each_entry(cli, &client_gtd.gtd_clients,
			    cl_grant_chain) {
		if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
		    osc_should_shrink_grant(cli)) {
			osc_shrink_grant(cli);
			rpc_sent++;
		}

		if (!init_next_shrink) {
			if (cli->cl_next_shrink_grant < next_shrink &&
			    cli->cl_next_shrink_grant > ktime_get_seconds())
				next_shrink = cli->cl_next_shrink_grant;
		} else {
			init_next_shrink = false;
			next_shrink = cli->cl_next_shrink_grant;
		}
	}
	mutex_unlock(&client_gtd.gtd_mutex);

	if (client_gtd.gtd_stopped == 1)
		return;

	if (next_shrink > ktime_get_seconds()) {
		time64_t delay = next_shrink - ktime_get_seconds();

		schedule_delayed_work(&work, cfs_time_seconds(delay));
	} else {
		schedule_work(&work.work);
	}
}
void osc_schedule_grant_work(void)
{
	cancel_delayed_work_sync(&work);
	schedule_work(&work.work);
}

/**
 * Start grant thread for returning grant to server for idle clients.
 */
static int osc_start_grant_work(void)
{
	client_gtd.gtd_stopped = 0;
	mutex_init(&client_gtd.gtd_mutex);
	INIT_LIST_HEAD(&client_gtd.gtd_clients);

	INIT_DELAYED_WORK(&work, osc_grant_work_handler);
	schedule_work(&work.work);

	return 0;
}

static void osc_stop_grant_work(void)
{
	client_gtd.gtd_stopped = 1;
	cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
	mutex_lock(&client_gtd.gtd_mutex);
	list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
	mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
	if (list_empty(&client->cl_grant_chain))
		return;

	mutex_lock(&client_gtd.gtd_mutex);
	list_del_init(&client->cl_grant_chain);
	mutex_unlock(&client_gtd.gtd_mutex);
}
void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expected to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty_pages must be 0 already.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant = ocd->ocd_grant;
	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
		unsigned long consumed = cli->cl_reserved_grant;

		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
			consumed += cli->cl_dirty_grant;
		else
			consumed += cli->cl_dirty_pages << PAGE_SHIFT;
		if (cli->cl_avail_grant < consumed) {
			CERROR("%s: granted %ld but already consumed %ld\n",
			       cli_name(cli), cli->cl_avail_grant, consumed);
			cli->cl_avail_grant = 0;
		} else {
			cli->cl_avail_grant -= consumed;
		}
	}

	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
		u64 size;
		int chunk_mask;

		/* overhead for each extent insertion */
		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
		/* determine the appropriate chunk size used by osc_extent. */
		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
					  ocd->ocd_grant_blkbits);
		/* max_pages_per_rpc must be chunk aligned */
		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
					     ~chunk_mask) & chunk_mask;
		/* determine maximum extent size, in #pages */
		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
		cli->cl_max_extent_pages = (size >> PAGE_SHIFT) ?: 1;
		cli->cl_ocd_grant_param = 1;
	} else {
		cli->cl_ocd_grant_param = 0;
		cli->cl_grant_extent_tax = 0;
		cli->cl_chunkbits = PAGE_SHIFT;
		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
	}
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE,
	       "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
	       cli_name(cli),
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
	       cli->cl_max_extent_pages);

	if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
		osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);
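/*
 * Editor's note (worked example, not part of the original source): the
 * chunk alignment above rounds cl_max_pages_per_rpc up to a multiple of
 * the chunk size.  E.g. with 4 KiB pages and ocd_grant_blkbits = 16
 * (64 KiB server blocks), cl_chunkbits = 16, so a chunk is 16 pages and
 * chunk_mask = ~0xf; a value of 250 pages becomes (250 + 15) & ~15 = 256.
 */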
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   size_t page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0) {
			CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
			       i, remote_rcs[i], req);
			return remote_rcs[i];
		}

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}
	if (req->rq_bulk != NULL &&
	    req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at https://jira.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}
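/*
 * Editor's note (illustrative, not part of the original source): two
 * brw_page entries merge into one niobuf only when they are
 * byte-contiguous and their flags agree.  E.g. pages covering [0, 4096)
 * and [4096, 8192) with identical flags merge; the same pages with
 * OBD_BRW_SYNC set on only one of them do not merge, although that
 * particular flag difference is considered safe and will not trigger the
 * CWARN above.
 */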
#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
				   size_t pg_count, struct brw_page **pga,
				   int opc, obd_dif_csum_fn *fn,
				   int sector_size,
				   u32 *check_sum)
{
	struct ahash_request *req;
	/* Use Adler as the default checksum type on top of DIF tags */
	unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
	struct page *__page;
	unsigned char *buffer;
	__u16 *guard_start;
	unsigned int bufsize;
	int guard_number;
	int used_number = 0;
	int used;
	u32 cksum;
	int rc = 0;
	int i = 0;

	LASSERT(pg_count > 0);

	__page = alloc_page(GFP_KERNEL);
	if (__page == NULL)
		return -ENOMEM;

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(req)) {
		rc = PTR_ERR(req);
		CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
		       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
		GOTO(out, rc);
	}

	buffer = kmap(__page);
	guard_start = (__u16 *)buffer;
	guard_number = PAGE_SIZE / sizeof(*guard_start);
	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (unlikely(i == 0 && opc == OST_READ &&
			     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}

		/*
		 * The left guard number should be able to hold checksums of a
		 * whole page.
		 */
		rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
						  pga[i]->off & ~PAGE_MASK,
						  count,
						  guard_start + used_number,
						  guard_number - used_number,
						  &used, sector_size, fn);
		if (rc)
			break;

		used_number += used;
		if (used_number == guard_number) {
			cfs_crypto_hash_update_page(req, __page, 0,
				used_number * sizeof(*guard_start));
			used_number = 0;
		}

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}
	kunmap(__page);
	if (rc)
		GOTO(out, rc);

	if (used_number != 0)
		cfs_crypto_hash_update_page(req, __page, 0,
			used_number * sizeof(*guard_start));

	bufsize = sizeof(cksum);
	cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	*check_sum = cksum;
out:
	__free_page(__page);
	return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
	-EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */
static int osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     enum cksum_types cksum_type,
			     u32 *cksum)
{
	int i = 0;
	struct ahash_request *req;
	unsigned int bufsize;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(req)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(req);
	}

	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(req, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(*cksum);
	cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		(*cksum)++;

	return 0;
}
static int osc_checksum_bulk_rw(const char *obd_name,
				enum cksum_types cksum_type,
				int nob, size_t pg_count,
				struct brw_page **pga, int opc,
				u32 *check_sum)
{
	obd_dif_csum_fn *fn = NULL;
	int sector_size = 0;
	int rc;

	ENTRY;
	obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

	if (fn)
		rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
					     opc, fn, sector_size, check_sum);
	else
		rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
				       check_sum);

	RETURN(rc);
}
static inline void osc_release_bounce_pages(struct brw_page **pga,
					    u32 page_count)
{
#ifdef HAVE_LUSTRE_CRYPTO
	int i;

	for (i = 0; i < page_count; i++) {
		/* Bounce pages allocated by a call to
		 * llcrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
		 * are identified thanks to the PageChecked flag.
		 */
		if (PageChecked(pga[i]->pg))
			llcrypt_finalize_bounce_page(&pga[i]->pg);
		pga[i]->count -= pga[i]->bp_count_diff;
		pga[i]->off += pga[i]->bp_off_diff;
	}
#endif
}
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
		     u32 page_count, struct brw_page **pga,
		     struct ptlrpc_request **reqp, int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc, short_io_size = 0;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;
	void *short_io_buf;
	const char *obd_name = cli->cl_import->imp_obd->obd_name;
	struct inode *inode;
	bool directio = false;

	ENTRY;
	inode = page2inode(pga[0]->pg);
	if (inode == NULL) {
		/* Try to get reference to inode from cl_page if we are
		 * dealing with direct IO, as handled pages are not
		 * actual page cache pages.
		 */
		struct osc_async_page *oap = brw_page2oap(pga[0]);
		struct cl_page *clpage = oap2cl_page(oap);

		inode = clpage->cp_inode;
		if (inode)
			directio = true;
	}
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
		for (i = 0; i < page_count; i++) {
			struct brw_page *pg = pga[i];
			struct page *data_page = NULL;
			bool retried = false;
			bool lockedbymyself;
			u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;
			struct address_space *map_orig = NULL;
			pgoff_t index_orig;

retry_encrypt:
			if (nunits & ~LUSTRE_ENCRYPTION_MASK)
				nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
					 LUSTRE_ENCRYPTION_UNIT_SIZE;
			/* The page can already be locked when we arrive here.
			 * This is possible when cl_page_assume/vvp_page_assume
			 * is stuck on wait_on_page_writeback with page lock
			 * held. In this case there is no risk for the lock to
			 * be released while we are doing our encryption
			 * processing, because writeback against that page will
			 * end in vvp_page_completion_write/cl_page_completion,
			 * which means only once the page is fully processed.
			 */
			lockedbymyself = trylock_page(pg->pg);
			if (directio) {
				map_orig = pg->pg->mapping;
				pg->pg->mapping = inode->i_mapping;
				index_orig = pg->pg->index;
				pg->pg->index = pg->off >> PAGE_SHIFT;
			}
			data_page =
				llcrypt_encrypt_pagecache_blocks(pg->pg,
								 nunits, 0,
								 GFP_NOFS);
			if (directio) {
				pg->pg->mapping = map_orig;
				pg->pg->index = index_orig;
			}
			if (lockedbymyself)
				unlock_page(pg->pg);
			if (IS_ERR(data_page)) {
				rc = PTR_ERR(data_page);
				if (rc == -ENOMEM && !retried) {
					retried = true;
					rc = 0;
					goto retry_encrypt;
				}
				ptlrpc_request_free(req);
				RETURN(rc);
			}
			/* Set PageChecked flag on bounce page for
			 * disambiguation in osc_release_bounce_pages().
			 */
			SetPageChecked(data_page);
			pg->pg = data_page;
			/* there should be no gap in the middle of page array */
			if (i == page_count - 1) {
				struct osc_async_page *oap = brw_page2oap(pg);

				oa->o_size = oap->oap_count +
					oap->oap_obj_off + oap->oap_page_off;
			}
			/* len is forced to nunits, and relative offset to 0
			 * so store the old, clear text info
			 */
			pg->bp_count_diff = nunits - pg->count;
			pg->count = nunits;
			pg->bp_off_diff = pg->off & ~PAGE_MASK;
			pg->off = pg->off & PAGE_MASK;
		}
	} else if (opc == OST_READ && inode && IS_ENCRYPTED(inode)) {
		for (i = 0; i < page_count; i++) {
			struct brw_page *pg = pga[i];
			u32 nunits = (pg->off & ~PAGE_MASK) + pg->count;

			if (nunits & ~LUSTRE_ENCRYPTION_MASK)
				nunits = (nunits & LUSTRE_ENCRYPTION_MASK) +
					 LUSTRE_ENCRYPTION_UNIT_SIZE;
			/* count/off are forced to cover the whole encryption
			 * unit size so that all encrypted data is stored on
			 * the OST, so adjust bp_{count,off}_diff for the size
			 * of the clear text.
			 */
			pg->bp_count_diff = nunits - pg->count;
			pg->count = nunits;
			pg->bp_off_diff = pg->off & ~PAGE_MASK;
			pg->off = pg->off & PAGE_MASK;
		}
	}
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	for (i = 0; i < page_count; i++) {
		short_io_size += pga[i]->count;
		if (!inode || !IS_ENCRYPTED(inode)) {
			pga[i]->bp_count_diff = 0;
			pga[i]->bp_off_diff = 0;
		}
	}

	/* Check if read/write is small enough to be a short io. */
	if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
	    !imp_connect_shortio(cli->cl_import))
		short_io_size = 0;

	req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
			     opc == OST_READ ? 0 : short_io_size);
	if (opc == OST_READ)
		req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
				     short_io_size);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	if (short_io_size != 0) {
		desc = NULL;
		short_io_buf = NULL;
		goto no_bulk;
	}

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
			PTLRPC_BULK_PUT_SINK),
		OST_BULK_PORTAL,
		&ptlrpc_bulk_kiov_pin_ops);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */
no_bulk:
	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	/* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
	 * and from_kgid(), because they are asynchronous. Fortunately, variable
	 * oa contains valid o_uid and o_gid in these two operations.
	 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
	 * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
	 * other process logic */
	body->oa.o_uid = oa->o_uid;
	body->oa.o_gid = oa->o_gid;

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number of
	 * bulks that might be sent for this request. The actual number is
	 * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
	 * sends "max - 1" for old client compatibility sending "0", and also
	 * so that the actual maximum is a power-of-two number, not one less.
	 * LU-1431 */
	if (desc != NULL)
		ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	else /* short io */
		ioobj_max_brw_set(ioobj, 0);
	if (short_io_size != 0) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_SHORT_IO;
		CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
		       short_io_size);
		if (opc == OST_WRITE) {
			short_io_buf = req_capsule_client_get(pill,
							      &RMF_SHORT_IO);
			LASSERT(short_io_buf != NULL);
		}
	}

	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));
		if (short_io_size != 0 && opc == OST_WRITE) {
			unsigned char *ptr = kmap_atomic(pg->pg);

			LASSERT(short_io_size >= requested_nob + pg->count);
			memcpy(short_io_buf + requested_nob,
			       ptr + poff,
			       pg->count);
			kunmap_atomic(ptr);
		} else if (short_io_size == 0) {
			desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
							 pg->count);
		}
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->rnb_len += pg->count;
		} else {
			niobuf->rnb_offset = pg->off;
			niobuf->rnb_len = pg->count;
			niobuf->rnb_flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			enum cksum_types cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;

			body->oa.o_flags |= obd_cksum_type_pack(obd_name,
								cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

			rc = osc_checksum_bulk_rw(obd_name, cksum_type,
						  requested_nob, page_count,
						  pga, OST_WRITE,
						  &body->oa.o_cksum);
			if (rc < 0) {
				CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
				       rc);
				GOTO(out, rc);
			}
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);

			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= obd_cksum_type_pack(obd_name,
							   cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= obd_cksum_type_pack(obd_name,
								cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}

		/* Client cksum has been already copied to wire obdo in previous
		 * lustre_set_wire_obdo(), and in the case a bulk-read is being
		 * resent due to cksum error, this will allow Server to
		 * check+dump pages on its side */
	}
	ptlrpc_request_set_replen(req);

	aa = ptlrpc_req_async_args(aa, req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
	       req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
	       niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
	RETURN(0);

out:
	ptlrpc_req_finished(req);
	return rc;
}
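/*
 * Editor's note (illustrative, not part of the original source): the short
 * I/O path above inlines the data in the request/reply buffers instead of
 * setting up an LNet bulk transfer.  A 4 KiB single-niobuf write to a
 * server advertising, say, a 16 KiB cl_max_short_io_bytes is sent inline;
 * a 1 MiB RPC always goes through ptlrpc_prep_bulk_imp().
 */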
char dbgcksum_file_name[PATH_MAX];

static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
				struct brw_page **pga, __u32 server_cksum,
				__u32 client_cksum)
{
	struct file *filp;
	int rc, i;
	unsigned int len;
	char *buf;

	/* will only keep dump of pages on first error for the same range in
	 * file/fid, not during the resends/retries. */
	snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
		 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
		 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
		  libcfs_debug_file_path_arr :
		  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
		 pga[0]->off,
		 pga[page_count-1]->off + pga[page_count-1]->count - 1,
		 client_cksum, server_cksum);
	filp = filp_open(dbgcksum_file_name,
			 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
	if (IS_ERR(filp)) {
		rc = PTR_ERR(filp);
		if (rc == -EEXIST)
			CDEBUG(D_INFO,
			       "%s: can't open to dump pages with checksum error: rc = %d\n",
			       dbgcksum_file_name, rc);
		else
			CERROR("%s: can't open to dump pages with checksum error: rc = %d\n",
			       dbgcksum_file_name, rc);
		return;
	}

	for (i = 0; i < page_count; i++) {
		len = pga[i]->count;
		buf = kmap(pga[i]->pg);
		while (len != 0) {
			rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
			if (rc < 0) {
				CERROR("%s: wanted to write %u but got %d error\n",
				       dbgcksum_file_name, len, rc);
				break;
			}
			len -= rc;
			buf += rc;
			CDEBUG(D_INFO, "%s: wrote %d bytes\n",
			       dbgcksum_file_name, rc);
		}
		kunmap(pga[i]->pg);
	}

	rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
	if (rc)
		CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
	filp_close(filp, NULL);
}
static int
check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
		     __u32 client_cksum, __u32 server_cksum,
		     struct osc_brw_async_args *aa)
{
	const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
	enum cksum_types cksum_type;
	obd_dif_csum_fn *fn = NULL;
	int sector_size = 0;
	__u32 new_cksum;
	char *msg;
	int rc;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	if (aa->aa_cli->cl_checksum_dump)
		dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
				    server_cksum, client_cksum);

	cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
					   oa->o_flags : 0);

	switch (cksum_type) {
	case OBD_CKSUM_T10IP512:
		fn = obd_dif_ip_fn;
		sector_size = 512;
		break;
	case OBD_CKSUM_T10IP4K:
		fn = obd_dif_ip_fn;
		sector_size = 4096;
		break;
	case OBD_CKSUM_T10CRC512:
		fn = obd_dif_crc_fn;
		sector_size = 512;
		break;
	case OBD_CKSUM_T10CRC4K:
		fn = obd_dif_crc_fn;
		sector_size = 4096;
		break;
	default:
		break;
	}

	if (fn)
		rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
					     aa->aa_page_count, aa->aa_ppga,
					     OST_WRITE, fn, sector_size,
					     &new_cksum);
	else
		rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
				       aa->aa_ppga, OST_WRITE, cksum_type,
				       &new_cksum);

	if (rc < 0)
		msg = "failed to calculate the client write checksum";
	else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
		msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
			   DFID " object "DOSTID" extent [%llu-%llu], original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
			   obd_name, msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
			   aa->aa_ppga[aa->aa_page_count - 1]->off +
			   aa->aa_ppga[aa->aa_page_count-1]->count - 1,
			   client_cksum,
			   obd_cksum_type_unpack(aa->aa_oa->o_flags),
			   server_cksum, cksum_type, new_cksum);
	return 1;
}
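/*
 * Editor's note (summary, not part of the original source): the triage
 * above distinguishes corruption scenarios by recomputing the checksum
 * after the failure.  A match with the server's value points at the
 * client (the page changed after the original checksum, typically mmap
 * I/O); a match with the original client value means the data is intact
 * locally, so it was corrupted in transit before the OST; a match with
 * neither suggests the page changed both in transit and on the client.
 */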
/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	struct client_obd *cli = aa->aa_cli;
	const char *obd_name = cli->cl_import->imp_obd->obd_name;
	const struct lnet_process_id *peer =
		&req->rq_import->imp_connection->c_peer;
	struct ost_body *body;
	u32 client_cksum = 0;
	struct inode *inode;
	unsigned int blockbits = 0, blocksize = 0;

	ENTRY;
	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
		RETURN(rc);
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "cannot unpack body");
		RETURN(-EPROTO);
	}

	/* set/clear over quota flag for a uid/gid/projid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
		unsigned qid[LL_MAXQUOTAS] = {
					 body->oa.o_uid, body->oa.o_gid,
					 body->oa.o_projid };
		CDEBUG(D_QUOTA,
		       "setdq for [%u %u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
		       body->oa.o_valid, body->oa.o_flags);
		osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
				body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		RETURN(rc);

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("%s: unexpected positive size %d\n",
			       obd_name, rc);
			RETURN(-EPROTO);
		}

		if (req->rq_bulk != NULL &&
		    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			RETURN(-EAGAIN);

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa))
			RETURN(-EAGAIN);

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count, aa->aa_page_count,
				     aa->aa_ppga);
		GOTO(out, rc);
	}

	/* The rest of this function executes only for OST_READs */

	if (req->rq_bulk == NULL) {
		rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
					  RCL_SERVER);
		LASSERT(rc == req->rq_status);
	} else {
		/* if unwrap_bulk failed, return -EAGAIN to retry */
		rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	}
	if (rc < 0)
		GOTO(out, rc = -EAGAIN);

	if (rc > aa->aa_requested_nob) {
		CERROR("%s: unexpected size %d, requested %d\n", obd_name,
		       rc, aa->aa_requested_nob);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
		       rc, req->rq_bulk->bd_nob_transferred);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk == NULL) {
		/* short io */
		int nob, pg_count, i = 0;
		unsigned char *buf;

		CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
		pg_count = aa->aa_page_count;
		buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
						   rc);
		nob = rc;
		while (nob > 0 && pg_count > 0) {
			unsigned char *ptr;
			int count = aa->aa_ppga[i]->count > nob ?
				    nob : aa->aa_ppga[i]->count;

			CDEBUG(D_CACHE, "page %p count %d\n",
			       aa->aa_ppga[i]->pg, count);
			ptr = kmap_atomic(aa->aa_ppga[i]->pg);
			memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
			       count);
			kunmap_atomic((void *) ptr);

			buf += count;
			nob -= count;
			i++;
			pg_count--;
		}
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		u32 server_cksum = body->oa.o_cksum;
		char *via = "";
		char *router = "";
		enum cksum_types cksum_type;
		u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
			      body->oa.o_flags : 0;

		cksum_type = obd_cksum_type_unpack(o_flags);
		rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
					  aa->aa_page_count, aa->aa_ppga,
					  OST_READ, &client_cksum);
		if (rc < 0)
			GOTO(out, rc);

		if (req->rq_bulk != NULL &&
		    peer->nid != req->rq_bulk->bd_sender) {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			struct ost_body *clbody;
			u32 page_count = aa->aa_page_count;

			clbody = req_capsule_client_get(&req->rq_pill,
							&RMF_OST_BODY);
			if (cli->cl_checksum_dump)
				dump_all_bulk_pages(&clbody->oa, page_count,
						    aa->aa_ppga, server_cksum,
						    client_cksum);

			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode "DFID" object "DOSTID" extent [%llu-%llu], client %x, server %x, cksum_type %x\n",
					   obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_seq : 0ULL,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_oid : 0,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[page_count-1]->off +
					   aa->aa_ppga[page_count-1]->count - 1,
					   client_cksum, server_cksum,
					   cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("%s: checksum %u requested from %s but not sent\n",
			       obd_name, cksum_missed,
			       libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
	inode = page2inode(aa->aa_ppga[0]->pg);
	if (inode == NULL) {
		/* Try to get reference to inode from cl_page if we are
		 * dealing with direct IO, as handled pages are not
		 * actual page cache pages.
		 */
		struct osc_async_page *oap = brw_page2oap(aa->aa_ppga[0]);

		inode = oap2cl_page(oap)->cp_inode;
		if (inode) {
			blockbits = inode->i_blkbits;
			blocksize = 1 << blockbits;
		}
	}
	if (inode && IS_ENCRYPTED(inode)) {
		int idx;

		if (!llcrypt_has_encryption_key(inode)) {
			CDEBUG(D_SEC, "no enc key for ino %lu\n", inode->i_ino);
			GOTO(out, rc);
		}
		for (idx = 0; idx < aa->aa_page_count; idx++) {
			struct brw_page *pg = aa->aa_ppga[idx];
			unsigned int offs = 0;

			while (offs < PAGE_SIZE) {
				/* do not decrypt if page is all 0s */
				if (memchr_inv(page_address(pg->pg) + offs, 0,
					       LUSTRE_ENCRYPTION_UNIT_SIZE) == NULL) {
					/* if page is empty forward info to
					 * upper layers (ll_io_zero_page) by
					 * clearing PagePrivate2
					 */
					if (!offs)
						ClearPagePrivate2(pg->pg);
					break;
				}

				if (blockbits) {
					/* This is direct IO case. Directly call
					 * decrypt function that takes inode as
					 * input parameter. Page does not need
					 * to be locked.
					 */
					u64 lblk_num =
						((u64)(pg->off >> PAGE_SHIFT) <<
						 (PAGE_SHIFT - blockbits)) +
						(offs >> blockbits);
					unsigned int i;

					for (i = offs;
					     i < offs +
						   LUSTRE_ENCRYPTION_UNIT_SIZE;
					     i += blocksize, lblk_num++) {
						rc = llcrypt_decrypt_block_inplace(
							inode, pg->pg,
							blocksize, i,
							lblk_num);
						if (rc)
							break;
					}
				} else {
					rc = llcrypt_decrypt_pagecache_blocks(
						pg->pg,
						LUSTRE_ENCRYPTION_UNIT_SIZE,
						offs);
				}
				if (rc)
					GOTO(out, rc);

				offs += LUSTRE_ENCRYPTION_UNIT_SIZE;
			}
		}
	}

out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	RETURN(rc);
}
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;
	ENTRY;

	/* The below message is checked in replay-ost-single.sh test_8ae */
	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
				  aa->aa_ppga, &new_req, 1);
	if (rc)
		RETURN(rc);

	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
		}
	}
	/*
	 * New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it...
	 */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_aa, new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}
/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	for (stride = 1; stride < num; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride; i < num; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}
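/*
 * Worked example of the stride sequence above (Knuth's 3h+1 gaps): for
 * num = 64 the opening for-loop produces 1, 4, 13, 40, 121 and stops at
 * 121; the do-while then sorts with strides 40, 13 and 4, and the final
 * stride-1 pass is a plain insertion sort over an almost-sorted array.
 */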
static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
	LASSERT(ppga != NULL);
	OBD_FREE_PTR_ARRAY(ppga, count);
}
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_brw_async_args *aa = args;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	unsigned long transferred = 0;

	ENTRY;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);

	/* restore clear text pages */
	osc_release_bounce_pages(aa->aa_ppga, aa->aa_page_count);

	/*
	 * When the server returns -EINPROGRESS, the client should always
	 * retry regardless of the number of times the bulk was resent
	 * already.
	 */
	if (osc_recoverable_error(rc) && !req->rq_no_delay) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
			   client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       "%llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off +
					  last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_update(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
	aa->aa_oa = NULL;

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1,
				  rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	transferred = (req->rq_bulk == NULL ? /* short io */
		       aa->aa_requested_nob :
		       req->rq_bulk->bd_nob_transferred);

	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	RETURN(rc);
}
2440 static void brw_commit(struct ptlrpc_request *req)
2442 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2443 * this called via the rq_commit_cb, I need to ensure
2444 * osc_dec_unstable_pages is still called. Otherwise unstable
2445 * pages may be leaked. */
2446 spin_lock(&req->rq_lock);
2447 if (likely(req->rq_unstable)) {
2448 req->rq_unstable = 0;
2449 spin_unlock(&req->rq_lock);
2451 osc_dec_unstable_pages(req);
2453 req->rq_committed = 1;
2454 spin_unlock(&req->rq_lock);
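/*
 * Sketch of the two interleavings this guards (assuming the increment side
 * checks rq_committed under the same rq_lock, as the comment above implies):
 * if osc_inc_unstable_pages ran first, rq_unstable is set and brw_commit
 * drops the unstable accounting itself; if brw_commit ran first, only
 * rq_committed is set and the increment side is responsible for undoing its
 * own increment, so the unstable page count balances either way.
 */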
/**
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd)
{
	struct ptlrpc_request *req = NULL;
	struct osc_extent *ext;
	struct brw_page **pga = NULL;
	struct osc_brw_async_args *aa = NULL;
	struct obdo *oa = NULL;
	struct osc_async_page *oap;
	struct osc_object *obj = NULL;
	struct cl_req_attr *crattr = NULL;
	loff_t starting_offset = OBD_OBJECT_EOF;
	loff_t ending_offset = 0;
	/* '1' for consistency with code that checks !mpflag to restore */
	unsigned int mpflag = 1;
	int mem_tight = 0;
	int page_count = 0;
	bool soft_sync = false;
	bool ndelay = false;
	int i;
	int grant = 0;
	int rc;
	__u32 layout_version = 0;
	LIST_HEAD(rpc_list);
	struct ost_body *body;

	ENTRY;
	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		grant += ext->oe_grants;
		page_count += ext->oe_nr_pages;
		layout_version = max(layout_version, ext->oe_layout_version);
		if (obj == NULL)
			obj = ext->oe_obj;
	}

	soft_sync = osc_over_unstable_soft_limit(cli);
	if (mem_tight)
		mpflag = memalloc_noreclaim_save();

	OBD_ALLOC_PTR_ARRAY(pga, page_count);
	if (pga == NULL)
		GOTO(out, rc = -ENOMEM);

	OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
	if (oa == NULL)
		GOTO(out, rc = -ENOMEM);

	i = 0;
	list_for_each_entry(ext, ext_list, oe_link) {
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			if (mem_tight)
				oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
			if (soft_sync)
				oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
			pga[i] = &oap->oap_brw_page;
			pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
			i++;

			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset == OBD_OBJECT_EOF ||
			    starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_SIZE);
		}
		if (ext->oe_ndelay)
			ndelay = true;
	}

	/* first page in the list */
	oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);

	crattr = &osc_env_info(env)->oti_req_attr;
	memset(crattr, 0, sizeof(*crattr));
	crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
	crattr->cra_flags = ~0ULL;
	crattr->cra_page = oap2cl_page(oap);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, osc2cl(obj), crattr);

	if (cmd == OBD_BRW_WRITE) {
		oa->o_grant_used = grant;
		if (layout_version > 0) {
			CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
			       PFID(&oa->o_oi.oi_fid), layout_version);

			oa->o_layout_version = layout_version;
			oa->o_valid |= OBD_MD_LAYOUT_VERSION;
		}
	}

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		GOTO(out, rc);
	}

	req->rq_commit_cb = brw_commit;
	req->rq_interpret_reply = brw_interpret;
	req->rq_memalloc = mem_tight != 0;
	oap->oap_request = ptlrpc_request_addref(req);
	if (ndelay) {
		req->rq_no_resend = req->rq_no_delay = 1;
		/* Probably we should set a shorter timeout value here to
		 * handle ETIMEDOUT in brw_interpret() correctly. */
		/* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
	}

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST). If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps. Sadly, there is no obvious
	 * way to do this in a single call. bug 10150 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
	cl_req_attr_set(env, osc2cl(obj), crattr);
	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	aa = ptlrpc_req_async_args(aa, req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);

	spin_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);

	ptlrpcd_add_req(req);
	rc = 0;
	EXIT;

out:
	if (mem_tight)
		memalloc_noreclaim_restore(mpflag);

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa)
			OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
		if (pga) {
			osc_release_bounce_pages(pga, page_count);
			osc_release_ppga(pga, page_count);
		}
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
	}
	RETURN(rc);
}
static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
{
	int set = 0;

	LASSERT(lock != NULL);

	lock_res_and_lock(lock);

	if (lock->l_ast_data == NULL)
		lock->l_ast_data = data;
	if (lock->l_ast_data == data)
		set = 1;

	unlock_res_and_lock(lock);

	return set;
}
int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
		     void *cookie, struct lustre_handle *lockh,
		     enum ldlm_mode mode, __u64 *flags, bool speculative,
		     int errcode)
{
	bool intent = *flags & LDLM_FL_HAS_INTENT;
	int rc;
	ENTRY;

	/* The request was created before ldlm_cli_enqueue call. */
	if (intent && errcode == ELDLM_LOCK_ABORTED) {
		struct ldlm_reply *rep;

		rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
		LASSERT(rep != NULL);

		rep->lock_policy_res1 =
			ptlrpc_status_ntoh(rep->lock_policy_res1);
		if (rep->lock_policy_res1)
			errcode = rep->lock_policy_res1;
		if (!speculative)
			*flags |= LDLM_FL_LVB_READY;
	} else if (errcode == ELDLM_OK) {
		*flags |= LDLM_FL_LVB_READY;
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, lockh, errcode);

	/* release the reference taken in ldlm_cli_enqueue() */
	if (errcode == ELDLM_LOCK_MATCHED)
		errcode = ELDLM_OK;
	if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
		ldlm_lock_decref(lockh, mode);

	RETURN(rc);
}
int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
			  void *args, int rc)
{
	struct osc_enqueue_args *aa = args;
	struct ldlm_lock *lock;
	struct lustre_handle *lockh = &aa->oa_lockh;
	enum ldlm_mode mode = aa->oa_mode;
	struct ost_lvb *lvb = aa->oa_lvb;
	__u32 lvb_len = sizeof(*lvb);
	__u64 flags = 0;
	struct ldlm_enqueue_info einfo = {
		.ei_type = aa->oa_type,
		.ei_mode = mode,
	};

	ENTRY;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(lockh);
	LASSERTF(lock != NULL,
		 "lockh %#llx, req %p, aa %p - client evicted?\n",
		 lockh->cookie, req, aa);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(lockh, mode);

	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

	/* Let the CP AST grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	if (aa->oa_speculative) {
		LASSERT(aa->oa_lvb == NULL);
		LASSERT(aa->oa_flags == NULL);
		aa->oa_flags = &flags;
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags,
				   lvb, lvb_len, lockh, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
			      aa->oa_flags, aa->oa_speculative, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	ldlm_lock_decref(lockh, mode);
	LDLM_LOCK_PUT(lock);
	RETURN(rc);
}
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests, but holding some locks while trying to
 * obtain others may take a considerable amount of time in case of OST failure;
 * and when other sync requests do not get a lock released by a client, the
 * client is evicted from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, union ldlm_policy_data *policy,
		     struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
		     void *cookie, struct ldlm_enqueue_info *einfo,
		     struct ptlrpc_request_set *rqset, int async,
		     bool speculative)
{
	struct obd_device *obd = exp->exp_obd;
	struct lustre_handle lockh = { 0 };
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_flags = *flags;
	enum ldlm_mode mode;
	int rc;
	ENTRY;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother. */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;
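	/*
	 * E.g. with 4 KiB pages (PAGE_MASK == ~0xfff), a request for bytes
	 * [5000, 9000] is widened to [4096, 12287]: the start is rounded
	 * down to its page boundary and the end up to the last byte of its
	 * page.
	 */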
	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;

	/* Normal lock requests must wait for the LVB to be ready before
	 * matching a lock; speculative lock requests do not need to,
	 * because they will not actually use the lock. */
	if (!speculative)
		match_flags |= LDLM_FL_LVB_READY;
	if (intent != 0)
		match_flags |= LDLM_FL_BLOCK_GRANTED;
	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
			       einfo->ei_type, policy, mode, &lockh);
	if (mode) {
		struct ldlm_lock *matched;

		if (*flags & LDLM_FL_TEST_LOCK)
			RETURN(ELDLM_OK);

		matched = ldlm_handle2lock(&lockh);
		if (speculative) {
			/* This DLM lock request is speculative, and does not
			 * have an associated IO request. Therefore if there
			 * is already a DLM lock, it will just inform the
			 * caller to cancel the request for this stripe. */
			lock_res_and_lock(matched);
			if (ldlm_extent_equal(&policy->l_extent,
					      &matched->l_policy_data.l_extent))
				rc = -EEXIST;
			else
				rc = -ECANCELED;
			unlock_res_and_lock(matched);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(rc);
		} else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
			*flags |= LDLM_FL_LVB_READY;

			/* We already have a lock, and it's referenced. */
			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			RETURN(ELDLM_OK);
		} else {
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

	if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
		RETURN(-ENOLCK);

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, &lockh, async);
	if (async) {
		if (!rc) {
			struct osc_enqueue_args *aa;

			aa = ptlrpc_req_async_args(aa, req);
			aa->oa_exp = exp;
			aa->oa_mode = einfo->ei_mode;
			aa->oa_type = einfo->ei_type;
			lustre_handle_copy(&aa->oa_lockh, &lockh);
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_speculative = speculative;
			if (!speculative) {
				aa->oa_flags = flags;
				aa->oa_lvb = lvb;
			} else {
				/* speculative locks are essentially to enqueue
				 * a DLM lock in advance, so we don't care
				 * about the result of the enqueue. */
				aa->oa_lvb = NULL;
				aa->oa_flags = NULL;
			}

			req->rq_interpret_reply = osc_enqueue_interpret;
			ptlrpc_set_add_req(rqset, req);
		}
		RETURN(rc);
	}

	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
			      flags, speculative, rc);

	RETURN(rc);
}
int osc_match_base(const struct lu_env *env, struct obd_export *exp,
		   struct ldlm_res_id *res_id, enum ldlm_type type,
		   union ldlm_policy_data *policy, enum ldlm_mode mode,
		   __u64 *flags, struct osc_object *obj,
		   struct lustre_handle *lockh, enum ldlm_match_flags match_flags)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	enum ldlm_mode rc;
	ENTRY;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		RETURN(-EIO);

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
				       res_id, type, policy, mode, lockh,
				       match_flags);
	if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
		RETURN(rc);

	if (obj != NULL) {
		struct ldlm_lock *lock = ldlm_handle2lock(lockh);

		LASSERT(lock != NULL);
		if (osc_set_lock_data(lock, obj)) {
			lock_res_and_lock(lock);
			if (!ldlm_is_lvb_cached(lock)) {
				LASSERT(lock->l_ast_data == obj);
				osc_lock_lvb_update(env, obj, lock, NULL);
				ldlm_set_lvb_cached(lock);
			}
			unlock_res_and_lock(lock);
		} else {
			ldlm_lock_decref(lockh, rc);
			rc = 0;
		}
		LDLM_LOCK_PUT(lock);
	}
	RETURN(rc);
}
static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_async_args *aa = args;
	struct obd_statfs *msfs;

	ENTRY;
	if (rc == -EBADR)
		/*
		 * The request has in fact never been sent due to issues at
		 * a higher level (LOV). Exit immediately since the caller
		 * is aware of the problem and takes care of the clean up.
		 */
		RETURN(rc);

	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
		GOTO(out, rc = 0);

	if (rc != 0)
		GOTO(out, rc);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL)
		GOTO(out, rc = -EPROTO);

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);

	RETURN(rc);
}
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, time64_t max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;
	ENTRY;

	if (obd->obd_osfs_age >= max_age) {
		CDEBUG(D_SUPER,
		       "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
		       obd->obd_name, &obd->obd_osfs,
		       obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
		       obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
		spin_lock(&obd->obd_osfs_lock);
		memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
		spin_unlock(&obd->obd_osfs_lock);
		oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
		if (oinfo->oi_cb_up)
			oinfo->oi_cb_up(oinfo, 0);

		RETURN(0);
	}

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not wait for a reply or be resent,
		 * to avoid deadlocks */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = osc_statfs_interpret;
	aa = ptlrpc_req_async_args(aa, req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	RETURN(0);
}
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, time64_t max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;
	int rc;
	ENTRY;

	/* Since the request might also come from lprocfs, we need to
	 * sync this with client_disconnect_export (Bug15684) */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		RETURN(-ENODEV);

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not wait for a reply or be resent,
		 * to avoid deadlocks */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL)
		GOTO(out, rc = -EPROTO);

	*osfs = *msfs;

	EXIT;
out:
	ptlrpc_req_finished(req);
	return rc;
}
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void __user *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int rc = 0;

	ENTRY;
	if (!try_module_get(THIS_MODULE)) {
		CERROR("%s: cannot get module '%s'\n", obd->obd_name,
		       module_name(THIS_MODULE));
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_CLIENT_RECOVER:
		rc = ptlrpc_recover_import(obd->u.cli.cl_import,
					   data->ioc_inlbuf1, 0);
		if (rc > 0)
			rc = 0;
		break;
	case IOC_OSC_SET_ACTIVE:
		rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
					      data->ioc_offset);
		break;
	default:
		rc = -ENOTTY;
		CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
		       obd->obd_name, cmd, current->comm, rc);
		break;
	}

	module_put(THIS_MODULE);
	RETURN(rc);
}
int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
		       u32 keylen, void *key, u32 vallen, void *val,
		       struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device *obd = exp->exp_obd;
	struct obd_import *imp = class_exp2cliimp(exp);
	char *tmp;
	int rc;
	ENTRY;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			RETURN(-EINVAL);
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		RETURN(0);
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		RETURN(0);
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		RETURN(0);
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
		long target = *(long *)val;

		nr = osc_lru_shrink(env, cli, min(nr, target), true);
		*(long *)val -= nr;
		RETURN(0);
	}

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		RETURN(-EINVAL);

	/* We pass all other commands directly to OST. Since nobody calls osc
	   methods directly and everybody is supposed to go through LOV, we
	   assume lov checked invalid values for us.
	   The only recognised values so far are evict_by_nid and mds_conn.
	   Even if something bad goes through, we'd get a -EINVAL from OST
	   anyway. */
	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
				   &RQF_OST_SET_GRANT_INFO :
				   &RQF_OBD_SET_INFO);
	if (req == NULL)
		RETURN(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
							&RMF_OST_BODY :
							&RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_grant_args *aa;
		struct obdo *oa;

		aa = ptlrpc_req_async_args(aa, req);
		OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
		if (!oa) {
			ptlrpc_req_finished(req);
			RETURN(-ENOMEM);
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else {
		ptlrpcd_add_req(req);
	}

	RETURN(0);
}
EXPORT_SYMBOL(osc_set_info_async);
int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
		  struct obd_device *obd, struct obd_uuid *cluuid,
		  struct obd_connect_data *data, void *localdata)
{
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		long lost_grant;
		long grant;

		spin_lock(&cli->cl_loi_list_lock);
		grant = cli->cl_avail_grant + cli->cl_reserved_grant;
		if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
			/* restore ocd_grant_blkbits as client page bits */
			data->ocd_grant_blkbits = PAGE_SHIFT;
			grant += cli->cl_dirty_grant;
		} else {
			grant += cli->cl_dirty_pages << PAGE_SHIFT;
		}
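		/*
		 * Illustration: with 4 KiB pages, 256 dirty pages contribute
		 * 256 << 12 = 1 MiB to the grant reported back to the server
		 * on reconnect.
		 */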
		data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
	}

	RETURN(0);
}
EXPORT_SYMBOL(osc_reconnect);
int osc_disconnect(struct obd_export *exp)
{
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	rc = client_disconnect_export(exp);
	/**
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *      connect p1                     disconnect p2
	 *   ptlrpc_connect_import
	 *     ...............               class_manual_cleanup
	 *                                     osc_disconnect
	 *                                     del_shrink_grant
	 *   ptlrpc_connect_interrupt
	 *     osc_init_grant
	 *   add this client to shrink list
	 *                                      cleanup_osc
	 * Bang! The grant shrink thread triggers the shrink. BUG18662
	 */
	osc_del_grant_list(&obd->u.cli);
	RETURN(rc);
}
EXPORT_SYMBOL(osc_disconnect);
int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
				 struct hlist_node *hnode, void *arg)
{
	struct lu_env *env = arg;
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
	struct ldlm_lock *lock;
	struct osc_object *osc = NULL;
	ENTRY;

	lock_res(res);
	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
		if (lock->l_ast_data != NULL && osc == NULL) {
			osc = lock->l_ast_data;
			cl_object_get(osc2cl(osc));
		}

		/* clear LDLM_FL_CLEANED flag to make sure it will be canceled
		 * by the 2nd round of ldlm_namespace_clean() call in
		 * osc_import_event(). */
		ldlm_clear_cleaned(lock);
	}
	unlock_res(res);

	if (osc != NULL) {
		osc_object_invalidate(env, osc);
		cl_object_put(env, osc2cl(osc));
	}

	RETURN(0);
}
EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
{
	struct client_obd *cli;
	int rc = 0;

	ENTRY;
	LASSERT(imp->imp_obd == obd);

	switch (event) {
	case IMP_EVENT_DISCON: {
		cli = &obd->u.cli;
		spin_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);
		break;
	}
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
		break;
	}
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;
		struct lu_env *env;
		__u16 refcheck;

		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

		env = cl_env_get(&refcheck);
		if (!IS_ERR(env)) {
			osc_io_unplug(env, &obd->u.cli, NULL);

			cfs_hash_for_each_nolock(ns->ns_rs_hash,
						 osc_ldlm_resource_invalidate,
						 env, 0);
			cl_env_put(env, &refcheck);

			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
		} else {
			rc = PTR_ERR(env);
		}
		break;
	}
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
		break;
	}
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal =
				OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
		break;
	}
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
		break;
	}
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
		break;
	}
	default:
		CERROR("Unknown import event %d\n", event);
		LBUG();
	}
	RETURN(rc);
}
/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
	/*
	 * Cancel all unused and granted extent locks.
	 */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    ldlm_is_granted(lock) &&
	    osc_ldlm_weigh_ast(lock) == 0)
		RETURN(1);

	RETURN(0);
}
static int brw_queue_work(const struct lu_env *env, void *data)
{
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

	osc_io_unplug(env, cli, NULL);
	RETURN(0);
}
int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct client_obd *cli = &obd->u.cli;
	void *handler;
	int rc;

	ENTRY;

	rc = ptlrpcd_addref();
	if (rc)
		RETURN(rc);

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		GOTO(out_ptlrpcd, rc);


	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
	cli->cl_writeback_work = handler;

	handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
	cli->cl_lru_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		GOTO(out_ptlrpcd_work, rc);

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	osc_update_next_shrink(cli);

	RETURN(rc);

out_ptlrpcd_work:
	if (cli->cl_writeback_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	}
	if (cli->cl_lru_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	}
	client_obd_cleanup(obd);
out_ptlrpcd:
	ptlrpcd_decref();
	RETURN(rc);
}
EXPORT_SYMBOL(osc_setup_common);
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct client_obd *cli = &obd->u.cli;
	int adding;
	int added;
	int req_count;
	int rc;

	ENTRY;

	rc = osc_setup_common(obd, lcfg);
	if (rc < 0)
		RETURN(rc);

	rc = osc_tunables_init(obd);
	if (rc)
		RETURN(rc);

	/*
	 * We try to control the total number of requests with an upper limit
	 * osc_reqpool_maxreqcount. There might be some race which will cause
	 * over-limit allocation, but it is fine.
	 */
	req_count = atomic_read(&osc_pool_req_count);
	if (req_count < osc_reqpool_maxreqcount) {
		adding = cli->cl_max_rpcs_in_flight + 2;
		if (req_count + adding > osc_reqpool_maxreqcount)
			adding = osc_reqpool_maxreqcount - req_count;

		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
		atomic_add(added, &osc_pool_req_count);
	}

	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

	spin_lock(&osc_shrink_lock);
	list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
	spin_unlock(&osc_shrink_lock);
	cli->cl_import->imp_idle_timeout = osc_idle_timeout;
	cli->cl_import->imp_idle_debug = D_HA;

	RETURN(0);
}
int osc_precleanup_common(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	ENTRY;

	/*
	 * for echo client, export may be on zombie list, wait for
	 * zombie thread to cull it, because cli.cl_import will be
	 * cleared in client_disconnect_export():
	 *   class_export_destroy() -> obd_cleanup() ->
	 *   echo_device_free() -> echo_client_cleanup() ->
	 *   obd_disconnect() -> osc_disconnect() ->
	 *   client_disconnect_export()
	 */
	obd_zombie_barrier();
	if (cli->cl_writeback_work) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	}

	if (cli->cl_lru_work) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	}

	obd_cleanup_client_import(obd);
	RETURN(0);
}
EXPORT_SYMBOL(osc_precleanup_common);
static int osc_precleanup(struct obd_device *obd)
{
	ENTRY;
	osc_precleanup_common(obd);

	ptlrpc_lprocfs_unregister_obd(obd);
	RETURN(0);
}
int osc_cleanup_common(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	ENTRY;

	spin_lock(&osc_shrink_lock);
	list_del(&cli->cl_shrink_list);
	spin_unlock(&osc_shrink_lock);

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		cl_cache_decref(cli->cl_cache);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	RETURN(rc);
}
EXPORT_SYMBOL(osc_cleanup_common);
static const struct obd_ops osc_obd_ops = {
	.o_owner		= THIS_MODULE,
	.o_setup		= osc_setup,
	.o_precleanup		= osc_precleanup,
	.o_cleanup		= osc_cleanup_common,
	.o_add_conn		= client_import_add_conn,
	.o_del_conn		= client_import_del_conn,
	.o_connect		= client_connect_import,
	.o_reconnect		= osc_reconnect,
	.o_disconnect		= osc_disconnect,
	.o_statfs		= osc_statfs,
	.o_statfs_async		= osc_statfs_async,
	.o_create		= osc_create,
	.o_destroy		= osc_destroy,
	.o_getattr		= osc_getattr,
	.o_setattr		= osc_setattr,
	.o_iocontrol		= osc_iocontrol,
	.o_set_info_async	= osc_set_info_async,
	.o_import_event		= osc_import_event,
	.o_quotactl		= osc_quotactl,
};
static struct shrinker *osc_cache_shrinker;
LIST_HEAD(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);

#ifndef HAVE_SHRINKER_COUNT
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
{
	struct shrink_control scv = {
		.nr_to_scan = shrink_param(sc, nr_to_scan),
		.gfp_mask = shrink_param(sc, gfp_mask)
	};
	(void)osc_cache_shrink_scan(shrinker, &scv);

	return osc_cache_shrink_count(shrinker, &scv);
}
#endif
static int __init osc_init(void)
{
	unsigned int reqpool_size;
	unsigned int reqsize;
	int rc;
	DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
			 osc_cache_shrink_count, osc_cache_shrink_scan);
	ENTRY;

	/* print an address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with gdb that doesn't support data
	 * symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	if (rc)
		RETURN(rc);

	rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
				 LUSTRE_OSC_NAME, &osc_device_type);
	if (rc)
		GOTO(out_kmem, rc);

	osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

	/* This is obviously too much memory, only prevent overflow here */
	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
		GOTO(out_type, rc = -EINVAL);

	reqpool_size = osc_reqpool_mem_max << 20;

	reqsize = 1;
	while (reqsize < OST_IO_MAXREQSIZE)
		reqsize = reqsize << 1;
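	/*
	 * For example, with the default osc_reqpool_mem_max of 5 the pool
	 * budget is 5 << 20 bytes (5 MiB), and reqsize ends up as the
	 * smallest power of two >= OST_IO_MAXREQSIZE; the division below
	 * then sizes the pool by that rounded-up request size.
	 */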
	/*
	 * We don't enlarge the request count in OSC pool according to
	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
	 * tried after normal allocation failed. So a small OSC pool won't
	 * cause much performance degradation in most cases.
	 */
	osc_reqpool_maxreqcount = reqpool_size / reqsize;

	atomic_set(&osc_pool_req_count, 0);
	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
					  ptlrpc_add_rqs_to_pool);
	if (osc_rq_pool == NULL)
		GOTO(out_type, rc = -ENOMEM);

	rc = osc_start_grant_work();
	if (rc != 0)
		GOTO(out_req_pool, rc);

	RETURN(rc);

out_req_pool:
	ptlrpc_free_rq_pool(osc_rq_pool);
out_type:
	class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
	lu_kmem_fini(osc_caches);

	RETURN(rc);
}
static void __exit osc_exit(void)
{
	osc_stop_grant_work();
	remove_shrinker(osc_cache_shrinker);
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

module_init(osc_init);
module_exit(osc_exit);