4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
49 #include "osc_internal.h"
/*
 * Request-pool state. These three objects are not static, so they are
 * visible to other compilation units; their users are outside this
 * excerpt.
 */
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
/*
 * Idle timeout module parameter (writable, mode 0644). It is registered
 * with the "uint" parameter type, so the backing variable is declared
 * unsigned int to match the pointer type module_param() checks against.
 * NOTE(review): the unit is presumably seconds - confirm with the consumer.
 */
59 static unsigned int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
/*
 * osc_grant_args is an alias for osc_brw_async_args; it is the type
 * osc_shrink_grant_interpret() uses for its callback arguments.
 */
62 #define osc_grant_args osc_brw_async_args
/*
 * Async arguments for setattr-type requests. An instance is stored in the
 * request's rq_async_args area by the senders and read back by
 * osc_setattr_interpret().
 * Only part of the definition is visible in this excerpt; users of this
 * struct also reference sa_oa and sa_cookie.
 */
64 struct osc_setattr_args {
/* completion callback, invoked as sa_upcall(sa_cookie, rc) */
66 obd_enqueue_update_f sa_upcall;
/*
 * Async arguments for OST_SYNC requests, read back by
 * osc_sync_interpret(). Only part of the definition is visible in this
 * excerpt; users also reference fa_oa and fa_cookie.
 */
70 struct osc_fsync_args {
/* object whose cached attributes are refreshed from the sync reply */
71 struct osc_object *fa_obj;
/* completion callback, invoked as fa_upcall(fa_cookie, rc) */
73 obd_enqueue_update_f fa_upcall;
/*
 * Async arguments for OST_LADVISE requests, read back by
 * osc_ladvise_interpret(). Only part of the definition is visible in this
 * excerpt; users also reference la_oa and la_cookie.
 */
77 struct osc_ladvise_args {
/* completion callback, invoked as la_upcall(la_cookie, rc) */
79 obd_enqueue_update_f la_upcall;
/*
 * Forward declarations of file-local (static) helpers. Their definitions
 * are not part of this excerpt, and the brw_interpret() prototype
 * continues on lines that are not visible here.
 */
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Store @oa in the OST body of request @req.
 *
 * The RMF_OST_BODY buffer is looked up in the client side of the request
 * capsule and filled through lustre_set_wire_obdo(), using the connect
 * data of the import the request belongs to.
 */
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 struct ost_body *body;
91 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
94 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR.
 *
 * Allocates and packs an RQF_OST_GETATTR request on the client import of
 * @exp, places @oa in the request body and waits for the reply with
 * ptlrpc_queue_wait(). The reply obdo is copied back into @oa, after
 * which o_blksize is set from cli_brw_size() and OBD_MD_FLBLKSZ is marked
 * valid.
 *
 * -EPROTO is the error used on the reply-body path (its guarding check is
 * not visible in this excerpt). The request is released with
 * ptlrpc_request_free() on the pack-failure path and with
 * ptlrpc_req_finished() after it has been sent.
 */
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
100 struct ptlrpc_request *req;
101 struct ost_body *body;
105 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
111 ptlrpc_request_free(req);
115 osc_pack_req_body(req, oa);
117 ptlrpc_request_set_replen(req);
119 rc = ptlrpc_queue_wait(req);
123 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
125 GOTO(out, rc = -EPROTO);
127 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* report the client's bulk RPC size as the preferred block size */
130 oa->o_blksize = cli_brw_size(exp->exp_obd);
131 oa->o_valid |= OBD_MD_FLBLKSZ;
135 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR.
 *
 * Asserts that @oa carries OBD_MD_FLGROUP, then allocates and packs an
 * RQF_OST_SETATTR request, places @oa in the body and waits for the reply
 * with ptlrpc_queue_wait(). The reply obdo is copied back into @oa.
 *
 * -EPROTO is the error used on the reply-body path (its guarding check is
 * not visible in this excerpt).
 */
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
143 struct ptlrpc_request *req;
144 struct ost_body *body;
148 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
156 ptlrpc_request_free(req);
160 osc_pack_req_body(req, oa);
162 ptlrpc_request_set_replen(req);
164 rc = ptlrpc_queue_wait(req);
168 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
170 GOTO(out, rc = -EPROTO);
172 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176 ptlrpc_req_finished(req);
/*
 * Reply callback shared by osc_setattr_async() and osc_punch_send().
 *
 * @args is the struct osc_setattr_args stored in the request. The OST
 * body of the reply is copied into sa->sa_oa, and the caller-supplied
 * upcall is invoked as sa_upcall(sa_cookie, rc); its return value
 * replaces rc. -EPROTO is the error used on the reply-body path.
 */
181 static int osc_setattr_interpret(const struct lu_env *env,
182 struct ptlrpc_request *req, void *args, int rc)
184 struct osc_setattr_args *sa = args;
185 struct ost_body *body;
192 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194 GOTO(out, rc = -EPROTO);
196 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
199 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR.
 *
 * Builds the request exactly like the synchronous variant, then hands it
 * off in one of two ways:
 *  - fire and forget: the request is passed straight to ptlrpcd without
 *    an interpret callback (the "Do not wait for response" path);
 *  - with completion: osc_setattr_interpret() is installed, @upcall and
 *    @cookie are saved in the request's async args, and the request is
 *    queued on ptlrpcd when @rqset is PTLRPCD_SET or added to @rqset
 *    otherwise.
 * The CLASSERT guarantees at build time that struct osc_setattr_args fits
 * in rq_async_args.
 */
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204 obd_enqueue_update_f upcall, void *cookie,
205 struct ptlrpc_request_set *rqset)
207 struct ptlrpc_request *req;
208 struct osc_setattr_args *sa;
213 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
219 ptlrpc_request_free(req);
223 osc_pack_req_body(req, oa);
225 ptlrpc_request_set_replen(req);
227 /* do mds to ost setattr asynchronously */
229 /* Do not wait for response. */
230 ptlrpcd_add_req(req);
232 req->rq_interpret_reply = osc_setattr_interpret;
234 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
235 sa = ptlrpc_req_async_args(req);
237 sa->sa_upcall = upcall;
238 sa->sa_cookie = cookie;
240 if (rqset == PTLRPCD_SET)
241 ptlrpcd_add_req(req);
243 ptlrpc_set_add_req(rqset, req);
/*
 * Reply callback for OST_LADVISE requests.
 *
 * The reply obdo is copied into *la->la_oa by plain structure assignment,
 * and the caller-supplied upcall is invoked as la_upcall(la_cookie, rc);
 * its return value replaces rc. -EPROTO is the error used on the
 * reply-body path.
 */
249 static int osc_ladvise_interpret(const struct lu_env *env,
250 struct ptlrpc_request *req,
253 struct osc_ladvise_args *la = arg;
254 struct ost_body *body;
260 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
262 GOTO(out, rc = -EPROTO);
264 *la->la_oa = body->oa;
266 rc = la->la_upcall(la->la_cookie, rc);
/*
 * Send an OST_LADVISE request carrying the advice array in @ladvise_hdr.
 *
 * The client-side RMF_OST_LADVISE buffer is sized for lah_count entries
 * before packing. The request is directed to OST_IO_PORTAL, and the obdo,
 * the advice header and the advice array are copied into it. It is then
 * either handed to ptlrpcd without a callback, or
 * osc_ladvise_interpret() is installed with @upcall/@cookie and the
 * request is queued on ptlrpcd (@rqset == PTLRPCD_SET) or added to
 * @rqset.
 */
271 * If rqset is NULL, do not wait for response. Upcall and cookie could also
272 * be NULL in this case
274 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
275 struct ladvise_hdr *ladvise_hdr,
276 obd_enqueue_update_f upcall, void *cookie,
277 struct ptlrpc_request_set *rqset)
279 struct ptlrpc_request *req;
280 struct ost_body *body;
281 struct osc_ladvise_args *la;
283 struct lu_ladvise *req_ladvise;
284 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
285 int num_advise = ladvise_hdr->lah_count;
286 struct ladvise_hdr *req_ladvise_hdr;
289 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* the advice buffer is variable-length: one lu_ladvise per entry */
293 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
294 num_advise * sizeof(*ladvise));
295 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
297 ptlrpc_request_free(req);
300 req->rq_request_portal = OST_IO_PORTAL;
301 ptlrpc_at_set_req_timeout(req);
303 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
305 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
308 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
309 &RMF_OST_LADVISE_HDR);
310 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
312 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
313 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
314 ptlrpc_request_set_replen(req);
317 /* Do not wait for response. */
318 ptlrpcd_add_req(req);
322 req->rq_interpret_reply = osc_ladvise_interpret;
323 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
324 la = ptlrpc_req_async_args(req);
326 la->la_upcall = upcall;
327 la->la_cookie = cookie;
329 if (rqset == PTLRPCD_SET)
330 ptlrpcd_add_req(req);
332 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.
 *
 * Asserts that @oa carries OBD_MD_FLGROUP and that its object id uses an
 * echo sequence (fid_seq_is_echo()). The request is packed with @oa in
 * its body and sent with ptlrpc_queue_wait(). The reply obdo is copied
 * back into @oa, o_blksize is set from cli_brw_size() with
 * OBD_MD_FLBLKSZ marked valid, and the reply transno is logged.
 *
 * Visible error codes are -ENOMEM (on the allocation path) and -EPROTO
 * (on the reply-body path).
 */
337 static int osc_create(const struct lu_env *env, struct obd_export *exp,
340 struct ptlrpc_request *req;
341 struct ost_body *body;
346 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
347 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
349 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
351 GOTO(out, rc = -ENOMEM);
353 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
355 ptlrpc_request_free(req);
359 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
362 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
364 ptlrpc_request_set_replen(req);
366 rc = ptlrpc_queue_wait(req);
370 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
372 GOTO(out_req, rc = -EPROTO);
/* logged before the reply is copied in, so this shows the flags as sent */
374 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
375 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
377 oa->o_blksize = cli_brw_size(exp->exp_obd);
378 oa->o_valid |= OBD_MD_FLBLKSZ;
380 CDEBUG(D_HA, "transno: %lld\n",
381 lustre_msg_get_transno(req->rq_repmsg));
383 ptlrpc_req_finished(req);
/*
 * Send an asynchronous OST_PUNCH request for @oa (exported symbol).
 *
 * The request is packed, assigned its portal by osc_set_io_portal(), and
 * given @oa as its body. Completion is handled by
 * osc_setattr_interpret(), which calls @upcall with @cookie; the request
 * is always queued on ptlrpcd.
 * The CLASSERT guarantees at build time that struct osc_setattr_args fits
 * in rq_async_args.
 */
388 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
389 obd_enqueue_update_f upcall, void *cookie)
391 struct ptlrpc_request *req;
392 struct osc_setattr_args *sa;
393 struct obd_import *imp = class_exp2cliimp(exp);
394 struct ost_body *body;
399 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
403 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405 ptlrpc_request_free(req);
409 osc_set_io_portal(req);
411 ptlrpc_at_set_req_timeout(req);
413 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
415 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
417 ptlrpc_request_set_replen(req);
419 req->rq_interpret_reply = osc_setattr_interpret;
420 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
421 sa = ptlrpc_req_async_args(req);
423 sa->sa_upcall = upcall;
424 sa->sa_cookie = cookie;
426 ptlrpcd_add_req(req);
430 EXPORT_SYMBOL(osc_punch_send);
/*
 * Reply callback for OST_SYNC requests.
 *
 * The reply obdo is copied into *fa->fa_oa. With the object's attribute
 * lock held, the cl_object attributes of fa->fa_obj are updated through
 * cl_object_attr_update(); when the reply marks OBD_MD_FLBLOCKS valid the
 * block count is taken from it. Finally the caller's upcall is invoked as
 * fa_upcall(fa_cookie, rc) and its return value replaces rc.
 * A missing reply body is logged and reported as -EPROTO.
 */
432 static int osc_sync_interpret(const struct lu_env *env,
433 struct ptlrpc_request *req, void *args, int rc)
435 struct osc_fsync_args *fa = args;
436 struct ost_body *body;
437 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
438 unsigned long valid = 0;
439 struct cl_object *obj;
445 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
447 CERROR("can't unpack ost_body\n");
448 GOTO(out, rc = -EPROTO);
451 *fa->fa_oa = body->oa;
452 obj = osc2cl(fa->fa_obj);
454 /* Update osc object's blocks attribute */
455 cl_object_attr_lock(obj);
456 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
457 attr->cat_blocks = body->oa.o_blocks;
462 cl_object_attr_update(env, obj, attr, valid);
463 cl_object_attr_unlock(obj);
466 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Send an asynchronous OST_SYNC request for @obj.
 *
 * @oa is placed in the request body; per the existing comment, its size
 * and blocks fields are overloaded to carry the start/end of the range.
 * osc_sync_interpret() is installed as reply callback with @upcall and
 * @cookie saved in the async args. The request is queued on ptlrpcd when
 * @rqset is PTLRPCD_SET, or added to @rqset otherwise.
 */
470 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
471 obd_enqueue_update_f upcall, void *cookie,
472 struct ptlrpc_request_set *rqset)
474 struct obd_export *exp = osc_export(obj);
475 struct ptlrpc_request *req;
476 struct ost_body *body;
477 struct osc_fsync_args *fa;
481 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
485 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
487 ptlrpc_request_free(req);
491 /* overload the size and blocks fields in the oa with start/end */
492 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
494 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
496 ptlrpc_request_set_replen(req);
497 req->rq_interpret_reply = osc_sync_interpret;
499 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
500 fa = ptlrpc_req_async_args(req);
503 fa->fa_upcall = upcall;
504 fa->fa_cookie = cookie;
506 if (rqset == PTLRPCD_SET)
507 ptlrpcd_add_req(req);
509 ptlrpc_set_add_req(rqset, req);
/*
 * Collect locally cancellable locks of the object named by @oa.
 *
 * The LDLM resource name is built from oa->o_oi and looked up in the
 * namespace of the export's obd; matching locks of @mode are cancelled
 * locally by ldlm_cancel_resource_local() and linked on @cancels. The
 * resource reference taken by the lookup is dropped before returning.
 */
514 /* Find and cancel locally locks matched by @mode in the resource found by
515 * @objid. Found locks are added into @cancel list. Returns the amount of
516 * locks added to @cancels list. */
517 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
518 struct list_head *cancels,
519 enum ldlm_mode mode, __u64 lock_flags)
521 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
522 struct ldlm_res_id res_id;
523 struct ldlm_resource *res;
527 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
528 * export) but disabled through procfs (flag in NS).
530 * This distinguishes from a case when ELC is not supported originally,
531 * when we still want to cancel locks in advance and just cancel them
532 * locally, without sending any RPC. */
533 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
536 ostid_build_res_name(&oa->o_oi, &res_id);
537 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
541 LDLM_RESOURCE_ADDREF(res);
542 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
543 lock_flags, 0, NULL);
544 LDLM_RESOURCE_DELREF(res);
545 ldlm_resource_putref(res);
/*
 * Reply callback for OST_DESTROY requests: releases the in-flight slot
 * taken by osc_can_send_destroy() by decrementing cl_destroy_in_flight,
 * and wakes cl_destroy_waitq so that a sender waiting in osc_destroy()
 * can retry.
 */
