4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
49 #include "osc_internal.h"
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
62 #define osc_grant_args osc_brw_async_args
64 struct osc_setattr_args {
66 obd_enqueue_update_f sa_upcall;
70 struct osc_fsync_args {
71 struct osc_object *fa_obj;
73 obd_enqueue_update_f fa_upcall;
77 struct osc_ladvise_args {
79 obd_enqueue_update_f la_upcall;
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 struct ost_body *body;
91 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
94 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
100 struct ptlrpc_request *req;
101 struct ost_body *body;
105 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
111 ptlrpc_request_free(req);
115 osc_pack_req_body(req, oa);
117 ptlrpc_request_set_replen(req);
119 rc = ptlrpc_queue_wait(req);
123 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
125 GOTO(out, rc = -EPROTO);
127 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
130 oa->o_blksize = cli_brw_size(exp->exp_obd);
131 oa->o_valid |= OBD_MD_FLBLKSZ;
135 ptlrpc_req_finished(req);
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
143 struct ptlrpc_request *req;
144 struct ost_body *body;
148 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
156 ptlrpc_request_free(req);
160 osc_pack_req_body(req, oa);
162 ptlrpc_request_set_replen(req);
164 rc = ptlrpc_queue_wait(req);
168 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
170 GOTO(out, rc = -EPROTO);
172 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176 ptlrpc_req_finished(req);
181 static int osc_setattr_interpret(const struct lu_env *env,
182 struct ptlrpc_request *req, void *args, int rc)
184 struct osc_setattr_args *sa = args;
185 struct ost_body *body;
192 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194 GOTO(out, rc = -EPROTO);
196 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
199 rc = sa->sa_upcall(sa->sa_cookie, rc);
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204 obd_enqueue_update_f upcall, void *cookie,
205 struct ptlrpc_request_set *rqset)
207 struct ptlrpc_request *req;
208 struct osc_setattr_args *sa;
213 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
219 ptlrpc_request_free(req);
223 osc_pack_req_body(req, oa);
225 ptlrpc_request_set_replen(req);
227 /* do mds to ost setattr asynchronously */
229 /* Do not wait for response. */
230 ptlrpcd_add_req(req);
232 req->rq_interpret_reply = osc_setattr_interpret;
234 sa = ptlrpc_req_async_args(sa, req);
236 sa->sa_upcall = upcall;
237 sa->sa_cookie = cookie;
239 ptlrpc_set_add_req(rqset, req);
245 static int osc_ladvise_interpret(const struct lu_env *env,
246 struct ptlrpc_request *req,
249 struct osc_ladvise_args *la = arg;
250 struct ost_body *body;
256 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
258 GOTO(out, rc = -EPROTO);
260 *la->la_oa = body->oa;
262 rc = la->la_upcall(la->la_cookie, rc);
267 * If rqset is NULL, do not wait for response. Upcall and cookie could also
268 * be NULL in this case
270 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
271 struct ladvise_hdr *ladvise_hdr,
272 obd_enqueue_update_f upcall, void *cookie,
273 struct ptlrpc_request_set *rqset)
275 struct ptlrpc_request *req;
276 struct ost_body *body;
277 struct osc_ladvise_args *la;
279 struct lu_ladvise *req_ladvise;
280 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
281 int num_advise = ladvise_hdr->lah_count;
282 struct ladvise_hdr *req_ladvise_hdr;
285 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
289 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
290 num_advise * sizeof(*ladvise));
291 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
293 ptlrpc_request_free(req);
296 req->rq_request_portal = OST_IO_PORTAL;
297 ptlrpc_at_set_req_timeout(req);
299 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
301 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
304 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
305 &RMF_OST_LADVISE_HDR);
306 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
308 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
309 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
310 ptlrpc_request_set_replen(req);
313 /* Do not wait for response. */
314 ptlrpcd_add_req(req);
318 req->rq_interpret_reply = osc_ladvise_interpret;
319 la = ptlrpc_req_async_args(la, req);
321 la->la_upcall = upcall;
322 la->la_cookie = cookie;
324 ptlrpc_set_add_req(rqset, req);
329 static int osc_create(const struct lu_env *env, struct obd_export *exp,
332 struct ptlrpc_request *req;
333 struct ost_body *body;
338 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
339 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
341 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
343 GOTO(out, rc = -ENOMEM);
345 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
347 ptlrpc_request_free(req);
351 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
354 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
356 ptlrpc_request_set_replen(req);
358 rc = ptlrpc_queue_wait(req);
362 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
364 GOTO(out_req, rc = -EPROTO);
366 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
367 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
369 oa->o_blksize = cli_brw_size(exp->exp_obd);
370 oa->o_valid |= OBD_MD_FLBLKSZ;
372 CDEBUG(D_HA, "transno: %lld\n",
373 lustre_msg_get_transno(req->rq_repmsg));
375 ptlrpc_req_finished(req);
380 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
381 obd_enqueue_update_f upcall, void *cookie)
383 struct ptlrpc_request *req;
384 struct osc_setattr_args *sa;
385 struct obd_import *imp = class_exp2cliimp(exp);
386 struct ost_body *body;
391 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
395 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
397 ptlrpc_request_free(req);
401 osc_set_io_portal(req);
403 ptlrpc_at_set_req_timeout(req);
405 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
407 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
409 ptlrpc_request_set_replen(req);
411 req->rq_interpret_reply = osc_setattr_interpret;
412 sa = ptlrpc_req_async_args(sa, req);
414 sa->sa_upcall = upcall;
415 sa->sa_cookie = cookie;
417 ptlrpcd_add_req(req);
421 EXPORT_SYMBOL(osc_punch_send);
423 static int osc_sync_interpret(const struct lu_env *env,
424 struct ptlrpc_request *req, void *args, int rc)
426 struct osc_fsync_args *fa = args;
427 struct ost_body *body;
428 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
429 unsigned long valid = 0;
430 struct cl_object *obj;
436 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
438 CERROR("can't unpack ost_body\n");
439 GOTO(out, rc = -EPROTO);
442 *fa->fa_oa = body->oa;
443 obj = osc2cl(fa->fa_obj);
445 /* Update osc object's blocks attribute */
446 cl_object_attr_lock(obj);
447 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
448 attr->cat_blocks = body->oa.o_blocks;
453 cl_object_attr_update(env, obj, attr, valid);
454 cl_object_attr_unlock(obj);
457 rc = fa->fa_upcall(fa->fa_cookie, rc);
461 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
462 obd_enqueue_update_f upcall, void *cookie,
463 struct ptlrpc_request_set *rqset)
465 struct obd_export *exp = osc_export(obj);
466 struct ptlrpc_request *req;
467 struct ost_body *body;
468 struct osc_fsync_args *fa;
472 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
476 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
478 ptlrpc_request_free(req);
482 /* overload the size and blocks fields in the oa with start/end */
483 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
485 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
487 ptlrpc_request_set_replen(req);
488 req->rq_interpret_reply = osc_sync_interpret;
490 fa = ptlrpc_req_async_args(fa, req);
493 fa->fa_upcall = upcall;
494 fa->fa_cookie = cookie;
496 ptlrpc_set_add_req(rqset, req);
501 /* Find and cancel locally locks matched by @mode in the resource found by
502 * @objid. Found locks are added into @cancel list. Returns the amount of
503 * locks added to @cancels list. */
504 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
505 struct list_head *cancels,
506 enum ldlm_mode mode, __u64 lock_flags)
508 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
509 struct ldlm_res_id res_id;
510 struct ldlm_resource *res;
514 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
515 * export) but disabled through procfs (flag in NS).
517 * This distinguishes from a case when ELC is not supported originally,
518 * when we still want to cancel locks in advance and just cancel them
519 * locally, without sending any RPC. */
520 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
523 ostid_build_res_name(&oa->o_oi, &res_id);
524 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
528 LDLM_RESOURCE_ADDREF(res);
529 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
530 lock_flags, 0, NULL);
531 LDLM_RESOURCE_DELREF(res);
532 ldlm_resource_putref(res);
536 static int osc_destroy_interpret(const struct lu_env *env,
537 struct ptlrpc_request *req, void *args, int rc)
539 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
541 atomic_dec(&cli->cl_destroy_in_flight);
542 wake_up(&cli->cl_destroy_waitq);
547 static int osc_can_send_destroy(struct client_obd *cli)
549 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
550 cli->cl_max_rpcs_in_flight) {
551 /* The destroy request can be sent */
554 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
555 cli->cl_max_rpcs_in_flight) {
557 * The counter has been modified between the two atomic
560 wake_up(&cli->cl_destroy_waitq);
565 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
568 struct client_obd *cli = &exp->exp_obd->u.cli;
569 struct ptlrpc_request *req;
570 struct ost_body *body;
576 CDEBUG(D_INFO, "oa NULL\n");
580 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
581 LDLM_FL_DISCARD_DATA);
583 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
585 ldlm_lock_list_put(&cancels, l_bl_ast, count);
589 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
592 ptlrpc_request_free(req);
596 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
597 ptlrpc_at_set_req_timeout(req);
599 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
601 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
603 ptlrpc_request_set_replen(req);
605 req->rq_interpret_reply = osc_destroy_interpret;
606 if (!osc_can_send_destroy(cli)) {
608 * Wait until the number of on-going destroy RPCs drops
609 * under max_rpc_in_flight
611 rc = l_wait_event_abortable_exclusive(
612 cli->cl_destroy_waitq,
613 osc_can_send_destroy(cli));
615 ptlrpc_req_finished(req);
620 /* Do not wait for response */
621 ptlrpcd_add_req(req);
625 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
628 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
630 LASSERT(!(oa->o_valid & bits));
633 spin_lock(&cli->cl_loi_list_lock);
634 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
635 oa->o_dirty = cli->cl_dirty_grant;
637 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
638 if (unlikely(cli->cl_dirty_pages > cli->cl_dirty_max_pages)) {
639 CERROR("dirty %lu > dirty_max %lu\n",
641 cli->cl_dirty_max_pages);
643 } else if (unlikely(atomic_long_read(&obd_dirty_pages) >
644 (long)(obd_max_dirty_pages + 1))) {
645 /* The atomic_read() allowing the atomic_inc() are
646 * not covered by a lock thus they may safely race and trip
647 * this CERROR() unless we add in a small fudge factor (+1). */
648 CERROR("%s: dirty %ld > system dirty_max %ld\n",
649 cli_name(cli), atomic_long_read(&obd_dirty_pages),
650 obd_max_dirty_pages);
652 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
654 CERROR("dirty %lu - dirty_max %lu too big???\n",
655 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
658 unsigned long nrpages;
659 unsigned long undirty;
661 nrpages = cli->cl_max_pages_per_rpc;
662 nrpages *= cli->cl_max_rpcs_in_flight + 1;
663 nrpages = max(nrpages, cli->cl_dirty_max_pages);
664 undirty = nrpages << PAGE_SHIFT;
665 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
669 /* take extent tax into account when asking for more
671 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
672 cli->cl_max_extent_pages;
673 undirty += nrextents * cli->cl_grant_extent_tax;
675 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
676 * to add extent tax, etc.
678 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
679 ~(PTLRPC_MAX_BRW_SIZE * 4UL));
681 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
682 oa->o_dropped = cli->cl_lost_grant;
683 cli->cl_lost_grant = 0;
684 spin_unlock(&cli->cl_loi_list_lock);
685 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
686 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
689 void osc_update_next_shrink(struct client_obd *cli)
691 cli->cl_next_shrink_grant = ktime_get_seconds() +
692 cli->cl_grant_shrink_interval;
694 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
695 cli->cl_next_shrink_grant);
698 static void __osc_update_grant(struct client_obd *cli, u64 grant)
700 spin_lock(&cli->cl_loi_list_lock);
701 cli->cl_avail_grant += grant;
702 spin_unlock(&cli->cl_loi_list_lock);
705 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
707 if (body->oa.o_valid & OBD_MD_FLGRANT) {
708 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
709 __osc_update_grant(cli, body->oa.o_grant);
714 * grant thread data for shrinking space.
716 struct grant_thread_data {
717 struct list_head gtd_clients;
718 struct mutex gtd_mutex;
719 unsigned long gtd_stopped:1;
721 static struct grant_thread_data client_gtd;
723 static int osc_shrink_grant_interpret(const struct lu_env *env,
724 struct ptlrpc_request *req,
727 struct osc_grant_args *aa = args;
728 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
729 struct ost_body *body;
732 __osc_update_grant(cli, aa->aa_oa->o_grant);
736 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
738 osc_update_grant(cli, body);
740 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
746 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
748 spin_lock(&cli->cl_loi_list_lock);
749 oa->o_grant = cli->cl_avail_grant / 4;
750 cli->cl_avail_grant -= oa->o_grant;
751 spin_unlock(&cli->cl_loi_list_lock);
752 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
753 oa->o_valid |= OBD_MD_FLFLAGS;
756 oa->o_flags |= OBD_FL_SHRINK_GRANT;
757 osc_update_next_shrink(cli);
760 /* Shrink the current grant, either from some large amount to enough for a
761 * full set of in-flight RPCs, or if we have already shrunk to that limit
762 * then to enough for a single RPC. This avoids keeping more grant than
763 * needed, and avoids shrinking the grant piecemeal. */
764 static int osc_shrink_grant(struct client_obd *cli)
766 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
767 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
769 spin_lock(&cli->cl_loi_list_lock);
770 if (cli->cl_avail_grant <= target_bytes)
771 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
772 spin_unlock(&cli->cl_loi_list_lock);
774 return osc_shrink_grant_to_target(cli, target_bytes);
777 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
780 struct ost_body *body;
783 spin_lock(&cli->cl_loi_list_lock);
784 /* Don't shrink if we are already above or below the desired limit
785 * We don't want to shrink below a single RPC, as that will negatively
786 * impact block allocation and long-term performance. */
787 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
788 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
790 if (target_bytes >= cli->cl_avail_grant) {
791 spin_unlock(&cli->cl_loi_list_lock);
794 spin_unlock(&cli->cl_loi_list_lock);
800 osc_announce_cached(cli, &body->oa, 0);
802 spin_lock(&cli->cl_loi_list_lock);
803 if (target_bytes >= cli->cl_avail_grant) {
804 /* available grant has changed since target calculation */
805 spin_unlock(&cli->cl_loi_list_lock);
806 GOTO(out_free, rc = 0);
808 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
809 cli->cl_avail_grant = target_bytes;
810 spin_unlock(&cli->cl_loi_list_lock);
811 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
812 body->oa.o_valid |= OBD_MD_FLFLAGS;
813 body->oa.o_flags = 0;
815 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
816 osc_update_next_shrink(cli);
818 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
819 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
820 sizeof(*body), body, NULL);
822 __osc_update_grant(cli, body->oa.o_grant);
828 static int osc_should_shrink_grant(struct client_obd *client)
830 time64_t next_shrink = client->cl_next_shrink_grant;
832 if (client->cl_import == NULL)
835 if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
836 client->cl_import->imp_grant_shrink_disabled) {
837 osc_update_next_shrink(client);
841 if (ktime_get_seconds() >= next_shrink - 5) {
842 /* Get the current RPC size directly, instead of going via:
843 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
844 * Keep comment here so that it can be found by searching. */
845 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
847 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
848 client->cl_avail_grant > brw_size)
851 osc_update_next_shrink(client);
856 #define GRANT_SHRINK_RPC_BATCH 100
858 static struct delayed_work work;
860 static void osc_grant_work_handler(struct work_struct *data)
862 struct client_obd *cli;
864 bool init_next_shrink = true;
865 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
868 mutex_lock(&client_gtd.gtd_mutex);
869 list_for_each_entry(cli, &client_gtd.gtd_clients,
871 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
872 osc_should_shrink_grant(cli)) {
873 osc_shrink_grant(cli);
877 if (!init_next_shrink) {
878 if (cli->cl_next_shrink_grant < next_shrink &&
879 cli->cl_next_shrink_grant > ktime_get_seconds())
880 next_shrink = cli->cl_next_shrink_grant;
882 init_next_shrink = false;
883 next_shrink = cli->cl_next_shrink_grant;
886 mutex_unlock(&client_gtd.gtd_mutex);
888 if (client_gtd.gtd_stopped == 1)
891 if (next_shrink > ktime_get_seconds()) {
892 time64_t delay = next_shrink - ktime_get_seconds();
894 schedule_delayed_work(&work, cfs_time_seconds(delay));
896 schedule_work(&work.work);
900 void osc_schedule_grant_work(void)
902 cancel_delayed_work_sync(&work);
903 schedule_work(&work.work);
907 * Start grant thread for returing grant to server for idle clients.
909 static int osc_start_grant_work(void)
911 client_gtd.gtd_stopped = 0;
912 mutex_init(&client_gtd.gtd_mutex);
913 INIT_LIST_HEAD(&client_gtd.gtd_clients);
915 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
916 schedule_work(&work.work);
921 static void osc_stop_grant_work(void)
923 client_gtd.gtd_stopped = 1;
924 cancel_delayed_work_sync(&work);
927 static void osc_add_grant_list(struct client_obd *client)
929 mutex_lock(&client_gtd.gtd_mutex);
930 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
931 mutex_unlock(&client_gtd.gtd_mutex);
934 static void osc_del_grant_list(struct client_obd *client)
936 if (list_empty(&client->cl_grant_chain))
939 mutex_lock(&client_gtd.gtd_mutex);
940 list_del_init(&client->cl_grant_chain);
941 mutex_unlock(&client_gtd.gtd_mutex);
944 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
947 * ocd_grant is the total grant amount we're expect to hold: if we've
948 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
949 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
952 * race is tolerable here: if we're evicted, but imp_state already
953 * left EVICTED state, then cl_dirty_pages must be 0 already.
955 spin_lock(&cli->cl_loi_list_lock);
956 cli->cl_avail_grant = ocd->ocd_grant;
957 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
958 cli->cl_avail_grant -= cli->cl_reserved_grant;
959 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
960 cli->cl_avail_grant -= cli->cl_dirty_grant;
962 cli->cl_avail_grant -=
963 cli->cl_dirty_pages << PAGE_SHIFT;
966 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
970 /* overhead for each extent insertion */
971 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
972 /* determine the appropriate chunk size used by osc_extent. */
973 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
974 ocd->ocd_grant_blkbits);
975 /* max_pages_per_rpc must be chunk aligned */
976 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
977 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
978 ~chunk_mask) & chunk_mask;
979 /* determine maximum extent size, in #pages */
980 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
981 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
982 if (cli->cl_max_extent_pages == 0)
983 cli->cl_max_extent_pages = 1;
985 cli->cl_grant_extent_tax = 0;
986 cli->cl_chunkbits = PAGE_SHIFT;
987 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
989 spin_unlock(&cli->cl_loi_list_lock);
992 "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d cl_max_extent_pages: %d\n",
994 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
995 cli->cl_max_extent_pages);
997 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
998 osc_add_grant_list(cli);
1000 EXPORT_SYMBOL(osc_init_grant);
1002 /* We assume that the reason this OSC got a short read is because it read
1003 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1004 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1005 * this stripe never got written at or beyond this stripe offset yet. */
1006 static void handle_short_read(int nob_read, size_t page_count,
1007 struct brw_page **pga)
1012 /* skip bytes read OK */
1013 while (nob_read > 0) {
1014 LASSERT (page_count > 0);
1016 if (pga[i]->count > nob_read) {
1017 /* EOF inside this page */
1018 ptr = kmap(pga[i]->pg) +
1019 (pga[i]->off & ~PAGE_MASK);
1020 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1027 nob_read -= pga[i]->count;
1032 /* zero remaining pages */
1033 while (page_count-- > 0) {
1034 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1035 memset(ptr, 0, pga[i]->count);
1041 static int check_write_rcs(struct ptlrpc_request *req,
1042 int requested_nob, int niocount,
1043 size_t page_count, struct brw_page **pga)
1048 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1049 sizeof(*remote_rcs) *
1051 if (remote_rcs == NULL) {
1052 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1056 /* return error if any niobuf was in error */
1057 for (i = 0; i < niocount; i++) {
1058 if ((int)remote_rcs[i] < 0) {
1059 CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1060 i, remote_rcs[i], req);
1061 return remote_rcs[i];
1064 if (remote_rcs[i] != 0) {
1065 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1066 i, remote_rcs[i], req);
1070 if (req->rq_bulk != NULL &&
1071 req->rq_bulk->bd_nob_transferred != requested_nob) {
1072 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1073 req->rq_bulk->bd_nob_transferred, requested_nob);
1080 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1082 if (p1->flag != p2->flag) {
1083 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1084 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1085 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1087 /* warn if we try to combine flags that we don't know to be
1088 * safe to combine */
1089 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1090 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1091 "report this at https://jira.whamcloud.com/\n",
1092 p1->flag, p2->flag);
1097 return (p1->off + p1->count == p2->off);
1100 #if IS_ENABLED(CONFIG_CRC_T10DIF)
1101 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1102 size_t pg_count, struct brw_page **pga,
1103 int opc, obd_dif_csum_fn *fn,
1107 struct ahash_request *req;
1108 /* Used Adler as the default checksum type on top of DIF tags */
1109 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1110 struct page *__page;
1111 unsigned char *buffer;
1113 unsigned int bufsize;
1115 int used_number = 0;
1121 LASSERT(pg_count > 0);
1123 __page = alloc_page(GFP_KERNEL);
1127 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1130 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1131 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1135 buffer = kmap(__page);
1136 guard_start = (__u16 *)buffer;
1137 guard_number = PAGE_SIZE / sizeof(*guard_start);
1138 while (nob > 0 && pg_count > 0) {
1139 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1141 /* corrupt the data before we compute the checksum, to
1142 * simulate an OST->client data error */
1143 if (unlikely(i == 0 && opc == OST_READ &&
1144 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1145 unsigned char *ptr = kmap(pga[i]->pg);
1146 int off = pga[i]->off & ~PAGE_MASK;
1148 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1153 * The left guard number should be able to hold checksums of a
1156 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1157 pga[i]->off & ~PAGE_MASK,
1159 guard_start + used_number,
1160 guard_number - used_number,
1166 used_number += used;
1167 if (used_number == guard_number) {
1168 cfs_crypto_hash_update_page(req, __page, 0,
1169 used_number * sizeof(*guard_start));
1173 nob -= pga[i]->count;
1181 if (used_number != 0)
1182 cfs_crypto_hash_update_page(req, __page, 0,
1183 used_number * sizeof(*guard_start));
1185 bufsize = sizeof(cksum);
1186 cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1188 /* For sending we only compute the wrong checksum instead
1189 * of corrupting the data so it is still correct on a redo */
1190 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1195 __free_page(__page);
1198 #else /* !CONFIG_CRC_T10DIF */
1199 #define obd_dif_ip_fn NULL
1200 #define obd_dif_crc_fn NULL
1201 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
1203 #endif /* CONFIG_CRC_T10DIF */
1205 static int osc_checksum_bulk(int nob, size_t pg_count,
1206 struct brw_page **pga, int opc,
1207 enum cksum_types cksum_type,
1211 struct ahash_request *req;
1212 unsigned int bufsize;
1213 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1215 LASSERT(pg_count > 0);
1217 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1219 CERROR("Unable to initialize checksum hash %s\n",
1220 cfs_crypto_hash_name(cfs_alg));
1221 return PTR_ERR(req);
1224 while (nob > 0 && pg_count > 0) {
1225 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1227 /* corrupt the data before we compute the checksum, to
1228 * simulate an OST->client data error */
1229 if (i == 0 && opc == OST_READ &&
1230 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1231 unsigned char *ptr = kmap(pga[i]->pg);
1232 int off = pga[i]->off & ~PAGE_MASK;
1234 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1237 cfs_crypto_hash_update_page(req, pga[i]->pg,
1238 pga[i]->off & ~PAGE_MASK,
1240 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1241 (int)(pga[i]->off & ~PAGE_MASK));
1243 nob -= pga[i]->count;
1248 bufsize = sizeof(*cksum);
1249 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1251 /* For sending we only compute the wrong checksum instead
1252 * of corrupting the data so it is still correct on a redo */
1253 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1259 static int osc_checksum_bulk_rw(const char *obd_name,
1260 enum cksum_types cksum_type,
1261 int nob, size_t pg_count,
1262 struct brw_page **pga, int opc,
1265 obd_dif_csum_fn *fn = NULL;
1266 int sector_size = 0;
1270 obd_t10_cksum2dif(cksum_type, &fn, §or_size);
1273 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1274 opc, fn, sector_size, check_sum);
1276 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1283 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1284 u32 page_count, struct brw_page **pga,
1285 struct ptlrpc_request **reqp, int resend)
1287 struct ptlrpc_request *req;
1288 struct ptlrpc_bulk_desc *desc;
1289 struct ost_body *body;
1290 struct obd_ioobj *ioobj;
1291 struct niobuf_remote *niobuf;
1292 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1293 struct osc_brw_async_args *aa;
1294 struct req_capsule *pill;
1295 struct brw_page *pg_prev;
1297 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1300 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1301 RETURN(-ENOMEM); /* Recoverable */
1302 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1303 RETURN(-EINVAL); /* Fatal */
1305 if ((cmd & OBD_BRW_WRITE) != 0) {
1307 req = ptlrpc_request_alloc_pool(cli->cl_import,
1309 &RQF_OST_BRW_WRITE);
1312 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1317 for (niocount = i = 1; i < page_count; i++) {
1318 if (!can_merge_pages(pga[i - 1], pga[i]))
1322 pill = &req->rq_pill;
1323 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1325 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1326 niocount * sizeof(*niobuf));
1328 for (i = 0; i < page_count; i++)
1329 short_io_size += pga[i]->count;
1331 /* Check if read/write is small enough to be a short io. */
1332 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1333 !imp_connect_shortio(cli->cl_import))
1336 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1337 opc == OST_READ ? 0 : short_io_size);
1338 if (opc == OST_READ)
1339 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1342 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1344 ptlrpc_request_free(req);
1347 osc_set_io_portal(req);
1349 ptlrpc_at_set_req_timeout(req);
1350 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1352 req->rq_no_retry_einprogress = 1;
1354 if (short_io_size != 0) {
1356 short_io_buf = NULL;
1360 desc = ptlrpc_prep_bulk_imp(req, page_count,
1361 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1362 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1363 PTLRPC_BULK_PUT_SINK),
1365 &ptlrpc_bulk_kiov_pin_ops);
1368 GOTO(out, rc = -ENOMEM);
1369 /* NB request now owns desc and will free it when it gets freed */
1371 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1372 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1373 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1374 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1376 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1378 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1379 * and from_kgid(), because they are asynchronous. Fortunately, variable
1380 * oa contains valid o_uid and o_gid in these two operations.
1381 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1382 * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1383 * other process logic */
1384 body->oa.o_uid = oa->o_uid;
1385 body->oa.o_gid = oa->o_gid;
1387 obdo_to_ioobj(oa, ioobj);
1388 ioobj->ioo_bufcnt = niocount;
1389 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1390 * that might be send for this request. The actual number is decided
1391 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1392 * "max - 1" for old client compatibility sending "0", and also so the
1393 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1395 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1397 ioobj_max_brw_set(ioobj, 0);
1399 if (short_io_size != 0) {
1400 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1401 body->oa.o_valid |= OBD_MD_FLFLAGS;
1402 body->oa.o_flags = 0;
1404 body->oa.o_flags |= OBD_FL_SHORT_IO;
1405 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1407 if (opc == OST_WRITE) {
1408 short_io_buf = req_capsule_client_get(pill,
1410 LASSERT(short_io_buf != NULL);
1414 LASSERT(page_count > 0);
1416 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1417 struct brw_page *pg = pga[i];
1418 int poff = pg->off & ~PAGE_MASK;
1420 LASSERT(pg->count > 0);
1421 /* make sure there is no gap in the middle of page array */
1422 LASSERTF(page_count == 1 ||
1423 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1424 ergo(i > 0 && i < page_count - 1,
1425 poff == 0 && pg->count == PAGE_SIZE) &&
1426 ergo(i == page_count - 1, poff == 0)),
1427 "i: %d/%d pg: %p off: %llu, count: %u\n",
1428 i, page_count, pg, pg->off, pg->count);
1429 LASSERTF(i == 0 || pg->off > pg_prev->off,
1430 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1431 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1433 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1434 pg_prev->pg, page_private(pg_prev->pg),
1435 pg_prev->pg->index, pg_prev->off);
1436 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1437 (pg->flag & OBD_BRW_SRVLOCK));
1438 if (short_io_size != 0 && opc == OST_WRITE) {
1439 unsigned char *ptr = kmap_atomic(pg->pg);
1441 LASSERT(short_io_size >= requested_nob + pg->count);
1442 memcpy(short_io_buf + requested_nob,
1446 } else if (short_io_size == 0) {
1447 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1450 requested_nob += pg->count;
1452 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1454 niobuf->rnb_len += pg->count;
1456 niobuf->rnb_offset = pg->off;
1457 niobuf->rnb_len = pg->count;
1458 niobuf->rnb_flags = pg->flag;
1463 LASSERTF((void *)(niobuf - niocount) ==
1464 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1465 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1466 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1468 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1470 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1471 body->oa.o_valid |= OBD_MD_FLFLAGS;
1472 body->oa.o_flags = 0;
1474 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1477 if (osc_should_shrink_grant(cli))
1478 osc_shrink_grant_local(cli, &body->oa);
1480 /* size[REQ_REC_OFF] still sizeof (*body) */
1481 if (opc == OST_WRITE) {
1482 if (cli->cl_checksum &&
1483 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1484 /* store cl_cksum_type in a local variable since
1485 * it can be changed via lprocfs */
1486 enum cksum_types cksum_type = cli->cl_cksum_type;
1488 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1489 body->oa.o_flags = 0;
1491 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1493 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1495 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1496 requested_nob, page_count,
1500 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1504 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1507 /* save this in 'oa', too, for later checking */
1508 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1509 oa->o_flags |= obd_cksum_type_pack(obd_name,
1512 /* clear out the checksum flag, in case this is a
1513 * resend but cl_checksum is no longer set. b=11238 */
1514 oa->o_valid &= ~OBD_MD_FLCKSUM;
1516 oa->o_cksum = body->oa.o_cksum;
1517 /* 1 RC per niobuf */
1518 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1519 sizeof(__u32) * niocount);
1521 if (cli->cl_checksum &&
1522 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1523 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1524 body->oa.o_flags = 0;
1525 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1526 cli->cl_cksum_type);
1527 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1530 /* Client cksum has been already copied to wire obdo in previous
1531 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1532 * resent due to cksum error, this will allow Server to
1533 * check+dump pages on its side */
1535 ptlrpc_request_set_replen(req);
1537 aa = ptlrpc_req_async_args(aa, req);
1539 aa->aa_requested_nob = requested_nob;
1540 aa->aa_nio_count = niocount;
1541 aa->aa_page_count = page_count;
1545 INIT_LIST_HEAD(&aa->aa_oaps);
1548 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1549 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1550 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1551 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1555 ptlrpc_req_finished(req);
1559 char dbgcksum_file_name[PATH_MAX];
1561 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1562 struct brw_page **pga, __u32 server_cksum,
1570 /* will only keep dump of pages on first error for the same range in
1571 * file/fid, not during the resends/retries. */
1572 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1573 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1574 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1575 libcfs_debug_file_path_arr :
1576 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1577 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1578 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1579 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1581 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1582 client_cksum, server_cksum);
1583 filp = filp_open(dbgcksum_file_name,
1584 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1588 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1589 "checksum error: rc = %d\n", dbgcksum_file_name,
1592 CERROR("%s: can't open to dump pages with checksum "
1593 "error: rc = %d\n", dbgcksum_file_name, rc);
1597 for (i = 0; i < page_count; i++) {
1598 len = pga[i]->count;
1599 buf = kmap(pga[i]->pg);
1601 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1603 CERROR("%s: wanted to write %u but got %d "
1604 "error\n", dbgcksum_file_name, len, rc);
1609 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1610 dbgcksum_file_name, rc);
1615 rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1617 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1618 filp_close(filp, NULL);
1622 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1623 __u32 client_cksum, __u32 server_cksum,
1624 struct osc_brw_async_args *aa)
1626 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1627 enum cksum_types cksum_type;
1628 obd_dif_csum_fn *fn = NULL;
1629 int sector_size = 0;
1634 if (server_cksum == client_cksum) {
1635 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1639 if (aa->aa_cli->cl_checksum_dump)
1640 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1641 server_cksum, client_cksum);
1643 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1646 switch (cksum_type) {
1647 case OBD_CKSUM_T10IP512:
1651 case OBD_CKSUM_T10IP4K:
1655 case OBD_CKSUM_T10CRC512:
1656 fn = obd_dif_crc_fn;
1659 case OBD_CKSUM_T10CRC4K:
1660 fn = obd_dif_crc_fn;
1668 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1669 aa->aa_page_count, aa->aa_ppga,
1670 OST_WRITE, fn, sector_size,
1673 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1674 aa->aa_ppga, OST_WRITE, cksum_type,
1678 msg = "failed to calculate the client write checksum";
1679 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1680 msg = "the server did not use the checksum type specified in "
1681 "the original request - likely a protocol problem";
1682 else if (new_cksum == server_cksum)
1683 msg = "changed on the client after we checksummed it - "
1684 "likely false positive due to mmap IO (bug 11742)";
1685 else if (new_cksum == client_cksum)
1686 msg = "changed in transit before arrival at OST";
1688 msg = "changed in transit AND doesn't match the original - "
1689 "likely false positive due to mmap IO (bug 11742)";
1691 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1692 DFID " object "DOSTID" extent [%llu-%llu], original "
1693 "client csum %x (type %x), server csum %x (type %x),"
1694 " client csum now %x\n",
1695 obd_name, msg, libcfs_nid2str(peer->nid),
1696 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1697 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1698 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1699 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1700 aa->aa_ppga[aa->aa_page_count - 1]->off +
1701 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1703 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1704 server_cksum, cksum_type, new_cksum);
1708 /* Note rc enters this function as number of bytes transferred */
1709 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1711 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1712 struct client_obd *cli = aa->aa_cli;
1713 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1714 const struct lnet_process_id *peer =
1715 &req->rq_import->imp_connection->c_peer;
1716 struct ost_body *body;
1717 u32 client_cksum = 0;
1721 if (rc < 0 && rc != -EDQUOT) {
1722 DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
1726 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1727 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1729 DEBUG_REQ(D_INFO, req, "cannot unpack body");
1733 /* set/clear over quota flag for a uid/gid/projid */
1734 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1735 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1736 unsigned qid[LL_MAXQUOTAS] = {
1737 body->oa.o_uid, body->oa.o_gid,
1738 body->oa.o_projid };
1740 "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1741 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1742 body->oa.o_valid, body->oa.o_flags);
1743 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1747 osc_update_grant(cli, body);
1752 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1753 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1755 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1757 CERROR("%s: unexpected positive size %d\n",
1762 if (req->rq_bulk != NULL &&
1763 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1766 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1767 check_write_checksum(&body->oa, peer, client_cksum,
1768 body->oa.o_cksum, aa))
1771 rc = check_write_rcs(req, aa->aa_requested_nob,
1772 aa->aa_nio_count, aa->aa_page_count,
1777 /* The rest of this function executes only for OST_READs */
1779 if (req->rq_bulk == NULL) {
1780 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1782 LASSERT(rc == req->rq_status);
1784 /* if unwrap_bulk failed, return -EAGAIN to retry */
1785 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1788 GOTO(out, rc = -EAGAIN);
1790 if (rc > aa->aa_requested_nob) {
1791 CERROR("%s: unexpected size %d, requested %d\n", obd_name,
1792 rc, aa->aa_requested_nob);
1796 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1797 CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
1798 rc, req->rq_bulk->bd_nob_transferred);
1802 if (req->rq_bulk == NULL) {
1804 int nob, pg_count, i = 0;
1807 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1808 pg_count = aa->aa_page_count;
1809 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1812 while (nob > 0 && pg_count > 0) {
1814 int count = aa->aa_ppga[i]->count > nob ?
1815 nob : aa->aa_ppga[i]->count;
1817 CDEBUG(D_CACHE, "page %p count %d\n",
1818 aa->aa_ppga[i]->pg, count);
1819 ptr = kmap_atomic(aa->aa_ppga[i]->pg);
1820 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1822 kunmap_atomic((void *) ptr);
1831 if (rc < aa->aa_requested_nob)
1832 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1834 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1835 static int cksum_counter;
1836 u32 server_cksum = body->oa.o_cksum;
1839 enum cksum_types cksum_type;
1840 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1841 body->oa.o_flags : 0;
1843 cksum_type = obd_cksum_type_unpack(o_flags);
1844 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1845 aa->aa_page_count, aa->aa_ppga,
1846 OST_READ, &client_cksum);
1850 if (req->rq_bulk != NULL &&
1851 peer->nid != req->rq_bulk->bd_sender) {
1853 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1856 if (server_cksum != client_cksum) {
1857 struct ost_body *clbody;
1858 u32 page_count = aa->aa_page_count;
1860 clbody = req_capsule_client_get(&req->rq_pill,
1862 if (cli->cl_checksum_dump)
1863 dump_all_bulk_pages(&clbody->oa, page_count,
1864 aa->aa_ppga, server_cksum,
1867 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1868 "%s%s%s inode "DFID" object "DOSTID
1869 " extent [%llu-%llu], client %x, "
1870 "server %x, cksum_type %x\n",
1872 libcfs_nid2str(peer->nid),
1874 clbody->oa.o_valid & OBD_MD_FLFID ?
1875 clbody->oa.o_parent_seq : 0ULL,
1876 clbody->oa.o_valid & OBD_MD_FLFID ?
1877 clbody->oa.o_parent_oid : 0,
1878 clbody->oa.o_valid & OBD_MD_FLFID ?
1879 clbody->oa.o_parent_ver : 0,
1880 POSTID(&body->oa.o_oi),
1881 aa->aa_ppga[0]->off,
1882 aa->aa_ppga[page_count-1]->off +
1883 aa->aa_ppga[page_count-1]->count - 1,
1884 client_cksum, server_cksum,
1887 aa->aa_oa->o_cksum = client_cksum;
1891 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1894 } else if (unlikely(client_cksum)) {
1895 static int cksum_missed;
1898 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1899 CERROR("%s: checksum %u requested from %s but not sent\n",
1900 obd_name, cksum_missed,
1901 libcfs_nid2str(peer->nid));
1907 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1908 aa->aa_oa, &body->oa);
1913 static int osc_brw_redo_request(struct ptlrpc_request *request,
1914 struct osc_brw_async_args *aa, int rc)
1916 struct ptlrpc_request *new_req;
1917 struct osc_brw_async_args *new_aa;
1918 struct osc_async_page *oap;
1921 /* The below message is checked in replay-ost-single.sh test_8ae*/
1922 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1923 "redo for recoverable error %d", rc);
1925 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1926 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1927 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1928 aa->aa_ppga, &new_req, 1);
1932 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1933 if (oap->oap_request != NULL) {
1934 LASSERTF(request == oap->oap_request,
1935 "request %p != oap_request %p\n",
1936 request, oap->oap_request);
1940 * New request takes over pga and oaps from old request.
1941 * Note that copying a list_head doesn't work, need to move it...
1944 new_req->rq_interpret_reply = request->rq_interpret_reply;
1945 new_req->rq_async_args = request->rq_async_args;
1946 new_req->rq_commit_cb = request->rq_commit_cb;
1947 /* cap resend delay to the current request timeout, this is similar to
1948 * what ptlrpc does (see after_reply()) */
1949 if (aa->aa_resends > new_req->rq_timeout)
1950 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1952 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1953 new_req->rq_generation_set = 1;
1954 new_req->rq_import_generation = request->rq_import_generation;
1956 new_aa = ptlrpc_req_async_args(new_aa, new_req);
1958 INIT_LIST_HEAD(&new_aa->aa_oaps);
1959 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1960 INIT_LIST_HEAD(&new_aa->aa_exts);
1961 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1962 new_aa->aa_resends = aa->aa_resends;
1964 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1965 if (oap->oap_request) {
1966 ptlrpc_req_finished(oap->oap_request);
1967 oap->oap_request = ptlrpc_request_addref(new_req);
1971 /* XXX: This code will run into problem if we're going to support
1972 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1973 * and wait for all of them to be finished. We should inherit request
1974 * set from old request. */
1975 ptlrpcd_add_req(new_req);
1977 DEBUG_REQ(D_INFO, new_req, "new request");
1982 * ugh, we want disk allocation on the target to happen in offset order. we'll
1983 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1984 * fine for our small page arrays and doesn't require allocation. its an
1985 * insertion sort that swaps elements that are strides apart, shrinking the
1986 * stride down until its '1' and the array is sorted.
1988 static void sort_brw_pages(struct brw_page **array, int num)
1991 struct brw_page *tmp;
1995 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2000 for (i = stride ; i < num ; i++) {
2003 while (j >= stride && array[j - stride]->off > tmp->off) {
2004 array[j] = array[j - stride];
2009 } while (stride > 1);
2012 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2014 LASSERT(ppga != NULL);
2015 OBD_FREE(ppga, sizeof(*ppga) * count);
2018 static int brw_interpret(const struct lu_env *env,
2019 struct ptlrpc_request *req, void *args, int rc)
2021 struct osc_brw_async_args *aa = args;
2022 struct osc_extent *ext;
2023 struct osc_extent *tmp;
2024 struct client_obd *cli = aa->aa_cli;
2025 unsigned long transferred = 0;
2029 rc = osc_brw_fini_request(req, rc);
2030 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2032 * When server returns -EINPROGRESS, client should always retry
2033 * regardless of the number of times the bulk was resent already.
2035 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2036 if (req->rq_import_generation !=
2037 req->rq_import->imp_generation) {
2038 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2039 ""DOSTID", rc = %d.\n",
2040 req->rq_import->imp_obd->obd_name,
2041 POSTID(&aa->aa_oa->o_oi), rc);
2042 } else if (rc == -EINPROGRESS ||
2043 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2044 rc = osc_brw_redo_request(req, aa, rc);
2046 CERROR("%s: too many resent retries for object: "
2047 "%llu:%llu, rc = %d.\n",
2048 req->rq_import->imp_obd->obd_name,
2049 POSTID(&aa->aa_oa->o_oi), rc);
2054 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2059 struct obdo *oa = aa->aa_oa;
2060 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2061 unsigned long valid = 0;
2062 struct cl_object *obj;
2063 struct osc_async_page *last;
2065 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2066 obj = osc2cl(last->oap_obj);
2068 cl_object_attr_lock(obj);
2069 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2070 attr->cat_blocks = oa->o_blocks;
2071 valid |= CAT_BLOCKS;
2073 if (oa->o_valid & OBD_MD_FLMTIME) {
2074 attr->cat_mtime = oa->o_mtime;
2077 if (oa->o_valid & OBD_MD_FLATIME) {
2078 attr->cat_atime = oa->o_atime;
2081 if (oa->o_valid & OBD_MD_FLCTIME) {
2082 attr->cat_ctime = oa->o_ctime;
2086 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2087 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2088 loff_t last_off = last->oap_count + last->oap_obj_off +
2091 /* Change file size if this is an out of quota or
2092 * direct IO write and it extends the file size */
2093 if (loi->loi_lvb.lvb_size < last_off) {
2094 attr->cat_size = last_off;
2097 /* Extend KMS if it's not a lockless write */
2098 if (loi->loi_kms < last_off &&
2099 oap2osc_page(last)->ops_srvlock == 0) {
2100 attr->cat_kms = last_off;
2106 cl_object_attr_update(env, obj, attr, valid);
2107 cl_object_attr_unlock(obj);
2109 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2112 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2113 osc_inc_unstable_pages(req);
2115 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2116 list_del_init(&ext->oe_link);
2117 osc_extent_finish(env, ext, 1,
2118 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2120 LASSERT(list_empty(&aa->aa_exts));
2121 LASSERT(list_empty(&aa->aa_oaps));
2123 transferred = (req->rq_bulk == NULL ? /* short io */
2124 aa->aa_requested_nob :
2125 req->rq_bulk->bd_nob_transferred);
2127 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2128 ptlrpc_lprocfs_brw(req, transferred);
2130 spin_lock(&cli->cl_loi_list_lock);
2131 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2132 * is called so we know whether to go to sync BRWs or wait for more
2133 * RPCs to complete */
2134 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2135 cli->cl_w_in_flight--;
2137 cli->cl_r_in_flight--;
2138 osc_wake_cache_waiters(cli);
2139 spin_unlock(&cli->cl_loi_list_lock);
2141 osc_io_unplug(env, cli, NULL);
2145 static void brw_commit(struct ptlrpc_request *req)
2147 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2148 * this called via the rq_commit_cb, I need to ensure
2149 * osc_dec_unstable_pages is still called. Otherwise unstable
2150 * pages may be leaked. */
2151 spin_lock(&req->rq_lock);
2152 if (likely(req->rq_unstable)) {
2153 req->rq_unstable = 0;
2154 spin_unlock(&req->rq_lock);
2156 osc_dec_unstable_pages(req);
2158 req->rq_committed = 1;
2159 spin_unlock(&req->rq_lock);
2164 * Build an RPC by the list of extent @ext_list. The caller must ensure
2165 * that the total pages in this list are NOT over max pages per RPC.
2166 * Extents in the list must be in OES_RPC state.
2168 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2169 struct list_head *ext_list, int cmd)
2171 struct ptlrpc_request *req = NULL;
2172 struct osc_extent *ext;
2173 struct brw_page **pga = NULL;
2174 struct osc_brw_async_args *aa = NULL;
2175 struct obdo *oa = NULL;
2176 struct osc_async_page *oap;
2177 struct osc_object *obj = NULL;
2178 struct cl_req_attr *crattr = NULL;
2179 loff_t starting_offset = OBD_OBJECT_EOF;
2180 loff_t ending_offset = 0;
2184 bool soft_sync = false;
2185 bool ndelay = false;
2189 __u32 layout_version = 0;
2190 LIST_HEAD(rpc_list);
2191 struct ost_body *body;
2193 LASSERT(!list_empty(ext_list));
2195 /* add pages into rpc_list to build BRW rpc */
2196 list_for_each_entry(ext, ext_list, oe_link) {
2197 LASSERT(ext->oe_state == OES_RPC);
2198 mem_tight |= ext->oe_memalloc;
2199 grant += ext->oe_grants;
2200 page_count += ext->oe_nr_pages;
2201 layout_version = max(layout_version, ext->oe_layout_version);
2206 soft_sync = osc_over_unstable_soft_limit(cli);
2208 mpflag = cfs_memory_pressure_get_and_set();
2210 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2212 GOTO(out, rc = -ENOMEM);
2214 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2216 GOTO(out, rc = -ENOMEM);
2219 list_for_each_entry(ext, ext_list, oe_link) {
2220 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2222 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2224 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2225 pga[i] = &oap->oap_brw_page;
2226 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2229 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2230 if (starting_offset == OBD_OBJECT_EOF ||
2231 starting_offset > oap->oap_obj_off)
2232 starting_offset = oap->oap_obj_off;
2234 LASSERT(oap->oap_page_off == 0);
2235 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2236 ending_offset = oap->oap_obj_off +
2239 LASSERT(oap->oap_page_off + oap->oap_count ==
2246 /* first page in the list */
2247 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2249 crattr = &osc_env_info(env)->oti_req_attr;
2250 memset(crattr, 0, sizeof(*crattr));
2251 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2252 crattr->cra_flags = ~0ULL;
2253 crattr->cra_page = oap2cl_page(oap);
2254 crattr->cra_oa = oa;
2255 cl_req_attr_set(env, osc2cl(obj), crattr);
2257 if (cmd == OBD_BRW_WRITE) {
2258 oa->o_grant_used = grant;
2259 if (layout_version > 0) {
2260 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2261 PFID(&oa->o_oi.oi_fid), layout_version);
2263 oa->o_layout_version = layout_version;
2264 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2268 sort_brw_pages(pga, page_count);
2269 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2271 CERROR("prep_req failed: %d\n", rc);
2275 req->rq_commit_cb = brw_commit;
2276 req->rq_interpret_reply = brw_interpret;
2277 req->rq_memalloc = mem_tight != 0;
2278 oap->oap_request = ptlrpc_request_addref(req);
2280 req->rq_no_resend = req->rq_no_delay = 1;
2281 /* probably set a shorter timeout value.
2282 * to handle ETIMEDOUT in brw_interpret() correctly. */
2283 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2286 /* Need to update the timestamps after the request is built in case
2287 * we race with setattr (locally or in queue at OST). If OST gets
2288 * later setattr before earlier BRW (as determined by the request xid),
2289 * the OST will not use BRW timestamps. Sadly, there is no obvious
2290 * way to do this in a single call. bug 10150 */
2291 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2292 crattr->cra_oa = &body->oa;
2293 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2294 cl_req_attr_set(env, osc2cl(obj), crattr);
2295 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2297 aa = ptlrpc_req_async_args(aa, req);
2298 INIT_LIST_HEAD(&aa->aa_oaps);
2299 list_splice_init(&rpc_list, &aa->aa_oaps);
2300 INIT_LIST_HEAD(&aa->aa_exts);
2301 list_splice_init(ext_list, &aa->aa_exts);
2303 spin_lock(&cli->cl_loi_list_lock);
2304 starting_offset >>= PAGE_SHIFT;
2305 if (cmd == OBD_BRW_READ) {
2306 cli->cl_r_in_flight++;
2307 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2308 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2309 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2310 starting_offset + 1);
2312 cli->cl_w_in_flight++;
2313 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2314 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2315 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2316 starting_offset + 1);
2318 spin_unlock(&cli->cl_loi_list_lock);
2320 DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
2321 page_count, aa, cli->cl_r_in_flight,
2322 cli->cl_w_in_flight);
2323 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2325 ptlrpcd_add_req(req);
2331 cfs_memory_pressure_restore(mpflag);
2334 LASSERT(req == NULL);
2337 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2339 OBD_FREE(pga, sizeof(*pga) * page_count);
2340 /* this should happen rarely and is pretty bad, it makes the
2341 * pending list not follow the dirty order */
2342 while (!list_empty(ext_list)) {
2343 ext = list_entry(ext_list->next, struct osc_extent,
2345 list_del_init(&ext->oe_link);
2346 osc_extent_finish(env, ext, 0, rc);
2352 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2356 LASSERT(lock != NULL);
2358 lock_res_and_lock(lock);
2360 if (lock->l_ast_data == NULL)
2361 lock->l_ast_data = data;
2362 if (lock->l_ast_data == data)
2365 unlock_res_and_lock(lock);
2370 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2371 void *cookie, struct lustre_handle *lockh,
2372 enum ldlm_mode mode, __u64 *flags, bool speculative,
2375 bool intent = *flags & LDLM_FL_HAS_INTENT;
2379 /* The request was created before ldlm_cli_enqueue call. */
2380 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2381 struct ldlm_reply *rep;
2383 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2384 LASSERT(rep != NULL);
2386 rep->lock_policy_res1 =
2387 ptlrpc_status_ntoh(rep->lock_policy_res1);
2388 if (rep->lock_policy_res1)
2389 errcode = rep->lock_policy_res1;
2391 *flags |= LDLM_FL_LVB_READY;
2392 } else if (errcode == ELDLM_OK) {
2393 *flags |= LDLM_FL_LVB_READY;
2396 /* Call the update callback. */
2397 rc = (*upcall)(cookie, lockh, errcode);
2399 /* release the reference taken in ldlm_cli_enqueue() */
2400 if (errcode == ELDLM_LOCK_MATCHED)
2402 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2403 ldlm_lock_decref(lockh, mode);
2408 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2411 struct osc_enqueue_args *aa = args;
2412 struct ldlm_lock *lock;
2413 struct lustre_handle *lockh = &aa->oa_lockh;
2414 enum ldlm_mode mode = aa->oa_mode;
2415 struct ost_lvb *lvb = aa->oa_lvb;
2416 __u32 lvb_len = sizeof(*lvb);
2421 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2423 lock = ldlm_handle2lock(lockh);
2424 LASSERTF(lock != NULL,
2425 "lockh %#llx, req %p, aa %p - client evicted?\n",
2426 lockh->cookie, req, aa);
2428 /* Take an additional reference so that a blocking AST that
2429 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2430 * to arrive after an upcall has been executed by
2431 * osc_enqueue_fini(). */
2432 ldlm_lock_addref(lockh, mode);
2434 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2435 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2437 /* Let CP AST to grant the lock first. */
2438 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2440 if (aa->oa_speculative) {
2441 LASSERT(aa->oa_lvb == NULL);
2442 LASSERT(aa->oa_flags == NULL);
2443 aa->oa_flags = &flags;
2446 /* Complete obtaining the lock procedure. */
2447 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2448 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2450 /* Complete osc stuff. */
2451 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2452 aa->oa_flags, aa->oa_speculative, rc);
2454 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2456 ldlm_lock_decref(lockh, mode);
2457 LDLM_LOCK_PUT(lock);
2461 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2462 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2463 * other synchronous requests, however keeping some locks and trying to obtain
2464 * others may take a considerable amount of time in a case of ost failure; and
2465 * when other sync requests do not get released lock from a client, the client
2466 * is evicted from the cluster -- such scenarious make the life difficult, so
2467 * release locks just after they are obtained. */
2468 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2469 __u64 *flags, union ldlm_policy_data *policy,
2470 struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
2471 void *cookie, struct ldlm_enqueue_info *einfo,
2472 struct ptlrpc_request_set *rqset, int async,
2475 struct obd_device *obd = exp->exp_obd;
2476 struct lustre_handle lockh = { 0 };
2477 struct ptlrpc_request *req = NULL;
2478 int intent = *flags & LDLM_FL_HAS_INTENT;
2479 __u64 match_flags = *flags;
2480 enum ldlm_mode mode;
2484 /* Filesystem lock extents are extended to page boundaries so that
2485 * dealing with the page cache is a little smoother. */
2486 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2487 policy->l_extent.end |= ~PAGE_MASK;
2489 /* Next, search for already existing extent locks that will cover us */
2490 /* If we're trying to read, we also search for an existing PW lock. The
2491 * VFS and page cache already protect us locally, so lots of readers/
2492 * writers can share a single PW lock.
2494 * There are problems with conversion deadlocks, so instead of
2495 * converting a read lock to a write lock, we'll just enqueue a new
2498 * At some point we should cancel the read lock instead of making them
2499 * send us a blocking callback, but there are problems with canceling
2500 * locks out from other users right now, too. */
2501 mode = einfo->ei_mode;
2502 if (einfo->ei_mode == LCK_PR)
2504 /* Normal lock requests must wait for the LVB to be ready before
2505 * matching a lock; speculative lock requests do not need to,
2506 * because they will not actually use the lock. */
2508 match_flags |= LDLM_FL_LVB_READY;
2510 match_flags |= LDLM_FL_BLOCK_GRANTED;
2511 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2512 einfo->ei_type, policy, mode, &lockh, 0);
2514 struct ldlm_lock *matched;
2516 if (*flags & LDLM_FL_TEST_LOCK)
2519 matched = ldlm_handle2lock(&lockh);
2521 /* This DLM lock request is speculative, and does not
2522 * have an associated IO request. Therefore if there
2523 * is already a DLM lock, it wll just inform the
2524 * caller to cancel the request for this stripe.*/
2525 lock_res_and_lock(matched);
2526 if (ldlm_extent_equal(&policy->l_extent,
2527 &matched->l_policy_data.l_extent))
2531 unlock_res_and_lock(matched);
2533 ldlm_lock_decref(&lockh, mode);
2534 LDLM_LOCK_PUT(matched);
2536 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2537 *flags |= LDLM_FL_LVB_READY;
2539 /* We already have a lock, and it's referenced. */
2540 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2542 ldlm_lock_decref(&lockh, mode);
2543 LDLM_LOCK_PUT(matched);
2546 ldlm_lock_decref(&lockh, mode);
2547 LDLM_LOCK_PUT(matched);
2551 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2555 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2556 &RQF_LDLM_ENQUEUE_LVB);
2560 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2562 ptlrpc_request_free(req);
2566 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2568 ptlrpc_request_set_replen(req);
2571 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2572 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2574 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2575 sizeof(*lvb), LVB_T_OST, &lockh, async);
2578 struct osc_enqueue_args *aa;
2579 aa = ptlrpc_req_async_args(aa, req);
2581 aa->oa_mode = einfo->ei_mode;
2582 aa->oa_type = einfo->ei_type;
2583 lustre_handle_copy(&aa->oa_lockh, &lockh);
2584 aa->oa_upcall = upcall;
2585 aa->oa_cookie = cookie;
2586 aa->oa_speculative = speculative;
2588 aa->oa_flags = flags;
2591 /* speculative locks are essentially to enqueue
2592 * a DLM lock in advance, so we don't care
2593 * about the result of the enqueue. */
2595 aa->oa_flags = NULL;
2598 req->rq_interpret_reply = osc_enqueue_interpret;
2599 ptlrpc_set_add_req(rqset, req);
2600 } else if (intent) {
2601 ptlrpc_req_finished(req);
2606 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2607 flags, speculative, rc);
2609 ptlrpc_req_finished(req);
2614 int osc_match_base(const struct lu_env *env, struct obd_export *exp,
2615 struct ldlm_res_id *res_id, enum ldlm_type type,
2616 union ldlm_policy_data *policy, enum ldlm_mode mode,
2617 __u64 *flags, struct osc_object *obj,
2618 struct lustre_handle *lockh, int unref)
2620 struct obd_device *obd = exp->exp_obd;
2621 __u64 lflags = *flags;
2625 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2628 /* Filesystem lock extents are extended to page boundaries so that
2629 * dealing with the page cache is a little smoother */
2630 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2631 policy->l_extent.end |= ~PAGE_MASK;
2633 /* Next, search for already existing extent locks that will cover us */
2634 /* If we're trying to read, we also search for an existing PW lock. The
2635 * VFS and page cache already protect us locally, so lots of readers/
2636 * writers can share a single PW lock. */
2640 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2641 res_id, type, policy, rc, lockh, unref);
2642 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2646 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2648 LASSERT(lock != NULL);
2649 if (osc_set_lock_data(lock, obj)) {
2650 lock_res_and_lock(lock);
2651 if (!ldlm_is_lvb_cached(lock)) {
2652 LASSERT(lock->l_ast_data == obj);
2653 osc_lock_lvb_update(env, obj, lock, NULL);
2654 ldlm_set_lvb_cached(lock);
2656 unlock_res_and_lock(lock);
2658 ldlm_lock_decref(lockh, rc);
2661 LDLM_LOCK_PUT(lock);
2666 static int osc_statfs_interpret(const struct lu_env *env,
2667 struct ptlrpc_request *req, void *args, int rc)
2669 struct osc_async_args *aa = args;
2670 struct obd_statfs *msfs;
2675 * The request has in fact never been sent due to issues at
2676 * a higher level (LOV). Exit immediately since the caller
2677 * is aware of the problem and takes care of the clean up.
2681 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2682 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2688 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2690 GOTO(out, rc = -EPROTO);
2692 *aa->aa_oi->oi_osfs = *msfs;
2694 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2699 static int osc_statfs_async(struct obd_export *exp,
2700 struct obd_info *oinfo, time64_t max_age,
2701 struct ptlrpc_request_set *rqset)
2703 struct obd_device *obd = class_exp2obd(exp);
2704 struct ptlrpc_request *req;
2705 struct osc_async_args *aa;
2709 if (obd->obd_osfs_age >= max_age) {
2711 "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2712 obd->obd_name, &obd->obd_osfs,
2713 obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2714 obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2715 spin_lock(&obd->obd_osfs_lock);
2716 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2717 spin_unlock(&obd->obd_osfs_lock);
2718 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2719 if (oinfo->oi_cb_up)
2720 oinfo->oi_cb_up(oinfo, 0);
2725 /* We could possibly pass max_age in the request (as an absolute
2726 * timestamp or a "seconds.usec ago") so the target can avoid doing
2727 * extra calls into the filesystem if that isn't necessary (e.g.
2728 * during mount that would help a bit). Having relative timestamps
2729 * is not so great if request processing is slow, while absolute
2730 * timestamps are not ideal because they need time synchronization. */
2731 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2735 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2737 ptlrpc_request_free(req);
2740 ptlrpc_request_set_replen(req);
2741 req->rq_request_portal = OST_CREATE_PORTAL;
2742 ptlrpc_at_set_req_timeout(req);
2744 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2745 /* procfs requests not want stat in wait for avoid deadlock */
2746 req->rq_no_resend = 1;
2747 req->rq_no_delay = 1;
2750 req->rq_interpret_reply = osc_statfs_interpret;
2751 aa = ptlrpc_req_async_args(aa, req);
2754 ptlrpc_set_add_req(rqset, req);
2758 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2759 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2761 struct obd_device *obd = class_exp2obd(exp);
2762 struct obd_statfs *msfs;
2763 struct ptlrpc_request *req;
2764 struct obd_import *imp = NULL;
2769 /*Since the request might also come from lprocfs, so we need
2770 *sync this with client_disconnect_export Bug15684*/
2771 down_read(&obd->u.cli.cl_sem);
2772 if (obd->u.cli.cl_import)
2773 imp = class_import_get(obd->u.cli.cl_import);
2774 up_read(&obd->u.cli.cl_sem);
2778 /* We could possibly pass max_age in the request (as an absolute
2779 * timestamp or a "seconds.usec ago") so the target can avoid doing
2780 * extra calls into the filesystem if that isn't necessary (e.g.
2781 * during mount that would help a bit). Having relative timestamps
2782 * is not so great if request processing is slow, while absolute
2783 * timestamps are not ideal because they need time synchronization. */
2784 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2786 class_import_put(imp);
2791 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2793 ptlrpc_request_free(req);
2796 ptlrpc_request_set_replen(req);
2797 req->rq_request_portal = OST_CREATE_PORTAL;
2798 ptlrpc_at_set_req_timeout(req);
2800 if (flags & OBD_STATFS_NODELAY) {
2801 /* procfs requests not want stat in wait for avoid deadlock */
2802 req->rq_no_resend = 1;
2803 req->rq_no_delay = 1;
2806 rc = ptlrpc_queue_wait(req);
2810 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2812 GOTO(out, rc = -EPROTO);
2818 ptlrpc_req_finished(req);
2822 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2823 void *karg, void __user *uarg)
2825 struct obd_device *obd = exp->exp_obd;
2826 struct obd_ioctl_data *data = karg;
2830 if (!try_module_get(THIS_MODULE)) {
2831 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2832 module_name(THIS_MODULE));
2836 case OBD_IOC_CLIENT_RECOVER:
2837 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2838 data->ioc_inlbuf1, 0);
2842 case IOC_OSC_SET_ACTIVE:
2843 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2848 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2849 obd->obd_name, cmd, current_comm(), rc);
2853 module_put(THIS_MODULE);
2857 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2858 u32 keylen, void *key, u32 vallen, void *val,
2859 struct ptlrpc_request_set *set)
2861 struct ptlrpc_request *req;
2862 struct obd_device *obd = exp->exp_obd;
2863 struct obd_import *imp = class_exp2cliimp(exp);
2868 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2870 if (KEY_IS(KEY_CHECKSUM)) {
2871 if (vallen != sizeof(int))
2873 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2877 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2878 sptlrpc_conf_client_adapt(obd);
2882 if (KEY_IS(KEY_FLUSH_CTX)) {
2883 sptlrpc_import_flush_my_ctx(imp);
2887 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2888 struct client_obd *cli = &obd->u.cli;
2889 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2890 long target = *(long *)val;
2892 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2897 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2900 /* We pass all other commands directly to OST. Since nobody calls osc
2901 methods directly and everybody is supposed to go through LOV, we
2902 assume lov checked invalid values for us.
2903 The only recognised values so far are evict_by_nid and mds_conn.
2904 Even if something bad goes through, we'd get a -EINVAL from OST
2907 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2908 &RQF_OST_SET_GRANT_INFO :
2913 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2914 RCL_CLIENT, keylen);
2915 if (!KEY_IS(KEY_GRANT_SHRINK))
2916 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2917 RCL_CLIENT, vallen);
2918 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2920 ptlrpc_request_free(req);
2924 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2925 memcpy(tmp, key, keylen);
2926 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2929 memcpy(tmp, val, vallen);
2931 if (KEY_IS(KEY_GRANT_SHRINK)) {
2932 struct osc_grant_args *aa;
2935 aa = ptlrpc_req_async_args(aa, req);
2936 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2938 ptlrpc_req_finished(req);
2941 *oa = ((struct ost_body *)val)->oa;
2943 req->rq_interpret_reply = osc_shrink_grant_interpret;
2946 ptlrpc_request_set_replen(req);
2947 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2948 LASSERT(set != NULL);
2949 ptlrpc_set_add_req(set, req);
2950 ptlrpc_check_set(NULL, set);
2952 ptlrpcd_add_req(req);
2957 EXPORT_SYMBOL(osc_set_info_async);
2959 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2960 struct obd_device *obd, struct obd_uuid *cluuid,
2961 struct obd_connect_data *data, void *localdata)
2963 struct client_obd *cli = &obd->u.cli;
2965 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2969 spin_lock(&cli->cl_loi_list_lock);
2970 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2971 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2972 /* restore ocd_grant_blkbits as client page bits */
2973 data->ocd_grant_blkbits = PAGE_SHIFT;
2974 grant += cli->cl_dirty_grant;
2976 grant += cli->cl_dirty_pages << PAGE_SHIFT;
2978 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2979 lost_grant = cli->cl_lost_grant;
2980 cli->cl_lost_grant = 0;
2981 spin_unlock(&cli->cl_loi_list_lock);
2983 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2984 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2985 data->ocd_version, data->ocd_grant, lost_grant);
2990 EXPORT_SYMBOL(osc_reconnect);
2992 int osc_disconnect(struct obd_export *exp)
2994 struct obd_device *obd = class_exp2obd(exp);
2997 rc = client_disconnect_export(exp);
2999 * Initially we put del_shrink_grant before disconnect_export, but it
3000 * causes the following problem if setup (connect) and cleanup
3001 * (disconnect) are tangled together.
3002 * connect p1 disconnect p2
3003 * ptlrpc_connect_import
3004 * ............... class_manual_cleanup
3007 * ptlrpc_connect_interrupt
3009 * add this client to shrink list
3011 * Bang! grant shrink thread trigger the shrink. BUG18662
3013 osc_del_grant_list(&obd->u.cli);
3016 EXPORT_SYMBOL(osc_disconnect);
3018 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3019 struct hlist_node *hnode, void *arg)
3021 struct lu_env *env = arg;
3022 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3023 struct ldlm_lock *lock;
3024 struct osc_object *osc = NULL;
3028 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3029 if (lock->l_ast_data != NULL && osc == NULL) {
3030 osc = lock->l_ast_data;
3031 cl_object_get(osc2cl(osc));
3034 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3035 * by the 2nd round of ldlm_namespace_clean() call in
3036 * osc_import_event(). */
3037 ldlm_clear_cleaned(lock);
3042 osc_object_invalidate(env, osc);
3043 cl_object_put(env, osc2cl(osc));
3048 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
3050 static int osc_import_event(struct obd_device *obd,
3051 struct obd_import *imp,
3052 enum obd_import_event event)
3054 struct client_obd *cli;
3058 LASSERT(imp->imp_obd == obd);
3061 case IMP_EVENT_DISCON: {
3063 spin_lock(&cli->cl_loi_list_lock);
3064 cli->cl_avail_grant = 0;
3065 cli->cl_lost_grant = 0;
3066 spin_unlock(&cli->cl_loi_list_lock);
3069 case IMP_EVENT_INACTIVE: {
3070 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3073 case IMP_EVENT_INVALIDATE: {
3074 struct ldlm_namespace *ns = obd->obd_namespace;
3078 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3080 env = cl_env_get(&refcheck);
3082 osc_io_unplug(env, &obd->u.cli, NULL);
3084 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3085 osc_ldlm_resource_invalidate,
3087 cl_env_put(env, &refcheck);
3089 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3094 case IMP_EVENT_ACTIVE: {
3095 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3098 case IMP_EVENT_OCD: {
3099 struct obd_connect_data *ocd = &imp->imp_connect_data;
3101 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3102 osc_init_grant(&obd->u.cli, ocd);
3105 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3106 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3108 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3111 case IMP_EVENT_DEACTIVATE: {
3112 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3115 case IMP_EVENT_ACTIVATE: {
3116 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3120 CERROR("Unknown import event %d\n", event);
3127 * Determine whether the lock can be canceled before replaying the lock
3128 * during recovery, see bug16774 for detailed information.
3130 * \retval zero the lock can't be canceled
3131 * \retval other ok to cancel
3133 static int osc_cancel_weight(struct ldlm_lock *lock)
3136 * Cancel all unused and granted extent lock.
3138 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3139 ldlm_is_granted(lock) &&
3140 osc_ldlm_weigh_ast(lock) == 0)
3146 static int brw_queue_work(const struct lu_env *env, void *data)
3148 struct client_obd *cli = data;
3150 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3152 osc_io_unplug(env, cli, NULL);
3156 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3158 struct client_obd *cli = &obd->u.cli;
3164 rc = ptlrpcd_addref();
3168 rc = client_obd_setup(obd, lcfg);
3170 GOTO(out_ptlrpcd, rc);
3173 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3174 if (IS_ERR(handler))
3175 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3176 cli->cl_writeback_work = handler;
3178 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3179 if (IS_ERR(handler))
3180 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3181 cli->cl_lru_work = handler;
3183 rc = osc_quota_setup(obd);
3185 GOTO(out_ptlrpcd_work, rc);
3187 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3188 osc_update_next_shrink(cli);
3193 if (cli->cl_writeback_work != NULL) {
3194 ptlrpcd_destroy_work(cli->cl_writeback_work);
3195 cli->cl_writeback_work = NULL;
3197 if (cli->cl_lru_work != NULL) {
3198 ptlrpcd_destroy_work(cli->cl_lru_work);
3199 cli->cl_lru_work = NULL;
3201 client_obd_cleanup(obd);
3206 EXPORT_SYMBOL(osc_setup_common);
3208 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3210 struct client_obd *cli = &obd->u.cli;
3218 rc = osc_setup_common(obd, lcfg);
3222 rc = osc_tunables_init(obd);
3227 * We try to control the total number of requests with a upper limit
3228 * osc_reqpool_maxreqcount. There might be some race which will cause
3229 * over-limit allocation, but it is fine.
3231 req_count = atomic_read(&osc_pool_req_count);
3232 if (req_count < osc_reqpool_maxreqcount) {
3233 adding = cli->cl_max_rpcs_in_flight + 2;
3234 if (req_count + adding > osc_reqpool_maxreqcount)
3235 adding = osc_reqpool_maxreqcount - req_count;
3237 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3238 atomic_add(added, &osc_pool_req_count);
3241 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3243 spin_lock(&osc_shrink_lock);
3244 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3245 spin_unlock(&osc_shrink_lock);
3246 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3247 cli->cl_import->imp_idle_debug = D_HA;
3252 int osc_precleanup_common(struct obd_device *obd)
3254 struct client_obd *cli = &obd->u.cli;
3258 * for echo client, export may be on zombie list, wait for
3259 * zombie thread to cull it, because cli.cl_import will be
3260 * cleared in client_disconnect_export():
3261 * class_export_destroy() -> obd_cleanup() ->
3262 * echo_device_free() -> echo_client_cleanup() ->
3263 * obd_disconnect() -> osc_disconnect() ->
3264 * client_disconnect_export()
3266 obd_zombie_barrier();
3267 if (cli->cl_writeback_work) {
3268 ptlrpcd_destroy_work(cli->cl_writeback_work);
3269 cli->cl_writeback_work = NULL;
3272 if (cli->cl_lru_work) {
3273 ptlrpcd_destroy_work(cli->cl_lru_work);
3274 cli->cl_lru_work = NULL;
3277 obd_cleanup_client_import(obd);
3280 EXPORT_SYMBOL(osc_precleanup_common);
3282 static int osc_precleanup(struct obd_device *obd)
3286 osc_precleanup_common(obd);
3288 ptlrpc_lprocfs_unregister_obd(obd);
3292 int osc_cleanup_common(struct obd_device *obd)
3294 struct client_obd *cli = &obd->u.cli;
3299 spin_lock(&osc_shrink_lock);
3300 list_del(&cli->cl_shrink_list);
3301 spin_unlock(&osc_shrink_lock);
3304 if (cli->cl_cache != NULL) {
3305 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3306 spin_lock(&cli->cl_cache->ccc_lru_lock);
3307 list_del_init(&cli->cl_lru_osc);
3308 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3309 cli->cl_lru_left = NULL;
3310 cl_cache_decref(cli->cl_cache);
3311 cli->cl_cache = NULL;
3314 /* free memory of osc quota cache */
3315 osc_quota_cleanup(obd);
3317 rc = client_obd_cleanup(obd);
3322 EXPORT_SYMBOL(osc_cleanup_common);
3324 static const struct obd_ops osc_obd_ops = {
3325 .o_owner = THIS_MODULE,
3326 .o_setup = osc_setup,
3327 .o_precleanup = osc_precleanup,
3328 .o_cleanup = osc_cleanup_common,
3329 .o_add_conn = client_import_add_conn,
3330 .o_del_conn = client_import_del_conn,
3331 .o_connect = client_connect_import,
3332 .o_reconnect = osc_reconnect,
3333 .o_disconnect = osc_disconnect,
3334 .o_statfs = osc_statfs,
3335 .o_statfs_async = osc_statfs_async,
3336 .o_create = osc_create,
3337 .o_destroy = osc_destroy,
3338 .o_getattr = osc_getattr,
3339 .o_setattr = osc_setattr,
3340 .o_iocontrol = osc_iocontrol,
3341 .o_set_info_async = osc_set_info_async,
3342 .o_import_event = osc_import_event,
3343 .o_quotactl = osc_quotactl,
3346 static struct shrinker *osc_cache_shrinker;
3347 LIST_HEAD(osc_shrink_list);
3348 DEFINE_SPINLOCK(osc_shrink_lock);
3350 #ifndef HAVE_SHRINKER_COUNT
3351 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3353 struct shrink_control scv = {
3354 .nr_to_scan = shrink_param(sc, nr_to_scan),
3355 .gfp_mask = shrink_param(sc, gfp_mask)
3357 (void)osc_cache_shrink_scan(shrinker, &scv);
3359 return osc_cache_shrink_count(shrinker, &scv);
3363 static int __init osc_init(void)
3365 unsigned int reqpool_size;
3366 unsigned int reqsize;
3368 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3369 osc_cache_shrink_count, osc_cache_shrink_scan);
3372 /* print an address of _any_ initialized kernel symbol from this
3373 * module, to allow debugging with gdb that doesn't support data
3374 * symbols from modules.*/
3375 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3377 rc = lu_kmem_init(osc_caches);
3381 rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3382 LUSTRE_OSC_NAME, &osc_device_type);
3386 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3388 /* This is obviously too much memory, only prevent overflow here */
3389 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3390 GOTO(out_type, rc = -EINVAL);
3392 reqpool_size = osc_reqpool_mem_max << 20;
3395 while (reqsize < OST_IO_MAXREQSIZE)
3396 reqsize = reqsize << 1;
3399 * We don't enlarge the request count in OSC pool according to
3400 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3401 * tried after normal allocation failed. So a small OSC pool won't
3402 * cause much performance degression in most of cases.
3404 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3406 atomic_set(&osc_pool_req_count, 0);
3407 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3408 ptlrpc_add_rqs_to_pool);
3410 if (osc_rq_pool == NULL)
3411 GOTO(out_type, rc = -ENOMEM);
3413 rc = osc_start_grant_work();
3415 GOTO(out_req_pool, rc);
3420 ptlrpc_free_rq_pool(osc_rq_pool);
3422 class_unregister_type(LUSTRE_OSC_NAME);
3424 lu_kmem_fini(osc_caches);
3429 static void __exit osc_exit(void)
3431 osc_stop_grant_work();
3432 remove_shrinker(osc_cache_shrinker);
3433 class_unregister_type(LUSTRE_OSC_NAME);
3434 lu_kmem_fini(osc_caches);
3435 ptlrpc_free_rq_pool(osc_rq_pool);
3438 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3439 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3440 MODULE_VERSION(LUSTRE_VERSION_STRING);
3441 MODULE_LICENSE("GPL");
3443 module_init(osc_init);
3444 module_exit(osc_exit);