549 static int osc_destroy_interpret(const struct lu_env *env,
550 struct ptlrpc_request *req, void *args, int rc)
552 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
554 atomic_dec(&cli->cl_destroy_in_flight);
555 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a slot for one more in-flight destroy RPC.
 *
 * cl_destroy_in_flight is incremented first; if the new value does not
 * exceed cl_max_rpcs_in_flight the destroy may be sent. Otherwise the
 * increment is undone, and if the counter has meanwhile dropped below
 * the limit a waiter on cl_destroy_waitq is woken so it can retry.
 * NOTE(review): the return statements are not visible in this excerpt;
 * osc_destroy() treats a nonzero result as "may send".
 */
560 static int osc_can_send_destroy(struct client_obd *cli)
562 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
563 cli->cl_max_rpcs_in_flight) {
564 /* The destroy request can be sent */
567 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
568 cli->cl_max_rpcs_in_flight) {
570 * The counter has been modified between the two atomic
573 wake_up(&cli->cl_destroy_waitq);
/*
 * Send an OST_DESTROY for the object described by @oa without waiting
 * for the reply.
 *
 * Unused PW locks on the object's resource are first collected with
 * LDLM_FL_DISCARD_DATA and handed to ldlm_prep_elc_req() when the
 * request is prepared; if the request cannot be allocated the collected
 * locks are released with ldlm_lock_list_put(). The request is directed
 * to OST_IO_PORTAL.
 *
 * Concurrent destroys are throttled: when osc_can_send_destroy() refuses,
 * the caller sleeps (interruptibly, per LWI_INTR) on cl_destroy_waitq
 * until a slot can be reserved. osc_destroy_interpret() is installed as
 * reply callback and releases the slot.
 */
578 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
581 struct client_obd *cli = &exp->exp_obd->u.cli;
582 struct ptlrpc_request *req;
583 struct ost_body *body;
584 struct list_head cancels = LIST_HEAD_INIT(cancels);
589 CDEBUG(D_INFO, "oa NULL\n");
593 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
594 LDLM_FL_DISCARD_DATA);
596 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
598 ldlm_lock_list_put(&cancels, l_bl_ast, count);
602 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
605 ptlrpc_request_free(req);
609 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
610 ptlrpc_at_set_req_timeout(req);
612 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
616 ptlrpc_request_set_replen(req);
618 req->rq_interpret_reply = osc_destroy_interpret;
619 if (!osc_can_send_destroy(cli)) {
620 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
623 * Wait until the number of on-going destroy RPCs drops
624 * under max_rpc_in_flight
626 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
627 osc_can_send_destroy(cli), &lwi);
629 ptlrpc_req_finished(req);
634 /* Do not wait for response */
635 ptlrpcd_add_req(req);
/*
 * Fill the grant/cache accounting fields of @oa from the client state.
 *
 * Asserts that OBD_MD_FLBLOCKS and OBD_MD_FLGRANT are not yet set in
 * o_valid, then, under cl_loi_list_lock:
 *  - o_dirty:   cl_dirty_grant when the import has GRANT_PARAM, otherwise
 *               cl_dirty_pages converted to bytes;
 *  - o_undirty: how much more grant to ask for. Three sanity checks on
 *               the dirty-page counters each log a CERROR; otherwise the
 *               value is derived from the larger of
 *               cl_max_pages_per_rpc * (cl_max_rpcs_in_flight + 1) and
 *               cl_dirty_max_pages, plus extent tax with GRANT_PARAM,
 *               and is capped below OBD_MAX_GRANT;
 *  - o_grant:   cl_avail_grant + cl_reserved_grant;
 *  - o_dropped: cl_lost_grant, which is then reset to 0.
 * The resulting values are logged at D_CACHE after the lock is dropped.
 */
639 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
642 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
644 LASSERT(!(oa->o_valid & bits));
647 spin_lock(&cli->cl_loi_list_lock);
648 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
649 oa->o_dirty = cli->cl_dirty_grant;
651 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
652 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
653 cli->cl_dirty_max_pages)) {
654 CERROR("dirty %lu - %lu > dirty_max %lu\n",
655 cli->cl_dirty_pages, cli->cl_dirty_transit,
656 cli->cl_dirty_max_pages);
658 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
659 atomic_long_read(&obd_dirty_transit_pages) >
660 (long)(obd_max_dirty_pages + 1))) {
661 /* The atomic_read() allowing the atomic_inc() are
662 * not covered by a lock thus they may safely race and trip
663 * this CERROR() unless we add in a small fudge factor (+1). */
664 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
665 cli_name(cli), atomic_long_read(&obd_dirty_pages),
666 atomic_long_read(&obd_dirty_transit_pages),
667 obd_max_dirty_pages);
669 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
671 CERROR("dirty %lu - dirty_max %lu too big???\n",
672 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
675 unsigned long nrpages;
676 unsigned long undirty;
678 nrpages = cli->cl_max_pages_per_rpc;
679 nrpages *= cli->cl_max_rpcs_in_flight + 1;
680 nrpages = max(nrpages, cli->cl_dirty_max_pages);
681 undirty = nrpages << PAGE_SHIFT;
682 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
686 /* take extent tax into account when asking for more
688 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
689 cli->cl_max_extent_pages;
690 undirty += nrextents * cli->cl_grant_extent_tax;
692 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
693 * to add extent tax, etc.
695 oa->o_undirty = min(undirty, OBD_MAX_GRANT -
696 (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
698 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
699 oa->o_dropped = cli->cl_lost_grant;
700 cli->cl_lost_grant = 0;
701 spin_unlock(&cli->cl_loi_list_lock);
702 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
703 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
// Schedule the next grant shrink for @cli: cl_next_shrink_grant is set to
// the current time (ktime_get_seconds()) plus cl_grant_shrink_interval,
// and the new deadline is logged at D_CACHE.
706 void osc_update_next_shrink(struct client_obd *cli)
708 cli->cl_next_shrink_grant = ktime_get_seconds() +
709 cli->cl_grant_shrink_interval;
711 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
712 cli->cl_next_shrink_grant);
// Add @grant bytes to the available grant of @cli. The update of
// cl_avail_grant is done with cl_loi_list_lock held.
715 static void __osc_update_grant(struct client_obd *cli, u64 grant)
717 spin_lock(&cli->cl_loi_list_lock);
718 cli->cl_avail_grant += grant;
719 spin_unlock(&cli->cl_loi_list_lock);
// Apply the grant carried by a reply: when @body marks OBD_MD_FLGRANT as
// valid, body->oa.o_grant is logged and added to the available grant of
// @cli through __osc_update_grant().
722 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
724 if (body->oa.o_valid & OBD_MD_FLGRANT) {
725 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
726 __osc_update_grant(cli, body->oa.o_grant);
// Shared state of the grant-shrink worker, with a single static instance
// (client_gtd):
//  - gtd_clients: list of clients, linked through cl_grant_chain;
//  - gtd_mutex:   held while the list is walked or modified;
//  - gtd_stopped: set by osc_stop_grant_work(); the work handler checks
//                 it before re-arming itself.
731 * grant thread data for shrinking space.
733 struct grant_thread_data {
734 struct list_head gtd_clients;
735 struct mutex gtd_mutex;
736 unsigned long gtd_stopped:1;
738 static struct grant_thread_data client_gtd;
// Reply callback for a grant-shrink request.
// On one path the grant that was offered back (aa->aa_oa->o_grant) is
// re-added to cl_avail_grant (NOTE(review): presumably when rc is
// nonzero - the guarding condition is not visible in this excerpt). On
// the other path any grant carried in the reply body is applied with
// osc_update_grant(). aa->aa_oa is released to the osc_obdo_kmem slab.
740 static int osc_shrink_grant_interpret(const struct lu_env *env,
741 struct ptlrpc_request *req,
744 struct osc_grant_args *aa = args;
745 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
746 struct ost_body *body;
749 __osc_update_grant(cli, aa->aa_oa->o_grant);
753 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
755 osc_update_grant(cli, body);
757 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
// Move a quarter of the available grant of @cli into @oa so that it can
// be returned to the server with another request.
// Under cl_loi_list_lock, oa->o_grant is set to cl_avail_grant / 4 and
// that amount is subtracted from cl_avail_grant. OBD_MD_FLFLAGS is marked
// valid if it was not already, OBD_FL_SHRINK_GRANT is set in o_flags,
// and the next shrink time is pushed out.
762 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
764 spin_lock(&cli->cl_loi_list_lock);
765 oa->o_grant = cli->cl_avail_grant / 4;
766 cli->cl_avail_grant -= oa->o_grant;
767 spin_unlock(&cli->cl_loi_list_lock);
768 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
769 oa->o_valid |= OBD_MD_FLFLAGS;
772 oa->o_flags |= OBD_FL_SHRINK_GRANT;
773 osc_update_next_shrink(cli);
// Pick a shrink target for @cli and delegate to
// osc_shrink_grant_to_target(). The first target is
// (cl_max_rpcs_in_flight + 1) RPCs worth of bytes; if cl_avail_grant is
// already at or below that, the target becomes a single RPC worth of
// bytes. cl_avail_grant is read with cl_loi_list_lock held.
776 /* Shrink the current grant, either from some large amount to enough for a
777 * full set of in-flight RPCs, or if we have already shrunk to that limit
778 * then to enough for a single RPC. This avoids keeping more grant than
779 * needed, and avoids shrinking the grant piecemeal. */
780 static int osc_shrink_grant(struct client_obd *cli)
782 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
783 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
785 spin_lock(&cli->cl_loi_list_lock);
786 if (cli->cl_avail_grant <= target_bytes)
787 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
788 spin_unlock(&cli->cl_loi_list_lock);
790 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Give grant back to the server until only @target_bytes remain.
 *
 * @target_bytes is raised to at least one full RPC (cl_max_pages_per_rpc
 * pages). Nothing is sent if the client already holds no more than the
 * target. Otherwise the surplus (cl_avail_grant - target_bytes) is put
 * in body->oa.o_grant, flagged with OBD_FL_SHRINK_GRANT, and sent through
 * osc_set_info_async() under KEY_GRANT_SHRINK. cl_avail_grant is checked
 * again under cl_loi_list_lock after osc_announce_cached(), because it
 * may have changed since the first check.
 *
 * The surplus is re-added to cl_avail_grant with __osc_update_grant() on
 * the path following the send (NOTE(review): presumably only when the
 * send fails - the guarding condition is not visible in this excerpt).
 */
793 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
796 struct ost_body *body;
799 spin_lock(&cli->cl_loi_list_lock);
800 /* Don't shrink if we are already above or below the desired limit
801 * We don't want to shrink below a single RPC, as that will negatively
802 * impact block allocation and long-term performance. */
803 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
804 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
806 if (target_bytes >= cli->cl_avail_grant) {
807 spin_unlock(&cli->cl_loi_list_lock);
810 spin_unlock(&cli->cl_loi_list_lock);
816 osc_announce_cached(cli, &body->oa, 0);
818 spin_lock(&cli->cl_loi_list_lock);
819 if (target_bytes >= cli->cl_avail_grant) {
820 /* available grant has changed since target calculation */
821 spin_unlock(&cli->cl_loi_list_lock);
822 GOTO(out_free, rc = 0);
824 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
825 cli->cl_avail_grant = target_bytes;
826 spin_unlock(&cli->cl_loi_list_lock);
827 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
828 body->oa.o_valid |= OBD_MD_FLFLAGS;
829 body->oa.o_flags = 0;
831 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
832 osc_update_next_shrink(cli);
834 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
835 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
836 sizeof(*body), body, NULL);
838 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether @client should return grant to the server now.
 *
 * The client must have an import that negotiated
 * OBD_CONNECT_GRANT_SHRINK. Once the current time is within 5 seconds of
 * cl_next_shrink_grant, the decision depends on the import being in
 * LUSTRE_IMP_FULL state and cl_avail_grant exceeding one RPC worth of
 * bytes; osc_update_next_shrink() is also called on this path.
 * NOTE(review): the return statements are not visible in this excerpt;
 * the caller treats a nonzero result as "shrink now".
 */
844 static int osc_should_shrink_grant(struct client_obd *client)
846 time64_t next_shrink = client->cl_next_shrink_grant;
848 if (client->cl_import == NULL)
851 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
852 OBD_CONNECT_GRANT_SHRINK) == 0)
855 if (ktime_get_seconds() >= next_shrink - 5) {
856 /* Get the current RPC size directly, instead of going via:
857 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
858 * Keep comment here so that it can be found by searching. */
859 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
861 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
862 client->cl_avail_grant > brw_size)
865 osc_update_next_shrink(client);
/*
 * GRANT_SHRINK_RPC_BATCH bounds the per-run counter that
 * osc_grant_work_handler() checks before shrinking a client. "work" is
 * the delayed work item that runs that handler.
 */
870 #define GRANT_SHRINK_RPC_BATCH 100
872 static struct delayed_work work;
/*
 * Work handler that returns unused grant for registered clients.
 *
 * With gtd_mutex held, every client on client_gtd.gtd_clients is
 * visited; grant is shrunk for those selected by
 * osc_should_shrink_grant() while the per-run counter stays below
 * GRANT_SHRINK_RPC_BATCH. While walking, the handler tracks a
 * next_shrink time: it starts from the first client's
 * cl_next_shrink_grant and is lowered to any later client's deadline
 * that is earlier but still in the future.
 *
 * Unless gtd_stopped is set, the handler re-arms itself: delayed until
 * next_shrink when that is still in the future, with schedule_work() as
 * the other re-arm call.
 */
874 static void osc_grant_work_handler(struct work_struct *data)
876 struct client_obd *cli;
878 bool init_next_shrink = true;
879 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
882 mutex_lock(&client_gtd.gtd_mutex);
883 list_for_each_entry(cli, &client_gtd.gtd_clients,
885 if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
886 osc_should_shrink_grant(cli))
887 osc_shrink_grant(cli);
889 if (!init_next_shrink) {
890 if (cli->cl_next_shrink_grant < next_shrink &&
891 cli->cl_next_shrink_grant > ktime_get_seconds())
892 next_shrink = cli->cl_next_shrink_grant;
894 init_next_shrink = false;
895 next_shrink = cli->cl_next_shrink_grant;
898 mutex_unlock(&client_gtd.gtd_mutex);
900 if (client_gtd.gtd_stopped == 1)
903 if (next_shrink > ktime_get_seconds())
904 schedule_delayed_work(&work, msecs_to_jiffies(
905 (next_shrink - ktime_get_seconds()) *
908 schedule_work(&work.work);
912 * Start grant thread for returning grant to server for idle clients.
/*
 * Initialise the grant-shrink state and start the worker: clears
 * gtd_stopped, initialises the mutex and the (empty) client list, binds
 * osc_grant_work_handler() to the delayed work item and schedules its
 * first run immediately.
 */
914 static int osc_start_grant_work(void)
916 client_gtd.gtd_stopped = 0;
917 mutex_init(&client_gtd.gtd_mutex);
918 INIT_LIST_HEAD(&client_gtd.gtd_clients);
920 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
921 schedule_work(&work.work);
/*
 * Stop the grant-shrink worker. gtd_stopped is set first so that a
 * handler run that is already in progress does not re-arm itself, then
 * the delayed work is cancelled synchronously.
 */
926 static void osc_stop_grant_work(void)
928 client_gtd.gtd_stopped = 1;
929 cancel_delayed_work_sync(&work);
/*
 * Register @client with the grant-shrink worker by linking its
 * cl_grant_chain onto client_gtd.gtd_clients, with gtd_mutex held.
 */
932 static void osc_add_grant_list(struct client_obd *client)
934 mutex_lock(&client_gtd.gtd_mutex);
935 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
936 mutex_unlock(&client_gtd.gtd_mutex);
/*
 * Unregister @client from the grant-shrink worker. The list_empty() test
 * on cl_grant_chain is made before gtd_mutex is taken; the entry is then
 * unlinked and re-initialised under the mutex, so that a later
 * list_empty() test on it succeeds.
 */
939 static void osc_del_grant_list(struct client_obd *client)
941 if (list_empty(&client->cl_grant_chain))
944 mutex_lock(&client_gtd.gtd_mutex);
945 list_del_init(&client->cl_grant_chain);
946 mutex_unlock(&client_gtd.gtd_mutex);
/*
 * Initialise the grant accounting of @cli from connect data @ocd
 * (exported symbol).
 *
 * Under cl_loi_list_lock, cl_avail_grant starts from ocd->ocd_grant.
 * Unless the import is in LUSTRE_IMP_EVICTED state, the reserved grant
 * and the amount currently dirty (cl_dirty_grant with GRANT_PARAM,
 * otherwise cl_dirty_pages converted to bytes) are subtracted.
 *
 * With GRANT_PARAM the extent tax, the chunk size (at least PAGE_SHIFT
 * bits), a chunk-aligned cl_max_pages_per_rpc and the maximum extent
 * size in pages (at least 1) are derived from @ocd. Without it the
 * values used are: no tax, PAGE_SHIFT chunk bits and DT_MAX_BRW_PAGES.
 *
 * Finally, a client that negotiated GRANT_SHRINK and is not yet on a
 * grant list is registered with osc_add_grant_list().
 */
949 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
952 * ocd_grant is the total grant amount we're expect to hold: if we've
953 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
954 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
957 * race is tolerable here: if we're evicted, but imp_state already
958 * left EVICTED state, then cl_dirty_pages must be 0 already.
960 spin_lock(&cli->cl_loi_list_lock);
961 cli->cl_avail_grant = ocd->ocd_grant;
962 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
963 cli->cl_avail_grant -= cli->cl_reserved_grant;
964 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
965 cli->cl_avail_grant -= cli->cl_dirty_grant;
967 cli->cl_avail_grant -=
968 cli->cl_dirty_pages << PAGE_SHIFT;
971 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
975 /* overhead for each extent insertion */
976 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
977 /* determine the appropriate chunk size used by osc_extent. */
978 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
979 ocd->ocd_grant_blkbits);
980 /* max_pages_per_rpc must be chunk aligned */
981 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
982 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
983 ~chunk_mask) & chunk_mask;
984 /* determine maximum extent size, in #pages */
985 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
986 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
987 if (cli->cl_max_extent_pages == 0)
988 cli->cl_max_extent_pages = 1;
990 cli->cl_grant_extent_tax = 0;
991 cli->cl_chunkbits = PAGE_SHIFT;
992 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
994 spin_unlock(&cli->cl_loi_list_lock);
996 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
997 "chunk bits: %d cl_max_extent_pages: %d\n",
999 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1000 cli->cl_max_extent_pages);
1002 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1003 osc_add_grant_list(cli);
1005 EXPORT_SYMBOL(osc_init_grant);
/*
 * Zero-fill the part of a read buffer that the server did not supply.
 *
 * @nob_read is the number of bytes actually read. Pages in @pga are
 * walked in order: fully read pages are skipped, the page in which the
 * data ends has its tail zeroed, and the second loop zeroes every
 * remaining page completely. Pages are mapped with kmap() for the
 * memset; the matching unmap calls are not visible in this excerpt.
 */
1007 /* We assume that the reason this OSC got a short read is because it read
1008 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1009 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1010 * this stripe never got written at or beyond this stripe offset yet. */
1011 static void handle_short_read(int nob_read, size_t page_count,
1012 struct brw_page **pga)
1017 /* skip bytes read OK */
1018 while (nob_read > 0) {
1019 LASSERT (page_count > 0);
1021 if (pga[i]->count > nob_read) {
1022 /* EOF inside this page */
1023 ptr = kmap(pga[i]->pg) +
1024 (pga[i]->off & ~PAGE_MASK);
1025 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1032 nob_read -= pga[i]->count;
1037 /* zero remaining pages */
1038 while (page_count-- > 0) {
1039 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1040 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf return codes of a BRW_WRITE reply.
 *
 * The RMF_RCS vector is fetched from the reply; a missing or short
 * vector is logged. Each of the @niocount codes is then checked: a
 * negative code is returned as is, and any other nonzero code is logged
 * as invalid. Finally, when the request has a bulk descriptor, the number
 * of bytes transferred is compared with @requested_nob and a mismatch is
 * logged with CERROR.
 * NOTE(review): the error values returned on the logged paths are not
 * visible in this excerpt.
 */
1046 static int check_write_rcs(struct ptlrpc_request *req,
1047 int requested_nob, int niocount,
1048 size_t page_count, struct brw_page **pga)
1053 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1054 sizeof(*remote_rcs) *
1056 if (remote_rcs == NULL) {
1057 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1061 /* return error if any niobuf was in error */
1062 for (i = 0; i < niocount; i++) {
1063 if ((int)remote_rcs[i] < 0)
1064 return(remote_rcs[i]);
1066 if (remote_rcs[i] != 0) {
1067 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1068 i, remote_rcs[i], req);
1072 if (req->rq_bulk != NULL &&
1073 req->rq_bulk->bd_nob_transferred != requested_nob) {
1074 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1075 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Tell whether @p2 directly follows @p1, i.e. whether
 * p1->off + p1->count == p2->off, which is the result on the final
 * return path.
 *
 * When the two pages carry different flags, a warning is logged if they
 * differ in any bit outside the mask of flags known to be safe to
 * combine. NOTE(review): the value returned on the differing-flags path
 * is not visible in this excerpt.
 */
1082 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1084 if (p1->flag != p2->flag) {
1085 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1086 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1087 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1089 /* warn if we try to combine flags that we don't know to be
1090 * safe to combine */
1091 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1092 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1093 "report this at https://jira.whamcloud.com/\n",
1094 p1->flag, p2->flag);
1099 return (p1->off + p1->count == p2->off);
1102 #if IS_ENABLED(CONFIG_CRC_T10DIF)
/*
 * Compute a bulk checksum built on top of T10-PI guard tags.
 *
 * A scratch page is allocated and used as an array of 16-bit guard
 * tags. For each data page (up to @nob bytes over @pg_count pages),
 * obd_page_dif_generate_buffer() appends guard tags to the scratch page.
 * Whenever the scratch page is full, and once more at the end for any
 * leftover tags, its contents are fed to the hash selected by
 * OBD_CKSUM_T10_TOP. The digest is read into a local cksum with
 * cfs_crypto_hash_final(), and the scratch page is freed.
 *
 * Fault injection: with OBD_FAIL_OSC_CHECKSUM_RECEIVE on a read, the
 * start of the first page is overwritten with "bad1" before hashing;
 * OBD_FAIL_OSC_CHECKSUM_SEND is checked for writes after the digest is
 * final (its effect is on a line not visible in this excerpt).
 */
1103 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1104 size_t pg_count, struct brw_page **pga,
1105 int opc, obd_dif_csum_fn *fn,
1109 struct ahash_request *req;
1110 /* Used Adler as the default checksum type on top of DIF tags */
1111 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1112 struct page *__page;
1113 unsigned char *buffer;
1115 unsigned int bufsize;
1117 int used_number = 0;
1123 LASSERT(pg_count > 0);
1125 __page = alloc_page(GFP_KERNEL);
1129 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1132 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1133 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1137 buffer = kmap(__page);
1138 guard_start = (__u16 *)buffer;
1139 guard_number = PAGE_SIZE / sizeof(*guard_start);
1140 while (nob > 0 && pg_count > 0) {
1141 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1143 /* corrupt the data before we compute the checksum, to
1144 * simulate an OST->client data error */
1145 if (unlikely(i == 0 && opc == OST_READ &&
1146 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1147 unsigned char *ptr = kmap(pga[i]->pg);
1148 int off = pga[i]->off & ~PAGE_MASK;
1150 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1155 * The left guard number should be able to hold checksums of a
1158 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1159 pga[i]->off & ~PAGE_MASK,
1161 guard_start + used_number,
1162 guard_number - used_number,
1168 used_number += used;
1169 if (used_number == guard_number) {
1170 cfs_crypto_hash_update_page(req, __page, 0,
1171 used_number * sizeof(*guard_start));
1175 nob -= pga[i]->count;
1183 if (used_number != 0)
1184 cfs_crypto_hash_update_page(req, __page, 0,
1185 used_number * sizeof(*guard_start));
1187 bufsize = sizeof(cksum);
1188 cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1190 /* For sending we only compute the wrong checksum instead
1191 * of corrupting the data so it is still correct on a redo */
1192 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1197 __free_page(__page);
/*
 * Fallbacks used when CONFIG_CRC_T10DIF is not enabled: the DIF checksum
 * function names are defined as NULL and osc_checksum_bulk_t10pi()
 * becomes a macro. NOTE(review): the replacement text of that macro
 * continues on a line that is not visible in this excerpt.
 */
1200 #else /* !CONFIG_CRC_T10DIF */
1201 #define obd_dif_ip_fn NULL
1202 #define obd_dif_crc_fn NULL
1203 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
1205 #endif /* CONFIG_CRC_T10DIF */
/*
 * Compute a bulk checksum of type @cksum_type directly over page data.
 *
 * A hash of the matching algorithm is initialised; failure to do so is
 * logged and reported as PTR_ERR(req). Each page contributes the bytes
 * starting at its in-page offset, for up to @nob bytes over @pg_count
 * pages, and the digest is written to *cksum by cfs_crypto_hash_final().
 *
 * Fault injection: with OBD_FAIL_OSC_CHECKSUM_RECEIVE on a read, the
 * start of the first page is overwritten with "bad1" before hashing;
 * OBD_FAIL_OSC_CHECKSUM_SEND is checked for writes after the digest is
 * final (its effect is on a line not visible in this excerpt).
 */
1207 static int osc_checksum_bulk(int nob, size_t pg_count,
1208 struct brw_page **pga, int opc,
1209 enum cksum_types cksum_type,
1213 struct ahash_request *req;
1214 unsigned int bufsize;
1215 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1217 LASSERT(pg_count > 0);
1219 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1221 CERROR("Unable to initialize checksum hash %s\n",
1222 cfs_crypto_hash_name(cfs_alg));
1223 return PTR_ERR(req);
1226 while (nob > 0 && pg_count > 0) {
1227 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1229 /* corrupt the data before we compute the checksum, to
1230 * simulate an OST->client data error */
1231 if (i == 0 && opc == OST_READ &&
1232 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1233 unsigned char *ptr = kmap(pga[i]->pg);
1234 int off = pga[i]->off & ~PAGE_MASK;
1236 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1239 cfs_crypto_hash_update_page(req, pga[i]->pg,
1240 pga[i]->off & ~PAGE_MASK,
1242 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1243 (int)(pga[i]->off & ~PAGE_MASK));
1245 nob -= pga[i]->count;
1250 bufsize = sizeof(*cksum);
1251 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1253 /* For sending we only compute the wrong checksum instead
1254 * of corrupting the data so it is still correct on a redo */
1255 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Compute the checksum of a bulk read or write, dispatching on
 * @cksum_type.
 *
 * obd_t10_cksum2dif() translates @cksum_type into a DIF checksum
 * function and a sector size; both outputs start out as NULL / 0. The
 * work is then done either by osc_checksum_bulk_t10pi(), which receives
 * that function and sector size, or by the generic osc_checksum_bulk().
 * NOTE(review): the condition selecting between the two is not visible
 * in this excerpt - presumably whether a DIF function was returned.
 */
1261 static int osc_checksum_bulk_rw(const char *obd_name,
1262 enum cksum_types cksum_type,
1263 int nob, size_t pg_count,
1264 struct brw_page **pga, int opc,
1267 obd_dif_csum_fn *fn = NULL;
1268 int sector_size = 0;
/* pass the address of sector_size so the helper can fill it in */
1272 obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1275 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1276 opc, fn, sector_size, check_sum);
1278 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
/*
 * Allocate, size and pack the bulk read/write RPC for the page_count
 * pages in pga.
 *
 * - Writes (OBD_BRW_WRITE set in cmd) allocate the request with
 *   ptlrpc_request_alloc_pool(); reads use ptlrpc_request_alloc().
 * - niocount is 1 plus the number of adjacent page pairs that
 *   can_merge_pages() tests in the counting loop; the RMF_NIOBUF_REMOTE
 *   buffer is sized for niocount entries.
 * - short_io_size is the sum of all page counts.  It is checked against
 *   cl_max_short_io_bytes, niocount > 1 and imp_connect_shortio(); when
 *   short io is in effect (short_io_size != 0) write data is memcpy()ed
 *   into the RMF_SHORT_IO buffer, otherwise a bulk descriptor is prepared
 *   and every page is added to it as a kiov fragment.
 * - For writes with cl_checksum enabled and no bulk security flavor the
 *   checksum is computed with osc_checksum_bulk_rw() and the checksum
 *   flags are also saved in oa for later checking.
 * - requested_nob, niocount and page_count are stored in the request's
 *   async args.
 *
 * The page array is asserted to be non-empty, strictly increasing in
 * offset, free of gaps in the middle, and uniform in OBD_BRW_SRVLOCK.
 *
 * Visible failure paths: the two OBD_FAIL injection points (-ENOMEM,
 * -EINVAL) and GOTO(out, rc = -ENOMEM).
 *
 * NOTE(review): the uses of reqp and resend are not visible in this view;
 * presumably *reqp receives the packed request and resend selects the
 * OBD_FL_RECOV_RESEND flag - confirm.
 */
1285 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1286 u32 page_count, struct brw_page **pga,
1287 struct ptlrpc_request **reqp, int resend)
1289 struct ptlrpc_request *req;
1290 struct ptlrpc_bulk_desc *desc;
1291 struct ost_body *body;
1292 struct obd_ioobj *ioobj;
1293 struct niobuf_remote *niobuf;
1294 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1295 struct osc_brw_async_args *aa;
1296 struct req_capsule *pill;
1297 struct brw_page *pg_prev;
1299 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1302 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1303 RETURN(-ENOMEM); /* Recoverable */
1304 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1305 RETURN(-EINVAL); /* Fatal */
1307 if ((cmd & OBD_BRW_WRITE) != 0) {
1309 req = ptlrpc_request_alloc_pool(cli->cl_import,
1311 &RQF_OST_BRW_WRITE);
1314 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1319 for (niocount = i = 1; i < page_count; i++) {
1320 if (!can_merge_pages(pga[i - 1], pga[i]))
1324 pill = &req->rq_pill;
1325 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1327 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1328 niocount * sizeof(*niobuf));
1330 for (i = 0; i < page_count; i++)
1331 short_io_size += pga[i]->count;
1333 /* Check if read/write is small enough to be a short io. */
1334 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1335 !imp_connect_shortio(cli->cl_import))
1338 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1339 opc == OST_READ ? 0 : short_io_size);
1340 if (opc == OST_READ)
1341 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1344 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1346 ptlrpc_request_free(req);
1349 osc_set_io_portal(req);
1351 ptlrpc_at_set_req_timeout(req);
1352 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1354 req->rq_no_retry_einprogress = 1;
1356 if (short_io_size != 0) {
1358 short_io_buf = NULL;
1362 desc = ptlrpc_prep_bulk_imp(req, page_count,
1363 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1364 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1365 PTLRPC_BULK_PUT_SINK) |
1366 PTLRPC_BULK_BUF_KIOV,
1368 &ptlrpc_bulk_kiov_pin_ops);
1371 GOTO(out, rc = -ENOMEM);
1372 /* NB request now owns desc and will free it when it gets freed */
1374 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1375 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1376 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1377 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1379 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1381 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1382 * and from_kgid(), because they are asynchronous. Fortunately, variable
1383 * oa contains valid o_uid and o_gid in these two operations.
1384 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1385 * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
1386 * other process logic */
1387 body->oa.o_uid = oa->o_uid;
1388 body->oa.o_gid = oa->o_gid;
1390 obdo_to_ioobj(oa, ioobj);
1391 ioobj->ioo_bufcnt = niocount;
1392 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1393 * that might be sent for this request. The actual number is decided
1394 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1395 * "max - 1" for old client compatibility sending "0", and also so the
1396 * actual maximum is a power-of-two number, not one less. LU-1431 */
1398 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1400 ioobj_max_brw_set(ioobj, 0);
1402 if (short_io_size != 0) {
1403 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1404 body->oa.o_valid |= OBD_MD_FLFLAGS;
1405 body->oa.o_flags = 0;
1407 body->oa.o_flags |= OBD_FL_SHORT_IO;
1408 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1410 if (opc == OST_WRITE) {
1411 short_io_buf = req_capsule_client_get(pill,
1413 LASSERT(short_io_buf != NULL);
1417 LASSERT(page_count > 0);
1419 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1420 struct brw_page *pg = pga[i];
1421 int poff = pg->off & ~PAGE_MASK;
1423 LASSERT(pg->count > 0);
1424 /* make sure there is no gap in the middle of page array */
1425 LASSERTF(page_count == 1 ||
1426 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1427 ergo(i > 0 && i < page_count - 1,
1428 poff == 0 && pg->count == PAGE_SIZE) &&
1429 ergo(i == page_count - 1, poff == 0)),
1430 "i: %d/%d pg: %p off: %llu, count: %u\n",
1431 i, page_count, pg, pg->off, pg->count);
1432 LASSERTF(i == 0 || pg->off > pg_prev->off,
1433 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1434 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1436 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1437 pg_prev->pg, page_private(pg_prev->pg),
1438 pg_prev->pg->index, pg_prev->off);
1439 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1440 (pg->flag & OBD_BRW_SRVLOCK));
1441 if (short_io_size != 0 && opc == OST_WRITE) {
1442 unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1444 LASSERT(short_io_size >= requested_nob + pg->count);
1445 memcpy(short_io_buf + requested_nob,
1448 ll_kunmap_atomic(ptr, KM_USER0);
1449 } else if (short_io_size == 0) {
1450 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1453 requested_nob += pg->count;
1455 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1457 niobuf->rnb_len += pg->count;
1459 niobuf->rnb_offset = pg->off;
1460 niobuf->rnb_len = pg->count;
1461 niobuf->rnb_flags = pg->flag;
1466 LASSERTF((void *)(niobuf - niocount) ==
1467 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1468 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1469 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1471 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1473 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1474 body->oa.o_valid |= OBD_MD_FLFLAGS;
1475 body->oa.o_flags = 0;
1477 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1480 if (osc_should_shrink_grant(cli))
1481 osc_shrink_grant_local(cli, &body->oa);
1483 /* size[REQ_REC_OFF] still sizeof (*body) */
1484 if (opc == OST_WRITE) {
1485 if (cli->cl_checksum &&
1486 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1487 /* store cl_cksum_type in a local variable since
1488 * it can be changed via lprocfs */
1489 enum cksum_types cksum_type = cli->cl_cksum_type;
1491 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1492 body->oa.o_flags = 0;
1494 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1496 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1498 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1499 requested_nob, page_count,
1503 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1507 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1510 /* save this in 'oa', too, for later checking */
1511 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1512 oa->o_flags |= obd_cksum_type_pack(obd_name,
1515 /* clear out the checksum flag, in case this is a
1516 * resend but cl_checksum is no longer set. b=11238 */
1517 oa->o_valid &= ~OBD_MD_FLCKSUM;
1519 oa->o_cksum = body->oa.o_cksum;
1520 /* 1 RC per niobuf */
1521 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1522 sizeof(__u32) * niocount);
1524 if (cli->cl_checksum &&
1525 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1526 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1527 body->oa.o_flags = 0;
1528 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1529 cli->cl_cksum_type);
1530 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1533 /* Client cksum has been already copied to wire obdo in previous
1534 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1535 * resent due to cksum error, this will allow Server to
1536 * check+dump pages on its side */
1538 ptlrpc_request_set_replen(req);
1540 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1541 aa = ptlrpc_req_async_args(req);
1543 aa->aa_requested_nob = requested_nob;
1544 aa->aa_nio_count = niocount;
1545 aa->aa_page_count = page_count;
1549 INIT_LIST_HEAD(&aa->aa_oaps);
1552 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1553 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1554 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1555 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1559 ptlrpc_req_finished(req);
/* File-scope buffer holding the dump file name that dump_all_bulk_pages()
 * formats on each call. */
1563 char dbgcksum_file_name[PATH_MAX];
/*
 * Write the contents of all page_count pages in pga to a dump file after a
 * checksum mismatch.
 *
 * The file name is formatted from the debug file path
 * (LIBCFS_DEBUG_FILE_PATH_DEFAULT when the configured path starts with
 * "NONE"), the parent FID from oa when OBD_MD_FLFID is valid (zeros
 * otherwise), a byte range ending at the last byte of the last page, and
 * the client and server checksums.  The file is opened with
 * O_CREAT | O_EXCL, so an already existing dump for the same name is not
 * overwritten.  Each page is kmap()ed and written with cfs_kernel_write();
 * the file is then synced with ll_vfs_fsync_range() and closed.
 *
 * The function returns void: open, write and sync problems are only
 * logged.
 */
1565 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1566 struct brw_page **pga, __u32 server_cksum,
1574 /* will only keep dump of pages on first error for the same range in
1575 * file/fid, not during the resends/retries. */
1576 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1577 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1578 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1579 libcfs_debug_file_path_arr :
1580 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1581 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1582 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1583 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1585 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1586 client_cksum, server_cksum);
1587 filp = filp_open(dbgcksum_file_name,
1588 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1592 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1593 "checksum error: rc = %d\n", dbgcksum_file_name,
1596 CERROR("%s: can't open to dump pages with checksum "
1597 "error: rc = %d\n", dbgcksum_file_name, rc);
1601 for (i = 0; i < page_count; i++) {
1602 len = pga[i]->count;
1603 buf = kmap(pga[i]->pg);
1605 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1607 CERROR("%s: wanted to write %u but got %d "
1608 "error\n", dbgcksum_file_name, len, rc);
1613 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1614 dbgcksum_file_name, rc);
1619 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1621 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1622 filp_close(filp, NULL);
/*
 * Diagnose the write checksum returned by the server.
 *
 * When server_cksum equals client_cksum the checksum is confirmed with a
 * debug message.  Otherwise the pages are dumped if cl_checksum_dump is
 * set, the checksum type is unpacked from oa, the checksum over
 * aa->aa_ppga is recomputed as an OST_WRITE (T10-PI helper or plain
 * helper), and a console error is printed.  The message text classifies
 * the mismatch: recompute failure, checksum type differing from the one
 * in the original request, recomputed value equal to the server's,
 * recomputed value equal to the original client value, or neither.
 *
 * NOTE(review): the return statements are not visible in this view; the
 * call site in osc_brw_fini_request() tests the result as a boolean.
 */
1627 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1628 __u32 client_cksum, __u32 server_cksum,
1629 struct osc_brw_async_args *aa)
1631 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1632 enum cksum_types cksum_type;
1633 obd_dif_csum_fn *fn = NULL;
1634 int sector_size = 0;
1639 if (server_cksum == client_cksum) {
1640 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1644 if (aa->aa_cli->cl_checksum_dump)
1645 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1646 server_cksum, client_cksum);
1648 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1651 switch (cksum_type) {
1652 case OBD_CKSUM_T10IP512:
1656 case OBD_CKSUM_T10IP4K:
1660 case OBD_CKSUM_T10CRC512:
1661 fn = obd_dif_crc_fn;
1664 case OBD_CKSUM_T10CRC4K:
1665 fn = obd_dif_crc_fn;
1673 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1674 aa->aa_page_count, aa->aa_ppga,
1675 OST_WRITE, fn, sector_size,
1678 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1679 aa->aa_ppga, OST_WRITE, cksum_type,
1683 msg = "failed to calculate the client write checksum";
1684 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1685 msg = "the server did not use the checksum type specified in "
1686 "the original request - likely a protocol problem";
1687 else if (new_cksum == server_cksum)
1688 msg = "changed on the client after we checksummed it - "
1689 "likely false positive due to mmap IO (bug 11742)";
1690 else if (new_cksum == client_cksum)
1691 msg = "changed in transit before arrival at OST";
1693 msg = "changed in transit AND doesn't match the original - "
1694 "likely false positive due to mmap IO (bug 11742)";
1696 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1697 DFID " object "DOSTID" extent [%llu-%llu], original "
1698 "client csum %x (type %x), server csum %x (type %x),"
1699 " client csum now %x\n",
1700 obd_name, msg, libcfs_nid2str(peer->nid),
1701 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1702 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1703 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1704 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1705 aa->aa_ppga[aa->aa_page_count - 1]->off +
1706 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1708 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1709 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a BRW request.
 *
 * A negative rc other than -EDQUOT is reported with DEBUG_REQ.  The reply
 * ost_body is unpacked; for an OST_WRITE whose reply carries quota bits,
 * osc_quota_setdq() is called with the uid/gid/projid from the body, and
 * the grant is updated from the body with osc_update_grant().
 *
 * OST_WRITE: the bulk is unwrapped when one exists, the checksum saved in
 * aa->aa_oa is verified with check_write_checksum(), and the per-niobuf
 * return codes are checked with check_write_rcs().
 *
 * OST_READ: rc is the byte count.  Without a bulk descriptor (short io)
 * it is taken from the size of the RMF_SHORT_IO reply field and the data
 * is memcpy()ed into the pages; otherwise the bulk is unwrapped.  Counts
 * larger than requested, or different from bd_nob_transferred, are
 * reported.  Short reads go through handle_short_read().  When the reply
 * carries OBD_MD_FLCKSUM the checksum is recomputed locally and a
 * mismatch is reported (with an optional page dump); when it does not but
 * the client asked for one, the miss is counted and occasionally logged.
 *
 * At the end shown, lustre_get_wire_obdo() copies the reply obdo into
 * aa->aa_oa.
 *
 * NOTE(review): most early returns and error codes are on lines that are
 * not visible in this view; only GOTO(out, rc = -EAGAIN) is shown.
 */
1713 /* Note rc enters this function as number of bytes transferred */
1714 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1716 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1717 struct client_obd *cli = aa->aa_cli;
1718 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1719 const struct lnet_process_id *peer =
1720 &req->rq_import->imp_connection->c_peer;
1721 struct ost_body *body;
1722 u32 client_cksum = 0;
1725 if (rc < 0 && rc != -EDQUOT) {
1726 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1730 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1731 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1733 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1737 /* set/clear over quota flag for a uid/gid/projid */
1738 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1739 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1740 unsigned qid[LL_MAXQUOTAS] = {
1741 body->oa.o_uid, body->oa.o_gid,
1742 body->oa.o_projid };
1743 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1744 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1745 body->oa.o_valid, body->oa.o_flags);
1746 osc_quota_setdq(cli, qid, body->oa.o_valid,
1750 osc_update_grant(cli, body);
1755 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1756 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1758 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1760 CERROR("Unexpected +ve rc %d\n", rc);
1764 if (req->rq_bulk != NULL &&
1765 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1768 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1769 check_write_checksum(&body->oa, peer, client_cksum,
1770 body->oa.o_cksum, aa))
1773 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1774 aa->aa_page_count, aa->aa_ppga);
1778 /* The rest of this function executes only for OST_READs */
1780 if (req->rq_bulk == NULL) {
1781 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1783 LASSERT(rc == req->rq_status);
1785 /* if unwrap_bulk failed, return -EAGAIN to retry */
1786 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1789 GOTO(out, rc = -EAGAIN);
1791 if (rc > aa->aa_requested_nob) {
1792 CERROR("Unexpected rc %d (%d requested)\n", rc,
1793 aa->aa_requested_nob);
1797 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1798 CERROR ("Unexpected rc %d (%d transferred)\n",
1799 rc, req->rq_bulk->bd_nob_transferred);
1803 if (req->rq_bulk == NULL) {
1805 int nob, pg_count, i = 0;
1808 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1809 pg_count = aa->aa_page_count;
1810 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1813 while (nob > 0 && pg_count > 0) {
1815 int count = aa->aa_ppga[i]->count > nob ?
1816 nob : aa->aa_ppga[i]->count;
1818 CDEBUG(D_CACHE, "page %p count %d\n",
1819 aa->aa_ppga[i]->pg, count);
1820 ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1821 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1823 ll_kunmap_atomic((void *) ptr, KM_USER0);
1832 if (rc < aa->aa_requested_nob)
1833 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1835 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1836 static int cksum_counter;
1837 u32 server_cksum = body->oa.o_cksum;
1840 enum cksum_types cksum_type;
1841 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1842 body->oa.o_flags : 0;
1844 cksum_type = obd_cksum_type_unpack(o_flags);
1845 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1846 aa->aa_page_count, aa->aa_ppga,
1847 OST_READ, &client_cksum);
1851 if (req->rq_bulk != NULL &&
1852 peer->nid != req->rq_bulk->bd_sender) {
1854 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1857 if (server_cksum != client_cksum) {
1858 struct ost_body *clbody;
1859 u32 page_count = aa->aa_page_count;
1861 clbody = req_capsule_client_get(&req->rq_pill,
1863 if (cli->cl_checksum_dump)
1864 dump_all_bulk_pages(&clbody->oa, page_count,
1865 aa->aa_ppga, server_cksum,
1868 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1869 "%s%s%s inode "DFID" object "DOSTID
1870 " extent [%llu-%llu], client %x, "
1871 "server %x, cksum_type %x\n",
1873 libcfs_nid2str(peer->nid),
1875 clbody->oa.o_valid & OBD_MD_FLFID ?
1876 clbody->oa.o_parent_seq : 0ULL,
1877 clbody->oa.o_valid & OBD_MD_FLFID ?
1878 clbody->oa.o_parent_oid : 0,
1879 clbody->oa.o_valid & OBD_MD_FLFID ?
1880 clbody->oa.o_parent_ver : 0,
1881 POSTID(&body->oa.o_oi),
1882 aa->aa_ppga[0]->off,
1883 aa->aa_ppga[page_count-1]->off +
1884 aa->aa_ppga[page_count-1]->count - 1,
1885 client_cksum, server_cksum,
1888 aa->aa_oa->o_cksum = client_cksum;
1892 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1895 } else if (unlikely(client_cksum)) {
1896 static int cksum_missed;
1899 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1900 CERROR("Checksum %u requested from %s but not sent\n",
1901 cksum_missed, libcfs_nid2str(peer->nid));
1907 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1908 aa->aa_oa, &body->oa);
/*
 * Resend a BRW request that failed with a recoverable error.
 *
 * A new request is prepared with osc_brw_prep_request() (resend argument
 * 1) for the same client, obdo and page array, using the opcode of the
 * old request to choose read or write.  If an oap attached to the old
 * request is found to be interrupted, the new request is dropped with
 * ptlrpc_req_finished().
 *
 * Otherwise the new request inherits the interpret and commit callbacks,
 * the async args and the import generation of the old one, and its send
 * time is set aa_resends seconds into the future, capped at the request
 * timeout.  The aa_oaps and aa_exts lists are spliced over to the new
 * async args, each oap holding a request reference is switched to a
 * reference on the new request, and the new request is handed to
 * ptlrpcd_add_req().
 *
 * NOTE(review): the return values are on lines not visible in this view.
 */
1913 static int osc_brw_redo_request(struct ptlrpc_request *request,
1914 struct osc_brw_async_args *aa, int rc)
1916 struct ptlrpc_request *new_req;
1917 struct osc_brw_async_args *new_aa;
1918 struct osc_async_page *oap;
1921 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1922 "redo for recoverable error %d", rc);
1924 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1925 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1926 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1927 aa->aa_ppga, &new_req, 1);
1931 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1932 if (oap->oap_request != NULL) {
1933 LASSERTF(request == oap->oap_request,
1934 "request %p != oap_request %p\n",
1935 request, oap->oap_request);
1936 if (oap->oap_interrupted) {
1937 ptlrpc_req_finished(new_req);
1943 * New request takes over pga and oaps from old request.
1944 * Note that copying a list_head doesn't work, need to move it...
1947 new_req->rq_interpret_reply = request->rq_interpret_reply;
1948 new_req->rq_async_args = request->rq_async_args;
1949 new_req->rq_commit_cb = request->rq_commit_cb;
1950 /* cap resend delay to the current request timeout, this is similar to
1951 * what ptlrpc does (see after_reply()) */
1952 if (aa->aa_resends > new_req->rq_timeout)
1953 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1955 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1956 new_req->rq_generation_set = 1;
1957 new_req->rq_import_generation = request->rq_import_generation;
1959 new_aa = ptlrpc_req_async_args(new_req);
1961 INIT_LIST_HEAD(&new_aa->aa_oaps);
1962 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1963 INIT_LIST_HEAD(&new_aa->aa_exts);
1964 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1965 new_aa->aa_resends = aa->aa_resends;
1967 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1968 if (oap->oap_request) {
1969 ptlrpc_req_finished(oap->oap_request);
1970 oap->oap_request = ptlrpc_request_addref(new_req);
1974 /* XXX: This code will run into problem if we're going to support
1975 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1976 * and wait for all of them to be finished. We should inherit request
1977 * set from old request. */
1978 ptlrpcd_add_req(new_req);
1980 DEBUG_REQ(D_INFO, new_req, "new request");
1985 * ugh, we want disk allocation on the target to happen in offset order. we'll
1986 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1987 * fine for our small page arrays and doesn't require allocation. it's an
1988 * insertion sort that swaps elements that are strides apart, shrinking the
1989 * stride down until it's '1' and the array is sorted.
/*
 * Sort an array of page descriptors in place by ascending ->off.
 *
 * array  pointers to brw_page; entries are moved, the pages themselves
 *        are not touched
 * num    number of entries in array
 *
 * The first loop grows the stride through the 3 * stride + 1 sequence
 * until it reaches num; the do/while body then performs a strided
 * insertion pass (entries with a larger ->off are shifted up by one
 * stride) and repeats while stride > 1.
 * NOTE(review): the step that reduces the stride between passes is on a
 * line not visible in this view.
 */
1991 static void sort_brw_pages(struct brw_page **array, int num)
1994 struct brw_page *tmp;
1998 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2003 for (i = stride ; i < num ; i++) {
2006 while (j >= stride && array[j - stride]->off > tmp->off) {
2007 array[j] = array[j - stride];
2012 } while (stride > 1);
/*
 * Free a page-pointer array.
 *
 * ppga   array to free; asserted to be non-NULL
 * count  number of pointers in the array; the size handed to OBD_FREE() is
 *        sizeof(*ppga) * count, so it has to match the allocation
 */
2015 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2017 LASSERT(ppga != NULL);
2018 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply interpreter for BRW requests; osc_build_rpc() installs it as
 * rq_interpret_reply.
 *
 * args is the request's struct osc_brw_async_args.  The reply is first
 * run through osc_brw_fini_request().  For a recoverable error on a
 * request without rq_no_delay:
 *  - if the import generation changed since the request was built, the
 *    request is not resent (debug message only);
 *  - if rc is -EINPROGRESS or client_should_resend() agrees, the request
 *    is redone with osc_brw_redo_request();
 *  - otherwise "too many resent retries" is logged.
 *
 * In the attribute-update section the obdo's blocks/mtime/atime/ctime are
 * copied into the cl_object attributes under cl_object_attr_lock(); for
 * writes the size and KMS are extended to the end of the last page when
 * they are smaller (KMS only if the page is not a lockless write).
 *
 * Completion then frees aa_oa, finishes every extent on aa_exts
 * (-EWOULDBLOCK replaces a non-zero rc when rq_no_delay is set), releases
 * the page array, accounts the transferred bytes, decrements the read or
 * write in-flight counter under cl_loi_list_lock, wakes cache waiters and
 * unplugs further I/O.
 *
 * NOTE(review): the condition guarding the attribute update and the
 * return statements are on lines not visible in this view.
 */
2021 static int brw_interpret(const struct lu_env *env,
2022 struct ptlrpc_request *req, void *args, int rc)
2024 struct osc_brw_async_args *aa = args;
2025 struct osc_extent *ext;
2026 struct osc_extent *tmp;
2027 struct client_obd *cli = aa->aa_cli;
2028 unsigned long transferred = 0;
2032 rc = osc_brw_fini_request(req, rc);
2033 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2035 * When server returns -EINPROGRESS, client should always retry
2036 * regardless of the number of times the bulk was resent already.
2038 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2039 if (req->rq_import_generation !=
2040 req->rq_import->imp_generation) {
2041 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2042 ""DOSTID", rc = %d.\n",
2043 req->rq_import->imp_obd->obd_name,
2044 POSTID(&aa->aa_oa->o_oi), rc);
2045 } else if (rc == -EINPROGRESS ||
2046 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2047 rc = osc_brw_redo_request(req, aa, rc);
2049 CERROR("%s: too many resent retries for object: "
2050 "%llu:%llu, rc = %d.\n",
2051 req->rq_import->imp_obd->obd_name,
2052 POSTID(&aa->aa_oa->o_oi), rc);
2057 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2062 struct obdo *oa = aa->aa_oa;
2063 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2064 unsigned long valid = 0;
2065 struct cl_object *obj;
2066 struct osc_async_page *last;
2068 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2069 obj = osc2cl(last->oap_obj);
2071 cl_object_attr_lock(obj);
2072 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2073 attr->cat_blocks = oa->o_blocks;
2074 valid |= CAT_BLOCKS;
2076 if (oa->o_valid & OBD_MD_FLMTIME) {
2077 attr->cat_mtime = oa->o_mtime;
2080 if (oa->o_valid & OBD_MD_FLATIME) {
2081 attr->cat_atime = oa->o_atime;
2084 if (oa->o_valid & OBD_MD_FLCTIME) {
2085 attr->cat_ctime = oa->o_ctime;
2089 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2090 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2091 loff_t last_off = last->oap_count + last->oap_obj_off +
2094 /* Change file size if this is an out of quota or
2095 * direct IO write and it extends the file size */
2096 if (loi->loi_lvb.lvb_size < last_off) {
2097 attr->cat_size = last_off;
2100 /* Extend KMS if it's not a lockless write */
2101 if (loi->loi_kms < last_off &&
2102 oap2osc_page(last)->ops_srvlock == 0) {
2103 attr->cat_kms = last_off;
2109 cl_object_attr_update(env, obj, attr, valid);
2110 cl_object_attr_unlock(obj);
2112 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2114 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2115 osc_inc_unstable_pages(req);
2117 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2118 list_del_init(&ext->oe_link);
2119 osc_extent_finish(env, ext, 1,
2120 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2122 LASSERT(list_empty(&aa->aa_exts));
2123 LASSERT(list_empty(&aa->aa_oaps));
2125 transferred = (req->rq_bulk == NULL ? /* short io */
2126 aa->aa_requested_nob :
2127 req->rq_bulk->bd_nob_transferred);
2129 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2130 ptlrpc_lprocfs_brw(req, transferred);
2132 spin_lock(&cli->cl_loi_list_lock);
2133 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2134 * is called so we know whether to go to sync BRWs or wait for more
2135 * RPCs to complete */
2136 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2137 cli->cl_w_in_flight--;
2139 cli->cl_r_in_flight--;
2140 osc_wake_cache_waiters(cli);
2141 spin_unlock(&cli->cl_loi_list_lock);
2143 osc_io_unplug(env, cli, NULL);
/*
 * Commit callback for BRW requests; osc_build_rpc() installs it as
 * rq_commit_cb.
 *
 * Under rq_lock: when rq_unstable is set it is cleared, the lock is
 * dropped and osc_dec_unstable_pages() is called for the request.  In the
 * other path shown, rq_committed is set to 1 before the lock is dropped.
 */
2147 static void brw_commit(struct ptlrpc_request *req)
2149 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2150 * this called via the rq_commit_cb, I need to ensure
2151 * osc_dec_unstable_pages is still called. Otherwise unstable
2152 * pages may be leaked. */
2153 spin_lock(&req->rq_lock);
2154 if (likely(req->rq_unstable)) {
2155 req->rq_unstable = 0;
2156 spin_unlock(&req->rq_lock);
2158 osc_dec_unstable_pages(req);
2160 req->rq_committed = 1;
2161 spin_unlock(&req->rq_lock);
2166 * Build an RPC from the list of extents @ext_list. The caller must ensure
2167 * that the total number of pages in this list does NOT exceed max pages
2168 * per RPC. Extents in the list must be in OES_RPC state.
/*
 * env       environment; osc_env_info(env) supplies the cl_req_attr scratch
 * cli       client obd the RPC is built for
 * ext_list  non-empty list of extents, each asserted to be in OES_RPC
 * cmd       compared against OBD_BRW_READ / OBD_BRW_WRITE below
 *
 * Outline of what the visible code does:
 *  1. Walk ext_list accumulating memory-pressure state, grants, the page
 *     count and the maximum layout version.
 *  2. Allocate the page-pointer array and the obdo (-ENOMEM via out on
 *     failure).
 *  3. Fill pga from every oap of every extent, collect the oaps on
 *     rpc_list and track the starting/ending offsets.
 *  4. Set request attributes through cl_req_attr_set() using the first
 *     page; for writes record the used grant and layout version in oa.
 *  5. Sort the pages by offset and build the request with
 *     osc_brw_prep_request().
 *  6. Install brw_commit/brw_interpret, refresh the timestamps in the
 *     packed body, set the job id, and move rpc_list and ext_list onto
 *     the request's async args.
 *  7. Under cl_loi_list_lock bump the read or write in-flight counter
 *     and tally the histograms, then queue the request with
 *     ptlrpcd_add_req().
 *
 * In the cleanup section req is asserted NULL, oa and pga are freed and
 * every extent left on ext_list is finished with
 * osc_extent_finish(env, ext, 0, rc).
 *
 * NOTE(review): the conditions guarding several steps (memory pressure,
 * ndelay, the error test before cleanup) and the return statement are on
 * lines not visible in this view.
 */
2170 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2171 struct list_head *ext_list, int cmd)
2173 struct ptlrpc_request *req = NULL;
2174 struct osc_extent *ext;
2175 struct brw_page **pga = NULL;
2176 struct osc_brw_async_args *aa = NULL;
2177 struct obdo *oa = NULL;
2178 struct osc_async_page *oap;
2179 struct osc_object *obj = NULL;
2180 struct cl_req_attr *crattr = NULL;
2181 loff_t starting_offset = OBD_OBJECT_EOF;
2182 loff_t ending_offset = 0;
2186 bool soft_sync = false;
2187 bool interrupted = false;
2188 bool ndelay = false;
2192 __u32 layout_version = 0;
2193 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2194 struct ost_body *body;
2196 LASSERT(!list_empty(ext_list));
2198 /* add pages into rpc_list to build BRW rpc */
2199 list_for_each_entry(ext, ext_list, oe_link) {
2200 LASSERT(ext->oe_state == OES_RPC);
2201 mem_tight |= ext->oe_memalloc;
2202 grant += ext->oe_grants;
2203 page_count += ext->oe_nr_pages;
2204 layout_version = MAX(layout_version, ext->oe_layout_version);
2209 soft_sync = osc_over_unstable_soft_limit(cli);
2211 mpflag = cfs_memory_pressure_get_and_set();
2213 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2215 GOTO(out, rc = -ENOMEM);
2217 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2219 GOTO(out, rc = -ENOMEM);
2222 list_for_each_entry(ext, ext_list, oe_link) {
2223 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2225 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2227 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2228 pga[i] = &oap->oap_brw_page;
2229 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2232 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2233 if (starting_offset == OBD_OBJECT_EOF ||
2234 starting_offset > oap->oap_obj_off)
2235 starting_offset = oap->oap_obj_off;
2237 LASSERT(oap->oap_page_off == 0);
2238 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2239 ending_offset = oap->oap_obj_off +
2242 LASSERT(oap->oap_page_off + oap->oap_count ==
2244 if (oap->oap_interrupted)
2251 /* first page in the list */
2252 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2254 crattr = &osc_env_info(env)->oti_req_attr;
2255 memset(crattr, 0, sizeof(*crattr));
2256 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2257 crattr->cra_flags = ~0ULL;
2258 crattr->cra_page = oap2cl_page(oap);
2259 crattr->cra_oa = oa;
2260 cl_req_attr_set(env, osc2cl(obj), crattr);
2262 if (cmd == OBD_BRW_WRITE) {
2263 oa->o_grant_used = grant;
2264 if (layout_version > 0) {
2265 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2266 PFID(&oa->o_oi.oi_fid), layout_version);
2268 oa->o_layout_version = layout_version;
2269 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2273 sort_brw_pages(pga, page_count);
2274 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2276 CERROR("prep_req failed: %d\n", rc);
2280 req->rq_commit_cb = brw_commit;
2281 req->rq_interpret_reply = brw_interpret;
2282 req->rq_memalloc = mem_tight != 0;
2283 oap->oap_request = ptlrpc_request_addref(req);
2284 if (interrupted && !req->rq_intr)
2285 ptlrpc_mark_interrupted(req);
2287 req->rq_no_resend = req->rq_no_delay = 1;
2288 /* probably set a shorter timeout value.
2289 * to handle ETIMEDOUT in brw_interpret() correctly. */
2290 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2293 /* Need to update the timestamps after the request is built in case
2294 * we race with setattr (locally or in queue at OST). If OST gets
2295 * later setattr before earlier BRW (as determined by the request xid),
2296 * the OST will not use BRW timestamps. Sadly, there is no obvious
2297 * way to do this in a single call. bug 10150 */
2298 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2299 crattr->cra_oa = &body->oa;
2300 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2301 cl_req_attr_set(env, osc2cl(obj), crattr);
2302 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2304 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2305 aa = ptlrpc_req_async_args(req);
2306 INIT_LIST_HEAD(&aa->aa_oaps);
2307 list_splice_init(&rpc_list, &aa->aa_oaps);
2308 INIT_LIST_HEAD(&aa->aa_exts);
2309 list_splice_init(ext_list, &aa->aa_exts);
2311 spin_lock(&cli->cl_loi_list_lock);
2312 starting_offset >>= PAGE_SHIFT;
2313 if (cmd == OBD_BRW_READ) {
2314 cli->cl_r_in_flight++;
2315 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2316 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2317 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2318 starting_offset + 1);
2320 cli->cl_w_in_flight++;
2321 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2322 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2323 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2324 starting_offset + 1);
2326 spin_unlock(&cli->cl_loi_list_lock);
2328 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2329 page_count, aa, cli->cl_r_in_flight,
2330 cli->cl_w_in_flight);
2331 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2333 ptlrpcd_add_req(req);
2339 cfs_memory_pressure_restore(mpflag);
2342 LASSERT(req == NULL);
2345 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2347 OBD_FREE(pga, sizeof(*pga) * page_count);
2348 /* this should happen rarely and is pretty bad, it makes the
2349 * pending list not follow the dirty order */
2350 while (!list_empty(ext_list)) {
2351 ext = list_entry(ext_list->next, struct osc_extent,
2353 list_del_init(&ext->oe_link);
2354 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach data to a lock as its l_ast_data unless different data is
 * already attached.
 *
 * With the lock's resource locked: l_ast_data is set to data when it is
 * still NULL, and then compared with data.
 *
 * NOTE(review): the statement executed when l_ast_data == data and the
 * return are on lines not visible in this view; presumably the result
 * tells the caller whether the lock now carries data - confirm.
 */
2360 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2364 LASSERT(lock != NULL);
2366 lock_res_and_lock(lock);
2368 if (lock->l_ast_data == NULL)
2369 lock->l_ast_data = data;
2370 if (lock->l_ast_data == data)
2373 unlock_res_and_lock(lock);
/*
 * Common tail of a lock enqueue: fix up the status and flags, run the
 * caller's upcall, then release the enqueue reference.
 *
 * For an intent enqueue that ended with ELDLM_LOCK_ABORTED the reply's
 * lock_policy_res1 is converted with ptlrpc_status_ntoh() and, when
 * non-zero, replaces errcode.  LDLM_FL_LVB_READY is set in *flags on the
 * lines shown for both the intent path and the ELDLM_OK path.
 *
 * upcall(cookie, lockh, errcode) is then invoked and its result kept in
 * rc.  Finally, when errcode is ELDLM_OK and lockh is in use, the
 * reference taken in ldlm_cli_enqueue() is dropped with
 * ldlm_lock_decref(lockh, mode).
 *
 * NOTE(review): the last parameter(s), the handling of
 * ELDLM_LOCK_MATCHED, any use of speculative and the return are on lines
 * not visible in this view.
 */
2378 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2379 void *cookie, struct lustre_handle *lockh,
2380 enum ldlm_mode mode, __u64 *flags, bool speculative,
2383 bool intent = *flags & LDLM_FL_HAS_INTENT;
2387 /* The request was created before ldlm_cli_enqueue call. */
2388 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2389 struct ldlm_reply *rep;
2391 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2392 LASSERT(rep != NULL);
2394 rep->lock_policy_res1 =
2395 ptlrpc_status_ntoh(rep->lock_policy_res1);
2396 if (rep->lock_policy_res1)
2397 errcode = rep->lock_policy_res1;
2399 *flags |= LDLM_FL_LVB_READY;
2400 } else if (errcode == ELDLM_OK) {
2401 *flags |= LDLM_FL_LVB_READY;
2404 /* Call the update callback. */
2405 rc = (*upcall)(cookie, lockh, errcode);
2407 /* release the reference taken in ldlm_cli_enqueue() */
2408 if (errcode == ELDLM_LOCK_MATCHED)
2410 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2411 ldlm_lock_decref(lockh, mode);
/*
 * Interpret callback for an asynchronous lock enqueue.
 *
 * args is the struct osc_enqueue_args of the request.  The lock is looked
 * up from the handle stored there (asserted to exist) and an additional
 * mode reference is taken around the completion so that a blocking AST
 * cannot arrive before the upcall has run.  For speculative enqueues
 * oa_lvb and oa_flags are asserted NULL and oa_flags is pointed at a
 * local flags variable.
 *
 * The enqueue is completed with ldlm_cli_enqueue_fini() and then
 * osc_enqueue_fini(), whose result is kept in rc; afterwards the extra
 * mode reference and the handle2lock reference are dropped.
 *
 * The OBD_FAIL_TIMEOUT() calls are fault-injection points.
 */
2416 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2419 struct osc_enqueue_args *aa = args;
2420 struct ldlm_lock *lock;
2421 struct lustre_handle *lockh = &aa->oa_lockh;
2422 enum ldlm_mode mode = aa->oa_mode;
2423 struct ost_lvb *lvb = aa->oa_lvb;
2424 __u32 lvb_len = sizeof(*lvb);
2429 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2431 lock = ldlm_handle2lock(lockh);
2432 LASSERTF(lock != NULL,
2433 "lockh %#llx, req %p, aa %p - client evicted?\n",
2434 lockh->cookie, req, aa);
2436 /* Take an additional reference so that a blocking AST that
2437 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2438 * to arrive after an upcall has been executed by
2439 * osc_enqueue_fini(). */
2440 ldlm_lock_addref(lockh, mode);
2442 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2443 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2445 /* Let CP AST to grant the lock first. */
2446 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2448 if (aa->oa_speculative) {
2449 LASSERT(aa->oa_lvb == NULL);
2450 LASSERT(aa->oa_flags == NULL);
2451 aa->oa_flags = &flags;
2454 /* Complete obtaining the lock procedure. */
2455 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2456 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2458 /* Complete osc stuff. */
2459 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2460 aa->oa_flags, aa->oa_speculative, rc);
2462 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2464 ldlm_lock_decref(lockh, mode);
2465 LDLM_LOCK_PUT(lock);
/*
 * Sentinel request-set pointer with the fixed value (void *)1.
 * osc_enqueue_base() compares its rqset argument against it and, on a match,
 * hands the request to ptlrpcd_add_req().
 */
2469 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2471 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2472 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2473 * other synchronous requests, however keeping some locks and trying to obtain
2474 * others may take a considerable amount of time in a case of ost failure; and
2475 * when other sync requests do not get released lock from a client, the client
2476 * is evicted from the cluster -- such scenarios make the life difficult, so
2477 * release locks just after they are obtained. */
/*
 * Enqueue a DLM lock on \a res_id through \a exp, reusing an already cached
 * lock when ldlm_lock_match() finds one.
 *
 * Steps shown by the visible lines:
 *  - the extent in \a policy is widened to page boundaries;
 *  - a cached lock is searched for with a copy of *flags that has
 *    LDLM_FL_BLOCK_GRANTED (and LDLM_FL_LVB_READY) added;
 *  - on a match that osc_set_lock_data() accepts, LDLM_FL_LVB_READY is set
 *    in *flags, \a upcall is invoked with ELDLM_LOCK_MATCHED and the match
 *    reference is dropped;
 *  - otherwise an RQF_LDLM_ENQUEUE_LVB request may be allocated and prepared,
 *    LDLM_FL_BLOCK_GRANTED is cleared from *flags and ldlm_cli_enqueue() is
 *    called;
 *  - for asynchronous enqueues the state needed by osc_enqueue_interpret()
 *    is stored in the request's async args and the request is queued to
 *    ptlrpcd (rqset == PTLRPCD_SET) or added to \a rqset;
 *  - the upcall is otherwise run through osc_enqueue_fini().
 *
 * NOTE(review): several conditions, else-branches and return statements of
 * this function are not visible in this listing, so the exact branch
 * structure should be confirmed against the complete source.
 */
2478 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2479 __u64 *flags, union ldlm_policy_data *policy,
2480 struct ost_lvb *lvb, int kms_valid,
2481 osc_enqueue_upcall_f upcall, void *cookie,
2482 struct ldlm_enqueue_info *einfo,
2483 struct ptlrpc_request_set *rqset, int async,
2486 struct obd_device *obd = exp->exp_obd;
2487 struct lustre_handle lockh = { 0 };
2488 struct ptlrpc_request *req = NULL;
2489 int intent = *flags & LDLM_FL_HAS_INTENT;
2490 __u64 match_flags = *flags;
2491 enum ldlm_mode mode;
2495 /* Filesystem lock extents are extended to page boundaries so that
2496 * dealing with the page cache is a little smoother. */
2497 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2498 policy->l_extent.end |= ~PAGE_MASK;
2501 * kms is not valid when either object is completely fresh (so that no
2502 * locks are cached), or object was evicted. In the latter case cached
2503 * lock cannot be used, because it would prime inode state with
2504 * potentially stale LVB.
2509 /* Next, search for already existing extent locks that will cover us */
2510 /* If we're trying to read, we also search for an existing PW lock. The
2511 * VFS and page cache already protect us locally, so lots of readers/
2512 * writers can share a single PW lock.
2514 * There are problems with conversion deadlocks, so instead of
2515 * converting a read lock to a write lock, we'll just enqueue a new
2518 * At some point we should cancel the read lock instead of making them
2519 * send us a blocking callback, but there are problems with canceling
2520 * locks out from other users right now, too. */
2521 mode = einfo->ei_mode;
2522 if (einfo->ei_mode == LCK_PR)
2524 /* Normal lock requests must wait for the LVB to be ready before
2525 * matching a lock; speculative lock requests do not need to,
2526 * because they will not actually use the lock. */
2528 match_flags |= LDLM_FL_LVB_READY;
2530 match_flags |= LDLM_FL_BLOCK_GRANTED;
2531 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2532 einfo->ei_type, policy, mode, &lockh, 0);
2534 struct ldlm_lock *matched;
2536 if (*flags & LDLM_FL_TEST_LOCK)
2539 matched = ldlm_handle2lock(&lockh);
2541 /* This DLM lock request is speculative, and does not
2542 * have an associated IO request. Therefore if there
2543 * is already a DLM lock, it will just inform the
2544 * caller to cancel the request for this stripe.*/
2545 lock_res_and_lock(matched);
2546 if (ldlm_extent_equal(&policy->l_extent,
2547 &matched->l_policy_data.l_extent))
2551 unlock_res_and_lock(matched);
2553 ldlm_lock_decref(&lockh, mode);
2554 LDLM_LOCK_PUT(matched);
2556 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2557 *flags |= LDLM_FL_LVB_READY;
2559 /* We already have a lock, and it's referenced. */
2560 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2562 ldlm_lock_decref(&lockh, mode);
2563 LDLM_LOCK_PUT(matched);
2566 ldlm_lock_decref(&lockh, mode);
2567 LDLM_LOCK_PUT(matched);
2572 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2576 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2577 &RQF_LDLM_ENQUEUE_LVB);
2581 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2583 ptlrpc_request_free(req);
2587 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2589 ptlrpc_request_set_replen(req);
2592 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2593 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2595 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2596 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Save what osc_enqueue_interpret() needs in the request's async args. */
2599 struct osc_enqueue_args *aa;
2600 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2601 aa = ptlrpc_req_async_args(req);
2603 aa->oa_mode = einfo->ei_mode;
2604 aa->oa_type = einfo->ei_type;
2605 lustre_handle_copy(&aa->oa_lockh, &lockh);
2606 aa->oa_upcall = upcall;
2607 aa->oa_cookie = cookie;
2608 aa->oa_speculative = speculative;
2610 aa->oa_flags = flags;
2613 /* speculative locks are essentially to enqueue
2614 * a DLM lock in advance, so we don't care
2615 * about the result of the enqueue. */
2617 aa->oa_flags = NULL;
2620 req->rq_interpret_reply = osc_enqueue_interpret;
2621 if (rqset == PTLRPCD_SET)
2622 ptlrpcd_add_req(req);
2624 ptlrpc_set_add_req(rqset, req);
2625 } else if (intent) {
2626 ptlrpc_req_finished(req);
2631 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2632 flags, speculative, rc);
2634 ptlrpc_req_finished(req);
/*
 * Look for an already held DLM lock on \a res_id that covers \a policy.
 *
 * The extent is widened to page boundaries and ldlm_lock_match() is called
 * with a local copy of *flags; the matched handle is returned in \a lockh.
 * The result is tested for "no match" (0) or LDLM_FL_TEST_LOCK before the
 * lock is resolved from \a lockh and offered to osc_set_lock_data() together
 * with \a data; if that is refused, the reference obtained by the match is
 * dropped again.  OBD_FAIL_OSC_MATCH is checked on entry.
 *
 * NOTE(review): the statements selecting the lock mode passed to
 * ldlm_lock_match() and the return statements are not visible in this
 * listing.
 */
2639 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2640 enum ldlm_type type, union ldlm_policy_data *policy,
2641 enum ldlm_mode mode, __u64 *flags, void *data,
2642 struct lustre_handle *lockh, int unref)
2644 struct obd_device *obd = exp->exp_obd;
2645 __u64 lflags = *flags;
2649 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2652 /* Filesystem lock extents are extended to page boundaries so that
2653 * dealing with the page cache is a little smoother */
2654 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2655 policy->l_extent.end |= ~PAGE_MASK;
2657 /* Next, search for already existing extent locks that will cover us */
2658 /* If we're trying to read, we also search for an existing PW lock. The
2659 * VFS and page cache already protect us locally, so lots of readers/
2660 * writers can share a single PW lock. */
2664 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2665 res_id, type, policy, rc, lockh, unref);
2666 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2670 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2672 LASSERT(lock != NULL);
2673 if (!osc_set_lock_data(lock, data)) {
2674 ldlm_lock_decref(lockh, rc);
2677 LDLM_LOCK_PUT(lock);
/*
 * Reply-interpret callback for the OST_STATFS request queued by
 * osc_statfs_async().
 *
 * \a args is the osc_async_args saved with the request.  The obd_statfs
 * buffer from the reply is copied into aa->aa_oi->oi_osfs (-EPROTO is the
 * error used on the path that jumps to "out" after fetching it), and the
 * final status is handed to the caller's oi_cb_up() callback.  -ENOTCONN and
 * -EAGAIN are singled out when OBD_STATFS_NODELAY was requested.
 *
 * NOTE(review): the bodies of the early checks and the return statement are
 * not visible in this listing.
 */
2682 static int osc_statfs_interpret(const struct lu_env *env,
2683 struct ptlrpc_request *req, void *args, int rc)
2685 struct osc_async_args *aa = args;
2686 struct obd_statfs *msfs;
2691 * The request has in fact never been sent due to issues at
2692 * a higher level (LOV). Exit immediately since the caller
2693 * is aware of the problem and takes care of the clean up.
2697 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2698 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2704 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2706 GOTO(out, rc = -EPROTO);
2708 *aa->aa_oi->oi_osfs = *msfs;
2710 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_STATFS request (the .o_statfs_async method in
 * osc_obd_ops).
 *
 * The request is allocated on the client import, packed, directed to
 * OST_CREATE_PORTAL and added to \a rqset with osc_statfs_interpret() as its
 * reply handler.  When \a oinfo carries OBD_STATFS_NODELAY the request is
 * marked no-resend and no-delay.  \a max_age is not used by any line visible
 * here (see the comment below).
 */
2715 static int osc_statfs_async(struct obd_export *exp,
2716 struct obd_info *oinfo, time64_t max_age,
2717 struct ptlrpc_request_set *rqset)
2719 struct obd_device *obd = class_exp2obd(exp);
2720 struct ptlrpc_request *req;
2721 struct osc_async_args *aa;
2725 /* We could possibly pass max_age in the request (as an absolute
2726 * timestamp or a "seconds.usec ago") so the target can avoid doing
2727 * extra calls into the filesystem if that isn't necessary (e.g.
2728 * during mount that would help a bit). Having relative timestamps
2729 * is not so great if request processing is slow, while absolute
2730 * timestamps are not ideal because they need time synchronization. */
2731 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2735 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2737 ptlrpc_request_free(req);
2740 ptlrpc_request_set_replen(req);
2741 req->rq_request_portal = OST_CREATE_PORTAL;
2742 ptlrpc_at_set_req_timeout(req);
2744 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2745 /* procfs requests must not wait in statfs, to avoid deadlock */
2746 req->rq_no_resend = 1;
2747 req->rq_no_delay = 1;
2750 req->rq_interpret_reply = osc_statfs_interpret;
2751 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2752 aa = ptlrpc_req_async_args(req);
2755 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS (the .o_statfs method in osc_obd_ops).
 *
 * A reference on the client import is taken under a read lock on cl_sem so
 * that the import cannot go away under a concurrent disconnect; it is put
 * again after the request has been allocated.  The request is packed,
 * directed to OST_CREATE_PORTAL, optionally marked no-resend/no-delay for
 * OBD_STATFS_NODELAY, and sent with ptlrpc_queue_wait().  The obd_statfs
 * reply buffer is then fetched (-EPROTO on the path that jumps to "out") and
 * the request is released with ptlrpc_req_finished().
 *
 * NOTE(review): the error checks, the copy into \a osfs and the return
 * statement are not visible in this listing.
 */
2759 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2760 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2762 struct obd_device *obd = class_exp2obd(exp);
2763 struct obd_statfs *msfs;
2764 struct ptlrpc_request *req;
2765 struct obd_import *imp = NULL;
2770 /*Since the request might also come from lprocfs, so we need
2771 *sync this with client_disconnect_export Bug15684*/
2772 down_read(&obd->u.cli.cl_sem);
2773 if (obd->u.cli.cl_import)
2774 imp = class_import_get(obd->u.cli.cl_import);
2775 up_read(&obd->u.cli.cl_sem);
2779 /* We could possibly pass max_age in the request (as an absolute
2780 * timestamp or a "seconds.usec ago") so the target can avoid doing
2781 * extra calls into the filesystem if that isn't necessary (e.g.
2782 * during mount that would help a bit). Having relative timestamps
2783 * is not so great if request processing is slow, while absolute
2784 * timestamps are not ideal because they need time synchronization. */
2785 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2787 class_import_put(imp);
2792 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2794 ptlrpc_request_free(req);
2797 ptlrpc_request_set_replen(req);
2798 req->rq_request_portal = OST_CREATE_PORTAL;
2799 ptlrpc_at_set_req_timeout(req);
2801 if (flags & OBD_STATFS_NODELAY) {
2802 /* procfs requests must not wait in statfs, to avoid deadlock */
2803 req->rq_no_resend = 1;
2804 req->rq_no_delay = 1;
2807 rc = ptlrpc_queue_wait(req);
2811 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2813 GOTO(out, rc = -EPROTO);
2819 ptlrpc_req_finished(req);
/*
 * ioctl dispatcher for an OSC export (the .o_iocontrol method in
 * osc_obd_ops).
 *
 * A reference on this module is taken with try_module_get() before the
 * command is dispatched and released with module_put() afterwards; failure
 * to take it is reported with CERROR.  Commands handled:
 *   OBD_IOC_CLIENT_RECOVER - ptlrpc_recover_import() using data->ioc_inlbuf1
 *   IOC_OSC_SET_ACTIVE     - ptlrpc_set_import_active()
 *   OBD_IOC_PING_TARGET    - ptlrpc_obd_ping()
 * Any other command is logged at D_INODE and fails with -ENOTTY.
 * \a len and \a uarg are not used by any line visible here.
 */
2823 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2824 void *karg, void __user *uarg)
2826 struct obd_device *obd = exp->exp_obd;
2827 struct obd_ioctl_data *data = karg;
2831 if (!try_module_get(THIS_MODULE)) {
2832 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2833 module_name(THIS_MODULE));
2837 case OBD_IOC_CLIENT_RECOVER:
2838 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2839 data->ioc_inlbuf1, 0);
2843 case IOC_OSC_SET_ACTIVE:
2844 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2847 case OBD_IOC_PING_TARGET:
2848 err = ptlrpc_obd_ping(obd);
2851 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2852 cmd, current_comm());
2853 GOTO(out, err = -ENOTTY);
2856 module_put(THIS_MODULE);
/*
 * Set a keyed parameter on the OSC, either locally or by sending
 * OST_SET_INFO to the target (the .o_set_info_async method in osc_obd_ops).
 *
 * Keys with dedicated handling in the visible lines:
 *   KEY_CHECKSUM         - \a vallen is compared with sizeof(int); the value
 *                          is stored as 0/1 in cl_checksum
 *   KEY_SPTLRPC_CONF     - sptlrpc_conf_client_adapt()
 *   KEY_FLUSH_CTX        - sptlrpc_import_flush_my_ctx()
 *   KEY_CACHE_SET        - attach this client to the cl_client_cache passed
 *                          in \a val (asserted to happen only once), take a
 *                          cache reference and link the client into ccc_lru
 *                          under ccc_lru_lock
 *   KEY_CACHE_LRU_SHRINK - osc_lru_shrink() of at most half of cl_lru_in_list,
 *                          bounded by the long passed in \a val
 *
 * Every other key is copied, with its value, into a request built from the
 * import.  KEY_GRANT_SHRINK uses RQF_OST_SET_GRANT_INFO, keeps a slab-allocated
 * copy of the ost_body's obdo, installs osc_shrink_grant_interpret() and is
 * queued with ptlrpcd_add_req(); other keys require a non-NULL \a set, to
 * which the request is added before ptlrpc_check_set() is called.
 *
 * NOTE(review): the return statements after each locally handled key and
 * several error checks are not visible in this listing.
 */
2860 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2861 u32 keylen, void *key, u32 vallen, void *val,
2862 struct ptlrpc_request_set *set)
2864 struct ptlrpc_request *req;
2865 struct obd_device *obd = exp->exp_obd;
2866 struct obd_import *imp = class_exp2cliimp(exp);
2871 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2873 if (KEY_IS(KEY_CHECKSUM)) {
2874 if (vallen != sizeof(int))
2876 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2880 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2881 sptlrpc_conf_client_adapt(obd);
2885 if (KEY_IS(KEY_FLUSH_CTX)) {
2886 sptlrpc_import_flush_my_ctx(imp);
2890 if (KEY_IS(KEY_CACHE_SET)) {
2891 struct client_obd *cli = &obd->u.cli;
2893 LASSERT(cli->cl_cache == NULL); /* only once */
2894 cli->cl_cache = (struct cl_client_cache *)val;
2895 cl_cache_incref(cli->cl_cache);
2896 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2898 /* add this osc into entity list */
2899 LASSERT(list_empty(&cli->cl_lru_osc));
2900 spin_lock(&cli->cl_cache->ccc_lru_lock);
2901 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2902 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2907 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2908 struct client_obd *cli = &obd->u.cli;
2909 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2910 long target = *(long *)val;
2912 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2917 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2920 /* We pass all other commands directly to OST. Since nobody calls osc
2921 methods directly and everybody is supposed to go through LOV, we
2922 assume lov checked invalid values for us.
2923 The only recognised values so far are evict_by_nid and mds_conn.
2924 Even if something bad goes through, we'd get a -EINVAL from OST
2927 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2928 &RQF_OST_SET_GRANT_INFO :
2933 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2934 RCL_CLIENT, keylen);
2935 if (!KEY_IS(KEY_GRANT_SHRINK))
2936 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2937 RCL_CLIENT, vallen);
2938 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2940 ptlrpc_request_free(req);
2944 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2945 memcpy(tmp, key, keylen);
2946 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2949 memcpy(tmp, val, vallen);
2951 if (KEY_IS(KEY_GRANT_SHRINK)) {
2952 struct osc_grant_args *aa;
2955 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2956 aa = ptlrpc_req_async_args(req);
2957 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2959 ptlrpc_req_finished(req);
2962 *oa = ((struct ost_body *)val)->oa;
2964 req->rq_interpret_reply = osc_shrink_grant_interpret;
2967 ptlrpc_request_set_replen(req);
2968 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2969 LASSERT(set != NULL);
2970 ptlrpc_set_add_req(set, req);
2971 ptlrpc_check_set(NULL, set);
2973 ptlrpcd_add_req(req);
2978 EXPORT_SYMBOL(osc_set_info_async);
/*
 * Fill in the grant request for a reconnect (the .o_reconnect method in
 * osc_obd_ops).
 *
 * Only acts when \a data is non-NULL and OBD_CONNECT_GRANT is set.  Under
 * cl_loi_list_lock the grant is computed as cl_avail_grant plus
 * cl_reserved_grant plus a dirty component (cl_dirty_grant when
 * OBD_CONNECT_GRANT_PARAM is set; cl_dirty_pages << PAGE_SHIFT is the other
 * visible variant).  data->ocd_grant is set to that value, or to
 * 2 * cli_brw_size(obd) when it is zero.  cl_lost_grant is read for the debug
 * message and reset to 0.
 *
 * NOTE(review): the local declarations and the return statement are not
 * visible in this listing.
 */
2980 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2981 struct obd_device *obd, struct obd_uuid *cluuid,
2982 struct obd_connect_data *data, void *localdata)
2984 struct client_obd *cli = &obd->u.cli;
2986 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2990 spin_lock(&cli->cl_loi_list_lock);
2991 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2992 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2993 grant += cli->cl_dirty_grant;
2995 grant += cli->cl_dirty_pages << PAGE_SHIFT;
2996 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2997 lost_grant = cli->cl_lost_grant;
2998 cli->cl_lost_grant = 0;
2999 spin_unlock(&cli->cl_loi_list_lock);
3001 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3002 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3003 data->ocd_version, data->ocd_grant, lost_grant);
3008 EXPORT_SYMBOL(osc_reconnect);
/*
 * Disconnect an OSC export (the .o_disconnect method in osc_obd_ops).
 *
 * client_disconnect_export() is called first and only afterwards is the
 * client removed from the grant list with osc_del_grant_list(); the remarks
 * below explain why this order was chosen.
 */
3010 int osc_disconnect(struct obd_export *exp)
3012 struct obd_device *obd = class_exp2obd(exp);
3015 rc = client_disconnect_export(exp);
3017 * Initially we put del_shrink_grant before disconnect_export, but it
3018 * causes the following problem if setup (connect) and cleanup
3019 * (disconnect) are tangled together.
3020 * connect p1 disconnect p2
3021 * ptlrpc_connect_import
3022 * ............... class_manual_cleanup
3025 * ptlrpc_connect_interrupt
3027 * add this client to shrink list
3029 * Bang! grant shrink thread trigger the shrink. BUG18662
3031 osc_del_grant_list(&obd->u.cli);
3034 EXPORT_SYMBOL(osc_disconnect);
/*
 * Per-resource callback for a cfs_hash walk over a namespace's resources;
 * osc_import_event() passes it to cfs_hash_for_each_nolock() with a lu_env
 * as \a arg.
 *
 * The granted locks of the resource are walked: the first lock that has
 * l_ast_data supplies the osc_object (a cl_object reference is taken on it),
 * and ldlm_clear_cleaned() is applied so the lock is picked up by the second
 * namespace cleanup.  The object found is then invalidated with
 * osc_object_invalidate() and its reference is dropped.
 *
 * NOTE(review): the brace structure of the loop, the test guarding the
 * invalidate call and the return statement are not visible in this listing.
 */
3036 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3037 struct hlist_node *hnode, void *arg)
3039 struct lu_env *env = arg;
3040 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3041 struct ldlm_lock *lock;
3042 struct osc_object *osc = NULL;
3046 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3047 if (lock->l_ast_data != NULL && osc == NULL) {
3048 osc = lock->l_ast_data;
3049 cl_object_get(osc2cl(osc));
3052 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3053 * by the 2nd round of ldlm_namespace_clean() call in
3054 * osc_import_event(). */
3055 ldlm_clear_cleaned(lock);
3060 osc_object_invalidate(env, osc);
3061 cl_object_put(env, osc2cl(osc));
3066 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
/*
 * Import state-change handler (the .o_import_event method in osc_obd_ops).
 *
 * Asserts that \a imp belongs to \a obd, then acts per event:
 *   IMP_EVENT_DISCON     - zero cl_avail_grant and cl_lost_grant under
 *                          cl_loi_list_lock
 *   IMP_EVENT_INACTIVE   - notify the observer with OBD_NOTIFY_INACTIVE
 *   IMP_EVENT_INVALIDATE - clean the namespace (LDLM_FL_LOCAL_ONLY), unplug
 *                          pending I/O, run osc_ldlm_resource_invalidate()
 *                          over ns_rs_hash, then clean the namespace again
 *   IMP_EVENT_ACTIVE     - notify the observer with OBD_NOTIFY_ACTIVE
 *   IMP_EVENT_OCD        - osc_init_grant() when OBD_CONNECT_GRANT is set,
 *                          switch the client request portal to
 *                          OST_REQUEST_PORTAL when OBD_CONNECT_REQPORTAL is
 *                          set, notify the observer with OBD_NOTIFY_OCD
 *   IMP_EVENT_DEACTIVATE - notify the observer with OBD_NOTIFY_DEACTIVATE
 *   IMP_EVENT_ACTIVATE   - notify the observer with OBD_NOTIFY_ACTIVATE
 * Any other event is reported with CERROR.
 *
 * NOTE(review): the switch statement itself, the break statements, some
 * local declarations and the return statement are not visible in this
 * listing.
 */
3068 static int osc_import_event(struct obd_device *obd,
3069 struct obd_import *imp,
3070 enum obd_import_event event)
3072 struct client_obd *cli;
3076 LASSERT(imp->imp_obd == obd);
3079 case IMP_EVENT_DISCON: {
3081 spin_lock(&cli->cl_loi_list_lock);
3082 cli->cl_avail_grant = 0;
3083 cli->cl_lost_grant = 0;
3084 spin_unlock(&cli->cl_loi_list_lock);
3087 case IMP_EVENT_INACTIVE: {
3088 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3091 case IMP_EVENT_INVALIDATE: {
3092 struct ldlm_namespace *ns = obd->obd_namespace;
3096 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3098 env = cl_env_get(&refcheck);
3100 osc_io_unplug(env, &obd->u.cli, NULL);
3102 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3103 osc_ldlm_resource_invalidate,
3105 cl_env_put(env, &refcheck);
3107 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3112 case IMP_EVENT_ACTIVE: {
3113 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3116 case IMP_EVENT_OCD: {
3117 struct obd_connect_data *ocd = &imp->imp_connect_data;
3119 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3120 osc_init_grant(&obd->u.cli, ocd);
3123 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3124 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3126 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3129 case IMP_EVENT_DEACTIVATE: {
3130 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3133 case IMP_EVENT_ACTIVATE: {
3134 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3138 CERROR("Unknown import event %d\n", event);
3145 * Determine whether the lock can be canceled before replaying the lock
3146 * during recovery, see bug16774 for detailed information.
3148 * \retval zero the lock can't be canceled
3149 * \retval other ok to cancel
/*
 * Cancel-weight callback registered on the namespace by osc_setup() through
 * ns_register_cancel().
 *
 * The visible test singles out locks whose resource type is LDLM_EXTENT,
 * that are granted, and for which osc_ldlm_weigh_ast() returns 0.
 * NOTE(review): the return statements are not visible in this listing;
 * presumably that case reports "ok to cancel" and everything else 0.
 */
3151 static int osc_cancel_weight(struct ldlm_lock *lock)
3154 * Cancel all unused and granted extent lock.
3156 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3157 ldlm_is_granted(lock) &&
3158 osc_ldlm_weigh_ast(lock) == 0)
/*
 * Writeback work handler: osc_setup_common() registers it with
 * ptlrpcd_alloc_work() and passes the client_obd as \a data.  It logs the
 * client at D_CACHE and unplugs its pending I/O with osc_io_unplug().
 */
3164 static int brw_queue_work(const struct lu_env *env, void *data)
3166 struct client_obd *cli = data;
3168 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3170 osc_io_unplug(env, cli, NULL);
/*
 * Setup steps shared by OSC-style devices (exported; osc_setup() calls it).
 *
 * In order: take a ptlrpcd reference, run client_obd_setup(), allocate the
 * writeback work item (brw_queue_work) and the LRU work item
 * (lru_queue_work) on the client import, set up the quota cache with
 * osc_quota_setup(), and initialise the grant-shrink interval and next
 * shrink time.  On failure after client_obd_setup() the work items that
 * exist are destroyed and client_obd_cleanup() is called.
 *
 * NOTE(review): the error-path labels, the error checks after some calls and
 * the return statements are not visible in this listing.
 */
3174 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3176 struct client_obd *cli = &obd->u.cli;
3182 rc = ptlrpcd_addref();
3186 rc = client_obd_setup(obd, lcfg);
3188 GOTO(out_ptlrpcd, rc);
3191 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3192 if (IS_ERR(handler))
3193 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3194 cli->cl_writeback_work = handler;
3196 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3197 if (IS_ERR(handler))
3198 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3199 cli->cl_lru_work = handler;
3201 rc = osc_quota_setup(obd);
3203 GOTO(out_ptlrpcd_work, rc);
3205 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3206 osc_update_next_shrink(cli);
3211 if (cli->cl_writeback_work != NULL) {
3212 ptlrpcd_destroy_work(cli->cl_writeback_work);
3213 cli->cl_writeback_work = NULL;
3215 if (cli->cl_lru_work != NULL) {
3216 ptlrpcd_destroy_work(cli->cl_lru_work);
3217 cli->cl_lru_work = NULL;
3219 client_obd_cleanup(obd);
3224 EXPORT_SYMBOL(osc_setup_common);
/*
 * Device setup for the OSC (the .o_setup method in osc_obd_ops).
 *
 * Runs osc_setup_common() and osc_tunables_init(), then tops up the shared
 * request pool: while the global count is below osc_reqpool_maxreqcount it
 * adds cl_max_rpcs_in_flight + 2 requests, clamped so the limit is not
 * exceeded, and accounts what ptlrpc_add_rqs_to_pool() actually added.
 * Finally it registers osc_cancel_weight() on the namespace, links the
 * client into osc_shrink_list under osc_shrink_lock, and sets the import's
 * idle timeout (from osc_idle_timeout) and idle debug mask (D_HA).
 *
 * NOTE(review): local declarations, error checks and the return statement
 * are not visible in this listing.
 */
3226 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3228 struct client_obd *cli = &obd->u.cli;
3236 rc = osc_setup_common(obd, lcfg);
3240 rc = osc_tunables_init(obd);
3245 * We try to control the total number of requests with a upper limit
3246 * osc_reqpool_maxreqcount. There might be some race which will cause
3247 * over-limit allocation, but it is fine.
3249 req_count = atomic_read(&osc_pool_req_count);
3250 if (req_count < osc_reqpool_maxreqcount) {
3251 adding = cli->cl_max_rpcs_in_flight + 2;
3252 if (req_count + adding > osc_reqpool_maxreqcount)
3253 adding = osc_reqpool_maxreqcount - req_count;
3255 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3256 atomic_add(added, &osc_pool_req_count);
3259 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3261 spin_lock(&osc_shrink_lock);
3262 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3263 spin_unlock(&osc_shrink_lock);
3264 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3265 cli->cl_import->imp_idle_debug = D_HA;
/*
 * Pre-cleanup steps shared by OSC-style devices (exported; osc_precleanup()
 * calls it).
 *
 * Waits on obd_zombie_barrier() (see the remarks below for the reason),
 * destroys the writeback and LRU ptlrpcd work items if they exist, clearing
 * the pointers, and then calls obd_cleanup_client_import().
 */
3270 int osc_precleanup_common(struct obd_device *obd)
3272 struct client_obd *cli = &obd->u.cli;
3276 * for echo client, export may be on zombie list, wait for
3277 * zombie thread to cull it, because cli.cl_import will be
3278 * cleared in client_disconnect_export():
3279 * class_export_destroy() -> obd_cleanup() ->
3280 * echo_device_free() -> echo_client_cleanup() ->
3281 * obd_disconnect() -> osc_disconnect() ->
3282 * client_disconnect_export()
3284 obd_zombie_barrier();
3285 if (cli->cl_writeback_work) {
3286 ptlrpcd_destroy_work(cli->cl_writeback_work);
3287 cli->cl_writeback_work = NULL;
3290 if (cli->cl_lru_work) {
3291 ptlrpcd_destroy_work(cli->cl_lru_work);
3292 cli->cl_lru_work = NULL;
3295 obd_cleanup_client_import(obd);
3298 EXPORT_SYMBOL(osc_precleanup_common);
/*
 * The .o_precleanup method in osc_obd_ops: runs osc_precleanup_common() and
 * then unregisters the device's ptlrpc lprocfs entries.
 */
3300 static int osc_precleanup(struct obd_device *obd)
3304 osc_precleanup_common(obd);
3306 ptlrpc_lprocfs_unregister_obd(obd);
/*
 * Final device cleanup (exported; also the .o_cleanup method in
 * osc_obd_ops).
 *
 * Unlinks the client from osc_shrink_list under osc_shrink_lock.  If a
 * client cache is attached, the client is removed from the cache's LRU list
 * under ccc_lru_lock, cl_lru_left is cleared, the cache reference is dropped
 * and cl_cache is reset to NULL.  The quota cache is then freed with
 * osc_quota_cleanup() and client_obd_cleanup() supplies the result code.
 */
3310 int osc_cleanup_common(struct obd_device *obd)
3312 struct client_obd *cli = &obd->u.cli;
3317 spin_lock(&osc_shrink_lock);
3318 list_del(&cli->cl_shrink_list);
3319 spin_unlock(&osc_shrink_lock);
3322 if (cli->cl_cache != NULL) {
3323 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3324 spin_lock(&cli->cl_cache->ccc_lru_lock);
3325 list_del_init(&cli->cl_lru_osc);
3326 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3327 cli->cl_lru_left = NULL;
3328 cl_cache_decref(cli->cl_cache);
3329 cli->cl_cache = NULL;
3332 /* free memory of osc quota cache */
3333 osc_quota_cleanup(obd);
3335 rc = client_obd_cleanup(obd);
3340 EXPORT_SYMBOL(osc_cleanup_common);
/*
 * OBD method table for the OSC device type; osc_init() passes it to
 * class_register_type().  Connection add/del/connect use the generic
 * client_* helpers, the remaining entries are the osc_* functions of this
 * module.
 */
3342 static struct obd_ops osc_obd_ops = {
3343 .o_owner = THIS_MODULE,
3344 .o_setup = osc_setup,
3345 .o_precleanup = osc_precleanup,
3346 .o_cleanup = osc_cleanup_common,
3347 .o_add_conn = client_import_add_conn,
3348 .o_del_conn = client_import_del_conn,
3349 .o_connect = client_connect_import,
3350 .o_reconnect = osc_reconnect,
3351 .o_disconnect = osc_disconnect,
3352 .o_statfs = osc_statfs,
3353 .o_statfs_async = osc_statfs_async,
3354 .o_create = osc_create,
3355 .o_destroy = osc_destroy,
3356 .o_getattr = osc_getattr,
3357 .o_setattr = osc_setattr,
3358 .o_iocontrol = osc_iocontrol,
3359 .o_set_info_async = osc_set_info_async,
3360 .o_import_event = osc_import_event,
3361 .o_quotactl = osc_quotactl,
/*
 * Shrinker state.  osc_cache_shrinker holds what set_shrinker() returns in
 * osc_init() and is passed to remove_shrinker() in osc_exit().
 * osc_shrink_list links every client_obd through cl_shrink_list (added in
 * osc_setup(), removed in osc_cleanup_common()); osc_shrink_lock is held
 * around those list updates.
 */
3364 static struct shrinker *osc_cache_shrinker;
3365 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3366 DEFINE_SPINLOCK(osc_shrink_lock);
/*
 * Single-callback shrinker entry point, compiled only when
 * HAVE_SHRINKER_COUNT is not defined.  It packs the scan count and gfp mask
 * into a shrink_control, runs osc_cache_shrink_scan() (discarding its
 * result) and returns osc_cache_shrink_count().  A NULL shrinker pointer is
 * declared locally when neither HAVE_SHRINKER_WANT_SHRINK_PTR nor
 * HAVE_SHRINK_CONTROL is defined.
 *
 * NOTE(review): the matching #endif lines are not visible in this listing.
 */
3368 #ifndef HAVE_SHRINKER_COUNT
3369 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3371 struct shrink_control scv = {
3372 .nr_to_scan = shrink_param(sc, nr_to_scan),
3373 .gfp_mask = shrink_param(sc, gfp_mask)
3375 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3376 struct shrinker *shrinker = NULL;
3379 (void)osc_cache_shrink_scan(shrinker, &scv);
3381 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module initialisation.
 *
 * In order: initialise the osc slab caches, register the OSC obd type
 * (procfs registration is disabled when an OSP type with a proc symbol is
 * already registered), register the cache shrinker, validate
 * osc_reqpool_mem_max (must be non-zero and below 1 << 12 MB, else -EINVAL),
 * derive osc_reqpool_maxreqcount from the pool size in bytes and a request
 * size grown by doubling until it reaches OST_IO_MAXREQSIZE, create the
 * shared request pool (-ENOMEM on failure) and start the grant work.
 * The error path frees the request pool, unregisters the type and destroys
 * the slab caches.
 *
 * NOTE(review): the shrinker is registered before the later failure points,
 * but no remove_shrinker() call is visible on the error path (osc_exit()
 * does call it) - confirm against the complete source whether a failed load
 * leaves the shrinker registered.
 * NOTE(review): the error-path labels, some error checks, the initial value
 * of reqsize and the return statements are not visible in this listing.
 */
3385 static int __init osc_init(void)
3387 bool enable_proc = true;
3388 struct obd_type *type;
3389 unsigned int reqpool_size;
3390 unsigned int reqsize;
3392 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3393 osc_cache_shrink_count, osc_cache_shrink_scan);
3396 /* print an address of _any_ initialized kernel symbol from this
3397 * module, to allow debugging with gdb that doesn't support data
3398 * symbols from modules.*/
3399 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3401 rc = lu_kmem_init(osc_caches);
3405 type = class_search_type(LUSTRE_OSP_NAME);
3406 if (type != NULL && type->typ_procsym != NULL)
3407 enable_proc = false;
3409 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3410 LUSTRE_OSC_NAME, &osc_device_type);
3414 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3416 /* This is obviously too much memory, only prevent overflow here */
3417 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3418 GOTO(out_type, rc = -EINVAL);
3420 reqpool_size = osc_reqpool_mem_max << 20;
3423 while (reqsize < OST_IO_MAXREQSIZE)
3424 reqsize = reqsize << 1;
3427 * We don't enlarge the request count in OSC pool according to
3428 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3429 * tried after normal allocation failed. So a small OSC pool won't
3430 * cause much performance degression in most of cases.
3432 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3434 atomic_set(&osc_pool_req_count, 0);
3435 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3436 ptlrpc_add_rqs_to_pool);
3438 if (osc_rq_pool == NULL)
3439 GOTO(out_type, rc = -ENOMEM);
3441 rc = osc_start_grant_work();
3443 GOTO(out_req_pool, rc);
3448 ptlrpc_free_rq_pool(osc_rq_pool);
3450 class_unregister_type(LUSTRE_OSC_NAME);
3452 lu_kmem_fini(osc_caches);
/*
 * Module unload: stop the grant work, remove the cache shrinker, unregister
 * the OSC obd type, destroy the slab caches and free the shared request
 * pool.
 */
3457 static void __exit osc_exit(void)
3459 osc_stop_grant_work();
3460 remove_shrinker(osc_cache_shrinker);
3461 class_unregister_type(LUSTRE_OSC_NAME);
3462 lu_kmem_fini(osc_caches);
3463 ptlrpc_free_rq_pool(osc_rq_pool);
/* Module metadata and the init/exit entry points defined above. */
3466 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3467 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3468 MODULE_VERSION(LUSTRE_VERSION_STRING);
3469 MODULE_LICENSE("GPL");
3471 module_init(osc_init);
3472 module_exit(osc_exit);