4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
49 #include "osc_internal.h"
/* Shared OSC request-pool state: the pool itself, the current number of
 * pooled requests, and the configured maximum pooled-request count. */
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
/* read-only via sysfs (0444): only settable at module load time */
57 module_param(osc_reqpool_mem_max, uint, 0444);
/* NOTE(review): declared int but exported as uint through module_param —
 * confirm this passes the kernel's module_param type check. */
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
62 #define osc_grant_args osc_brw_async_args
/* Per-request async-argument blocks stashed in rq_async_args by the
 * async senders below; each carries the caller's completion upcall
 * (further fields — cookie/obdo pointers — are used by the interpret
 * callbacks but not all are visible in this view). */
64 struct osc_setattr_args {
66 obd_enqueue_update_f sa_upcall;
70 struct osc_fsync_args {
71 struct osc_object *fa_obj;
73 obd_enqueue_update_f fa_upcall;
77 struct osc_ladvise_args {
79 obd_enqueue_update_f la_upcall;
/* forward declarations for the BRW path defined later in this file */
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack @oa into @req's client-side OST body, converting to wire format
 * according to the import's negotiated connect data. */
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 struct ost_body *body;
91 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
94 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: send @oa's identity to the OST, wait for the
 * reply, and copy the returned attributes back into @oa.  Also fills in
 * o_blksize from the client's negotiated BRW size (the OST does not
 * return it), setting OBD_MD_FLBLKSZ accordingly. */
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
100 struct ptlrpc_request *req;
101 struct ost_body *body;
105 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failed: free the unsent request and bail out */
111 ptlrpc_request_free(req);
115 osc_pack_req_body(req, oa);
117 ptlrpc_request_set_replen(req);
/* blocking send */
119 rc = ptlrpc_queue_wait(req);
123 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* missing/short reply body is a protocol error */
125 GOTO(out, rc = -EPROTO);
127 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
130 oa->o_blksize = cli_brw_size(exp->exp_obd);
131 oa->o_valid |= OBD_MD_FLBLKSZ;
135 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: pack @oa, wait for the reply, and copy the
 * OST's view of the attributes back into @oa.  Caller must have set
 * OBD_MD_FLGROUP (asserted below). */
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
143 struct ptlrpc_request *req;
144 struct ost_body *body;
148 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* pack failed: free the unsent request */
156 ptlrpc_request_free(req);
160 osc_pack_req_body(req, oa);
162 ptlrpc_request_set_replen(req);
164 rc = ptlrpc_queue_wait(req);
168 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* missing reply body is a protocol error */
170 GOTO(out, rc = -EPROTO);
172 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176 ptlrpc_req_finished(req);
/* Completion callback for async setattr/punch RPCs: unpack the reply
 * obdo into the caller-supplied sa_oa, then invoke the caller's upcall
 * with the final rc. */
181 static int osc_setattr_interpret(const struct lu_env *env,
182 struct ptlrpc_request *req, void *args, int rc)
184 struct osc_setattr_args *sa = args;
185 struct ost_body *body;
192 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194 GOTO(out, rc = -EPROTO);
196 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
199 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs @oa and hands the request off without
 * blocking; @upcall(@cookie, rc) runs from osc_setattr_interpret when the
 * reply arrives.  If @rqset is the PTLRPCD_SET sentinel the request goes
 * to ptlrpcd, otherwise it is added to the caller's set.  (A branch with
 * no-wait ptlrpcd_add_req also exists; its condition is not visible in
 * this view — presumably rqset == NULL.) */
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204 obd_enqueue_update_f upcall, void *cookie,
205 struct ptlrpc_request_set *rqset)
207 struct ptlrpc_request *req;
208 struct osc_setattr_args *sa;
213 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
219 ptlrpc_request_free(req);
223 osc_pack_req_body(req, oa);
225 ptlrpc_request_set_replen(req);
227 /* do mds to ost setattr asynchronously */
229 /* Do not wait for response. */
230 ptlrpcd_add_req(req);
232 req->rq_interpret_reply = osc_setattr_interpret;
/* sa must fit into the fixed-size rq_async_args scratch area */
234 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
235 sa = ptlrpc_req_async_args(req);
237 sa->sa_upcall = upcall;
238 sa->sa_cookie = cookie;
240 if (rqset == PTLRPCD_SET)
241 ptlrpcd_add_req(req);
243 ptlrpc_set_add_req(rqset, req);
/* Completion callback for OST_LADVISE: copy the reply obdo back to the
 * caller's la_oa and fire the caller's upcall with the final rc. */
249 static int osc_ladvise_interpret(const struct lu_env *env,
250 struct ptlrpc_request *req,
253 struct osc_ladvise_args *la = arg;
254 struct ost_body *body;
260 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
262 GOTO(out, rc = -EPROTO);
/* struct copy of the whole reply obdo */
264 *la->la_oa = body->oa;
266 rc = la->la_upcall(la->la_cookie, rc);
271 * If rqset is NULL, do not wait for response. Upcall and cookie could also
272 * be NULL in this case
/* Send an OST_LADVISE RPC carrying @num_advise lu_ladvise entries from
 * @ladvise_hdr.  Goes to the OST IO portal with AT-adjusted timeout.
 * Dispatch mirrors osc_setattr_async: fire-and-forget, ptlrpcd, or the
 * caller's request set. */
274 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
275 struct ladvise_hdr *ladvise_hdr,
276 obd_enqueue_update_f upcall, void *cookie,
277 struct ptlrpc_request_set *rqset)
279 struct ptlrpc_request *req;
280 struct ost_body *body;
281 struct osc_ladvise_args *la;
283 struct lu_ladvise *req_ladvise;
284 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
285 int num_advise = ladvise_hdr->lah_count;
286 struct ladvise_hdr *req_ladvise_hdr;
289 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* size the variable-length advice array before packing */
293 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
294 num_advise * sizeof(*ladvise));
295 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
297 ptlrpc_request_free(req);
300 req->rq_request_portal = OST_IO_PORTAL;
301 ptlrpc_at_set_req_timeout(req);
303 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
305 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
308 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
309 &RMF_OST_LADVISE_HDR);
310 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
312 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
313 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
314 ptlrpc_request_set_replen(req);
317 /* Do not wait for response. */
318 ptlrpcd_add_req(req);
322 req->rq_interpret_reply = osc_ladvise_interpret;
323 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
324 la = ptlrpc_req_async_args(req);
326 la->la_upcall = upcall;
327 la->la_cookie = cookie;
329 if (rqset == PTLRPCD_SET)
330 ptlrpcd_add_req(req);
332 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE.  Only used for echo-sequence objects here
 * (asserted below); regular object precreation happens elsewhere.
 * Copies the OST's reply attributes back into @oa and fills o_blksize
 * from the client BRW size. */
337 static int osc_create(const struct lu_env *env, struct obd_export *exp,
340 struct ptlrpc_request *req;
341 struct ost_body *body;
346 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
347 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
349 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
351 GOTO(out, rc = -ENOMEM);
353 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
355 ptlrpc_request_free(req);
359 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
362 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
364 ptlrpc_request_set_replen(req);
366 rc = ptlrpc_queue_wait(req);
370 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
372 GOTO(out_req, rc = -EPROTO);
374 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
375 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
377 oa->o_blksize = cli_brw_size(exp->exp_obd);
378 oa->o_valid |= OBD_MD_FLBLKSZ;
380 CDEBUG(D_HA, "transno: %lld\n",
381 lustre_msg_get_transno(req->rq_repmsg));
383 ptlrpc_req_finished(req);
/* Send an asynchronous OST_PUNCH (truncate/hole-punch) via ptlrpcd.
 * Reuses osc_setattr_interpret for the reply, so @upcall(@cookie, rc)
 * is invoked on completion.  Uses the OST IO portal with AT timeout. */
388 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
389 obd_enqueue_update_f upcall, void *cookie)
391 struct ptlrpc_request *req;
392 struct osc_setattr_args *sa;
393 struct obd_import *imp = class_exp2cliimp(exp);
394 struct ost_body *body;
399 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
403 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405 ptlrpc_request_free(req);
/* punch is an IO-portal RPC, unlike plain setattr */
409 osc_set_io_portal(req);
411 ptlrpc_at_set_req_timeout(req);
413 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
415 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
417 ptlrpc_request_set_replen(req);
419 req->rq_interpret_reply = osc_setattr_interpret;
420 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
421 sa = ptlrpc_req_async_args(req);
423 sa->sa_upcall = upcall;
424 sa->sa_cookie = cookie;
/* hand off to ptlrpcd; completion is delivered via the upcall */
426 ptlrpcd_add_req(req);
430 EXPORT_SYMBOL(osc_punch_send);
/* Completion callback for OST_SYNC: copy the reply obdo to the caller,
 * refresh the osc object's cached blocks attribute under the attr lock,
 * then invoke the caller's upcall. */
432 static int osc_sync_interpret(const struct lu_env *env,
433 struct ptlrpc_request *req, void *args, int rc)
435 struct osc_fsync_args *fa = args;
436 struct ost_body *body;
437 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
438 unsigned long valid = 0;
439 struct cl_object *obj;
445 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
447 CERROR("can't unpack ost_body\n");
448 GOTO(out, rc = -EPROTO);
451 *fa->fa_oa = body->oa;
452 obj = osc2cl(fa->fa_obj);
454 /* Update osc object's blocks attribute */
455 cl_object_attr_lock(obj);
456 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
457 attr->cat_blocks = body->oa.o_blocks;
462 cl_object_attr_update(env, obj, attr, valid);
463 cl_object_attr_unlock(obj);
466 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Send an asynchronous OST_SYNC for @obj.  The size/blocks fields of
 * @oa are overloaded to carry the start/end of the range to sync (see
 * comment below).  Completion runs osc_sync_interpret, which updates
 * the object's cached attributes and calls @upcall(@cookie, rc). */
470 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
471 obd_enqueue_update_f upcall, void *cookie,
472 struct ptlrpc_request_set *rqset)
474 struct obd_export *exp = osc_export(obj);
475 struct ptlrpc_request *req;
476 struct ost_body *body;
477 struct osc_fsync_args *fa;
481 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
485 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
487 ptlrpc_request_free(req);
491 /* overload the size and blocks fields in the oa with start/end */
492 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
494 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
496 ptlrpc_request_set_replen(req);
497 req->rq_interpret_reply = osc_sync_interpret;
499 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
500 fa = ptlrpc_req_async_args(req);
503 fa->fa_upcall = upcall;
504 fa->fa_cookie = cookie;
506 if (rqset == PTLRPCD_SET)
507 ptlrpcd_add_req(req);
509 ptlrpc_set_add_req(rqset, req);
514 /* Find and cancel locally locks matched by @mode in the resource found by
515 * @objid. Found locks are added into @cancel list. Returns the amount of
516 * locks added to @cancels list. */
517 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
518 struct list_head *cancels,
519 enum ldlm_mode mode, __u64 lock_flags)
521 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
522 struct ldlm_res_id res_id;
523 struct ldlm_resource *res;
527 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
528 * export) but disabled through procfs (flag in NS).
530 * This distinguishes from a case when ELC is not supported originally,
531 * when we still want to cancel locks in advance and just cancel them
532 * locally, without sending any RPC. */
533 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* build the LDLM resource name from the object id and look it up */
536 ostid_build_res_name(&oa->o_oi, &res_id);
537 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
541 LDLM_RESOURCE_ADDREF(res);
542 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
543 lock_flags, 0, NULL);
544 LDLM_RESOURCE_DELREF(res);
545 ldlm_resource_putref(res);
/* Completion callback for OST_DESTROY: release the in-flight-destroy
 * slot and wake any sender throttled in osc_destroy(). */
549 static int osc_destroy_interpret(const struct lu_env *env,
550 struct ptlrpc_request *req, void *args, int rc)
552 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
554 atomic_dec(&cli->cl_destroy_in_flight);
555 wake_up(&cli->cl_destroy_waitq);
/* Try to reserve an in-flight-destroy slot, bounded by
 * cl_max_rpcs_in_flight.  On failure the tentative increment is rolled
 * back; the extra wake_up covers the race where another destroy
 * completed between the inc and the dec (see comment below). */
560 static int osc_can_send_destroy(struct client_obd *cli)
562 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
563 cli->cl_max_rpcs_in_flight) {
564 /* The destroy request can be sent */
/* roll back the reservation; slot was not available */
567 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
568 cli->cl_max_rpcs_in_flight) {
570 * The counter has been modified between the two atomic
573 wake_up(&cli->cl_destroy_waitq);
/* Send an OST_DESTROY for the object in @oa.  First cancels matching
 * local PW locks (ELC) so their handles ride along in the request,
 * throttles against cl_max_rpcs_in_flight via osc_can_send_destroy(),
 * then hands the request to ptlrpcd without waiting for the reply. */
578 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
581 struct client_obd *cli = &exp->exp_obd->u.cli;
582 struct ptlrpc_request *req;
583 struct ost_body *body;
584 struct list_head cancels = LIST_HEAD_INIT(cancels);
589 CDEBUG(D_INFO, "oa NULL\n");
/* gather local PW locks on this object for early lock cancellation */
593 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
594 LDLM_FL_DISCARD_DATA);
596 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: drop the collected cancel references */
598 ldlm_lock_list_put(&cancels, l_bl_ast, count);
602 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
605 ptlrpc_request_free(req);
609 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
610 ptlrpc_at_set_req_timeout(req);
612 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
616 ptlrpc_request_set_replen(req);
618 req->rq_interpret_reply = osc_destroy_interpret;
619 if (!osc_can_send_destroy(cli)) {
620 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
623 * Wait until the number of on-going destroy RPCs drops
624 * under max_rpc_in_flight
626 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
627 osc_can_send_destroy(cli), &lwi);
/* interrupted while waiting: abandon the request */
629 ptlrpc_req_finished(req);
634 /* Do not wait for response */
635 ptlrpcd_add_req(req);
/* Fill the grant-related fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) under cl_loi_list_lock so the OST learns how much dirty
 * data and grant this client holds.  Also sanity-checks the dirty
 * accounting and clamps o_undirty below OBD_MAX_GRANT. */
639 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
642 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* caller must not have set the fields we are about to fill */
644 LASSERT(!(oa->o_valid & bits));
647 spin_lock(&cli->cl_loi_list_lock);
/* with GRANT_PARAM servers, report grant consumption in bytes */
648 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
649 oa->o_dirty = cli->cl_dirty_grant;
651 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
652 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
653 cli->cl_dirty_max_pages)) {
654 CERROR("dirty %lu - %lu > dirty_max %lu\n",
655 cli->cl_dirty_pages, cli->cl_dirty_transit,
656 cli->cl_dirty_max_pages);
658 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
659 atomic_long_read(&obd_dirty_transit_pages) >
660 (long)(obd_max_dirty_pages + 1))) {
661 /* The atomic_read() allowing the atomic_inc() are
662 * not covered by a lock thus they may safely race and trip
663 * this CERROR() unless we add in a small fudge factor (+1). */
664 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
665 cli_name(cli), atomic_long_read(&obd_dirty_pages),
666 atomic_long_read(&obd_dirty_transit_pages),
667 obd_max_dirty_pages);
669 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
671 CERROR("dirty %lu - dirty_max %lu too big???\n",
672 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
675 unsigned long nrpages;
676 unsigned long undirty;
/* ask for enough grant to keep max_rpcs_in_flight+1 RPCs busy */
678 nrpages = cli->cl_max_pages_per_rpc;
679 nrpages *= cli->cl_max_rpcs_in_flight + 1;
680 nrpages = max(nrpages, cli->cl_dirty_max_pages);
681 undirty = nrpages << PAGE_SHIFT;
682 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
686 /* take extent tax into account when asking for more
688 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
689 cli->cl_max_extent_pages;
690 undirty += nrextents * cli->cl_grant_extent_tax;
692 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
693 * to add extent tax, etc.
695 oa->o_undirty = min(undirty, OBD_MAX_GRANT -
696 (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
698 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
699 oa->o_dropped = cli->cl_lost_grant;
/* lost grant has been reported; reset the counter */
700 cli->cl_lost_grant = 0;
701 spin_unlock(&cli->cl_loi_list_lock);
702 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
703 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt one shrink interval from now. */
706 void osc_update_next_shrink(struct client_obd *cli)
708 cli->cl_next_shrink_grant = ktime_get_seconds() +
709 cli->cl_grant_shrink_interval;
711 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
712 cli->cl_next_shrink_grant);
/* Add @grant bytes back to the client's available grant, under the
 * loi list lock. */
715 static void __osc_update_grant(struct client_obd *cli, u64 grant)
717 spin_lock(&cli->cl_loi_list_lock);
718 cli->cl_avail_grant += grant;
719 spin_unlock(&cli->cl_loi_list_lock);
/* If the reply body carries a grant (OBD_MD_FLGRANT), credit it to the
 * client's available grant. */
722 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
724 if (body->oa.o_valid & OBD_MD_FLGRANT) {
725 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
726 __osc_update_grant(cli, body->oa.o_grant);
731 * grant thread data for shrinking space.
/* Singleton state for the grant-shrink worker: the list of registered
 * clients, the mutex protecting it, and a stop flag. */
733 struct grant_thread_data {
734 struct list_head gtd_clients;
735 struct mutex gtd_mutex;
736 unsigned long gtd_stopped:1;
738 static struct grant_thread_data client_gtd;
/* Completion callback for a grant-shrink RPC: on failure, restore the
 * grant we tried to return (aa->aa_oa->o_grant); otherwise credit any
 * grant the server handed back, then free the temporary obdo. */
740 static int osc_shrink_grant_interpret(const struct lu_env *env,
741 struct ptlrpc_request *req,
744 struct osc_grant_args *aa = args;
745 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
746 struct ost_body *body;
/* shrink failed: give the shrunk amount back to avail_grant */
749 __osc_update_grant(cli, aa->aa_oa->o_grant);
753 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
755 osc_update_grant(cli, body);
757 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
/* Locally give back a quarter of the available grant: move it from
 * cl_avail_grant into @oa->o_grant and mark the obdo with
 * OBD_FL_SHRINK_GRANT so the server reclaims it. */
762 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
764 spin_lock(&cli->cl_loi_list_lock);
765 oa->o_grant = cli->cl_avail_grant / 4;
766 cli->cl_avail_grant -= oa->o_grant;
767 spin_unlock(&cli->cl_loi_list_lock);
/* make sure o_flags is valid before OR-ing in the shrink flag */
768 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
769 oa->o_valid |= OBD_MD_FLFLAGS;
772 oa->o_flags |= OBD_FL_SHRINK_GRANT;
773 osc_update_next_shrink(cli);
776 /* Shrink the current grant, either from some large amount to enough for a
777 * full set of in-flight RPCs, or if we have already shrunk to that limit
778 * then to enough for a single RPC. This avoids keeping more grant than
779 * needed, and avoids shrinking the grant piecemeal. */
780 static int osc_shrink_grant(struct client_obd *cli)
782 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
783 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
785 spin_lock(&cli->cl_loi_list_lock);
/* already at or below the full-pipeline target: fall back to one RPC */
786 if (cli->cl_avail_grant <= target_bytes)
787 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
788 spin_unlock(&cli->cl_loi_list_lock);
790 return osc_shrink_grant_to_target(cli, target_bytes);
/* Return grant above @target_bytes to the server via a KEY_GRANT_SHRINK
 * set_info RPC.  The target is clamped to at least one RPC's worth; the
 * availability check is redone under the lock after building the body
 * in case avail_grant changed.  On send failure the deducted grant is
 * restored locally. */
793 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
796 struct ost_body *body;
799 spin_lock(&cli->cl_loi_list_lock);
800 /* Don't shrink if we are already above or below the desired limit
801 * We don't want to shrink below a single RPC, as that will negatively
802 * impact block allocation and long-term performance. */
803 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
804 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
806 if (target_bytes >= cli->cl_avail_grant) {
807 spin_unlock(&cli->cl_loi_list_lock);
810 spin_unlock(&cli->cl_loi_list_lock);
816 osc_announce_cached(cli, &body->oa, 0);
818 spin_lock(&cli->cl_loi_list_lock);
819 if (target_bytes >= cli->cl_avail_grant) {
820 /* available grant has changed since target calculation */
821 spin_unlock(&cli->cl_loi_list_lock);
822 GOTO(out_free, rc = 0);
/* move the excess out of avail_grant; it rides in o_grant */
824 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
825 cli->cl_avail_grant = target_bytes;
826 spin_unlock(&cli->cl_loi_list_lock);
827 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
828 body->oa.o_valid |= OBD_MD_FLFLAGS;
829 body->oa.o_flags = 0;
831 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
832 osc_update_next_shrink(cli);
834 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
835 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
836 sizeof(*body), body, NULL);
/* send failed: restore the grant we deducted above */
838 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether @client should return grant now: requires a connected
 * import with OBD_CONNECT_GRANT_SHRINK, the shrink deadline to be near
 * (within 5s), a FULL import state, and more available grant than one
 * RPC's worth. */
844 static int osc_should_shrink_grant(struct client_obd *client)
846 time64_t next_shrink = client->cl_next_shrink_grant;
848 if (client->cl_import == NULL)
851 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
852 OBD_CONNECT_GRANT_SHRINK) == 0)
855 if (ktime_get_seconds() >= next_shrink - 5) {
856 /* Get the current RPC size directly, instead of going via:
857 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
858 * Keep comment here so that it can be found by searching. */
859 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
861 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
862 client->cl_avail_grant > brw_size)
/* not worth shrinking yet: push the deadline out */
865 osc_update_next_shrink(client);
/* cap on how many shrink RPCs one worker pass may send */
870 #define GRANT_SHRINK_RPC_BATCH 100
/* Delayed work used by the grant-shrink worker below. */
872 static struct delayed_work work;
/* Periodic worker: walk all registered clients, shrink grant on those
 * that qualify (up to GRANT_SHRINK_RPC_BATCH per pass), compute the
 * earliest future shrink deadline, and re-arm the delayed work for it
 * (or run again immediately if a deadline has already passed). */
874 static void osc_grant_work_handler(struct work_struct *data)
876 struct client_obd *cli;
878 bool init_next_shrink = true;
879 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
882 mutex_lock(&client_gtd.gtd_mutex);
883 list_for_each_entry(cli, &client_gtd.gtd_clients,
885 if (++rpc_sent < GRANT_SHRINK_RPC_BATCH &&
886 osc_should_shrink_grant(cli))
887 osc_shrink_grant(cli);
/* track the soonest still-future per-client deadline */
889 if (!init_next_shrink) {
890 if (cli->cl_next_shrink_grant < next_shrink &&
891 cli->cl_next_shrink_grant > ktime_get_seconds())
892 next_shrink = cli->cl_next_shrink_grant;
894 init_next_shrink = false;
895 next_shrink = cli->cl_next_shrink_grant;
898 mutex_unlock(&client_gtd.gtd_mutex);
/* do not re-arm once osc_stop_grant_work() has flagged shutdown */
900 if (client_gtd.gtd_stopped == 1)
903 if (next_shrink > ktime_get_seconds())
904 schedule_delayed_work(&work, msecs_to_jiffies(
905 (next_shrink - ktime_get_seconds()) *
908 schedule_work(&work.work);
/* Force an immediate grant-shrink pass: cancel any pending delayed run
 * and queue the worker now. */
911 void osc_schedule_grant_work(void)
913 cancel_delayed_work_sync(&work);
914 schedule_work(&work.work);
918 * Start grant thread for returing grant to server for idle clients.
/* Initialize the grant-shrink singleton and kick off the first worker
 * run. */
920 static int osc_start_grant_work(void)
922 client_gtd.gtd_stopped = 0;
923 mutex_init(&client_gtd.gtd_mutex);
924 INIT_LIST_HEAD(&client_gtd.gtd_clients);
926 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
927 schedule_work(&work.work);
/* Flag shutdown so the handler does not re-arm, then wait for any
 * in-flight/pending work to finish. */
932 static void osc_stop_grant_work(void)
934 client_gtd.gtd_stopped = 1;
935 cancel_delayed_work_sync(&work);
/* Register @client with the grant-shrink worker's client list. */
938 static void osc_add_grant_list(struct client_obd *client)
940 mutex_lock(&client_gtd.gtd_mutex);
941 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
942 mutex_unlock(&client_gtd.gtd_mutex);
/* Unregister @client from the grant-shrink list; no-op if it was never
 * added (empty chain). */
945 static void osc_del_grant_list(struct client_obd *client)
947 if (list_empty(&client->cl_grant_chain))
950 mutex_lock(&client_gtd.gtd_mutex);
951 list_del_init(&client->cl_grant_chain);
952 mutex_unlock(&client_gtd.gtd_mutex);
/* Initialize client grant state from the server's connect reply @ocd:
 * set cl_avail_grant (adjusting for reserved/dirty grant unless the
 * import was evicted), derive chunk size, extent tax and max extent
 * pages when GRANT_PARAM was negotiated, and register for grant-shrink
 * if GRANT_SHRINK was negotiated. */
955 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
958 * ocd_grant is the total grant amount we're expect to hold: if we've
959 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
960 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
963 * race is tolerable here: if we're evicted, but imp_state already
964 * left EVICTED state, then cl_dirty_pages must be 0 already.
966 spin_lock(&cli->cl_loi_list_lock);
967 cli->cl_avail_grant = ocd->ocd_grant;
968 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
969 cli->cl_avail_grant -= cli->cl_reserved_grant;
970 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
971 cli->cl_avail_grant -= cli->cl_dirty_grant;
973 cli->cl_avail_grant -=
974 cli->cl_dirty_pages << PAGE_SHIFT;
977 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
981 /* overhead for each extent insertion */
982 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
983 /* determine the appropriate chunk size used by osc_extent. */
984 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
985 ocd->ocd_grant_blkbits);
986 /* max_pages_per_rpc must be chunk aligned */
987 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
988 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
989 ~chunk_mask) & chunk_mask;
990 /* determine maximum extent size, in #pages */
991 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
992 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
993 if (cli->cl_max_extent_pages == 0)
994 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: fall back to page-sized chunks and defaults */
996 cli->cl_grant_extent_tax = 0;
997 cli->cl_chunkbits = PAGE_SHIFT;
998 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1000 spin_unlock(&cli->cl_loi_list_lock);
1002 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1003 "chunk bits: %d cl_max_extent_pages: %d\n",
1005 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1006 cli->cl_max_extent_pages);
1008 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1009 osc_add_grant_list(cli);
1011 EXPORT_SYMBOL(osc_init_grant);
1013 /* We assume that the reason this OSC got a short read is because it read
1014 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1015 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1016 * this stripe never got written at or beyond this stripe offset yet. */
1017 static void handle_short_read(int nob_read, size_t page_count,
1018 struct brw_page **pga)
1023 /* skip bytes read OK */
1024 while (nob_read > 0) {
1025 LASSERT (page_count > 0);
1027 if (pga[i]->count > nob_read) {
1028 /* EOF inside this page */
1029 ptr = kmap(pga[i]->pg) +
1030 (pga[i]->off & ~PAGE_MASK);
/* zero from the EOF point to the end of this page's span */
1031 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1038 nob_read -= pga[i]->count;
1043 /* zero remaining pages */
1044 while (page_count-- > 0) {
1045 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1046 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf RC vector in a BRW_WRITE reply: every entry
 * must be zero (negative entries are returned as the error), and the
 * bulk transfer size must match what was requested. */
1052 static int check_write_rcs(struct ptlrpc_request *req,
1053 int requested_nob, int niocount,
1054 size_t page_count, struct brw_page **pga)
1059 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1060 sizeof(*remote_rcs) *
1062 if (remote_rcs == NULL) {
1063 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1067 /* return error if any niobuf was in error */
1068 for (i = 0; i < niocount; i++) {
1069 if ((int)remote_rcs[i] < 0)
1070 return(remote_rcs[i]);
/* positive non-zero rc is a protocol violation, not an error code */
1072 if (remote_rcs[i] != 0) {
1073 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1074 i, remote_rcs[i], req);
1078 if (req->rq_bulk != NULL &&
1079 req->rq_bulk->bd_nob_transferred != requested_nob) {
1080 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1081 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages can share one niobuf iff they are file-contiguous
 * (p1 ends where p2 starts).  Differing flags are tolerated for the
 * known-safe set masked out below; unknown flag differences are warned
 * about. */
1088 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1090 if (p1->flag != p2->flag) {
1091 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1092 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1093 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1095 /* warn if we try to combine flags that we don't know to be
1096 * safe to combine */
1097 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1098 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1099 "report this at https://jira.whamcloud.com/\n",
1100 p1->flag, p2->flag);
1105 return (p1->off + p1->count == p2->off);
1108 #if IS_ENABLED(CONFIG_CRC_T10DIF)
/* Compute a T10-PI style checksum over @pg_count brw pages: per-sector
 * guard tags are generated with @fn into a scratch page, and the guard
 * tags themselves are folded through a hash (OBD_CKSUM_T10_TOP algo).
 * Also implements the OSC checksum fault-injection hooks for testing. */
1109 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1110 size_t pg_count, struct brw_page **pga,
1111 int opc, obd_dif_csum_fn *fn,
1115 struct ahash_request *req;
1116 /* Used Adler as the default checksum type on top of DIF tags */
1117 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1118 struct page *__page;
1119 unsigned char *buffer;
1121 unsigned int bufsize;
1123 int used_number = 0;
1129 LASSERT(pg_count > 0);
/* scratch page accumulates guard tags before hashing */
1131 __page = alloc_page(GFP_KERNEL);
1135 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1138 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1139 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1143 buffer = kmap(__page);
1144 guard_start = (__u16 *)buffer;
1145 guard_number = PAGE_SIZE / sizeof(*guard_start);
1146 while (nob > 0 && pg_count > 0) {
1147 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1149 /* corrupt the data before we compute the checksum, to
1150 * simulate an OST->client data error */
1151 if (unlikely(i == 0 && opc == OST_READ &&
1152 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1153 unsigned char *ptr = kmap(pga[i]->pg);
1154 int off = pga[i]->off & ~PAGE_MASK;
1156 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1161 * The left guard number should be able to hold checksums of a
1164 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1165 pga[i]->off & ~PAGE_MASK,
1167 guard_start + used_number,
1168 guard_number - used_number,
1174 used_number += used;
/* scratch page full of guard tags: fold into the hash and reset */
1175 if (used_number == guard_number) {
1176 cfs_crypto_hash_update_page(req, __page, 0,
1177 used_number * sizeof(*guard_start));
1181 nob -= pga[i]->count;
/* flush any remaining guard tags */
1189 if (used_number != 0)
1190 cfs_crypto_hash_update_page(req, __page, 0,
1191 used_number * sizeof(*guard_start));
1193 bufsize = sizeof(cksum);
1194 cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1196 /* For sending we only compute the wrong checksum instead
1197 * of corrupting the data so it is still correct on a redo */
1198 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1203 __free_page(__page);
1206 #else /* !CONFIG_CRC_T10DIF */
/* T10-PI disabled: stub out the csum functions and the bulk helper */
1207 #define obd_dif_ip_fn NULL
1208 #define obd_dif_crc_fn NULL
1209 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
1211 #endif /* CONFIG_CRC_T10DIF */
/* Compute the bulk checksum of @nob bytes spread over @pg_count brw
 * pages using the hash algorithm for @cksum_type, storing the result
 * in *cksum.  Includes the OSC checksum fault-injection hooks (corrupt
 * received data on OST_READ, flip the sent checksum on OST_WRITE). */
1213 static int osc_checksum_bulk(int nob, size_t pg_count,
1214 struct brw_page **pga, int opc,
1215 enum cksum_types cksum_type,
1219 struct ahash_request *req;
1220 unsigned int bufsize;
1221 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1223 LASSERT(pg_count > 0);
1225 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1227 CERROR("Unable to initialize checksum hash %s\n",
1228 cfs_crypto_hash_name(cfs_alg));
1229 return PTR_ERR(req);
1232 while (nob > 0 && pg_count > 0) {
/* hash at most the remaining byte count from this page */
1233 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1235 /* corrupt the data before we compute the checksum, to
1236 * simulate an OST->client data error */
1237 if (i == 0 && opc == OST_READ &&
1238 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1239 unsigned char *ptr = kmap(pga[i]->pg);
1240 int off = pga[i]->off & ~PAGE_MASK;
1242 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1245 cfs_crypto_hash_update_page(req, pga[i]->pg,
1246 pga[i]->off & ~PAGE_MASK,
1248 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1249 (int)(pga[i]->off & ~PAGE_MASK));
1251 nob -= pga[i]->count;
1256 bufsize = sizeof(*cksum);
1257 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1259 /* For sending we only compute the wrong checksum instead
1260 * of corrupting the data so it is still correct on a redo */
1261 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Dispatch bulk checksumming by type: T10-PI types (which map to a DIF
 * csum fn and sector size via obd_t10_cksum2dif) go through
 * osc_checksum_bulk_t10pi, everything else through osc_checksum_bulk. */
1267 static int osc_checksum_bulk_rw(const char *obd_name,
1268 enum cksum_types cksum_type,
1269 int nob, size_t pg_count,
1270 struct brw_page **pga, int opc,
1273 obd_dif_csum_fn *fn = NULL;
1274 int sector_size = 0;
1278 obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1281 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1282 opc, fn, sector_size, check_sum);
1284 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
/*
 * osc_brw_prep_request() - build an OST_READ/OST_WRITE (BRW) ptlrpc request
 * for @page_count pages described by @pga, returning it through @reqp.
 *
 * Visible responsibilities (from the lines present in this chunk):
 *  - allocate the request (pool-backed for writes, plain for reads);
 *  - count mergeable niobufs and size the request capsule accordingly;
 *  - decide between "short I/O" (data inline in the request) and a bulk
 *    descriptor (ptlrpc_prep_bulk_imp with KIOV pin ops);
 *  - fill ost_body / obd_ioobj / niobuf_remote from @oa and @pga;
 *  - compute and store the bulk checksum for writes (and request one for
 *    reads) unless the sptlrpc flavor already protects the bulk;
 *  - stash bookkeeping in osc_brw_async_args for the reply handler.
 *
 * @resend is used to tag the request with OBD_FL_RECOV_RESEND (see the
 * o_flags block near original line 1483).
 *
 * NOTE(review): the embedded original numbering (1291, 1293, 1295, ...)
 * jumps throughout — many lines (braces, error checks, GOTO targets, the
 * final RETURN) are missing from this excerpt.  Comments below annotate
 * only what the visible lines establish.
 */
1291 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1292 u32 page_count, struct brw_page **pga,
1293 struct ptlrpc_request **reqp, int resend)
1295 struct ptlrpc_request *req;
1296 struct ptlrpc_bulk_desc *desc;
1297 struct ost_body *body;
1298 struct obd_ioobj *ioobj;
1299 struct niobuf_remote *niobuf;
1300 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1301 struct osc_brw_async_args *aa;
1302 struct req_capsule *pill;
1303 struct brw_page *pg_prev;
1305 const char *obd_name = cli->cl_import->imp_obd->obd_name;
/* fault-injection hooks for testing request-allocation failure paths */
1308 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1309 RETURN(-ENOMEM); /* Recoverable */
1310 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1311 RETURN(-EINVAL); /* Fatal */
/* writes draw from the shared request pool; reads allocate directly */
1313 if ((cmd & OBD_BRW_WRITE) != 0) {
1315 req = ptlrpc_request_alloc_pool(cli->cl_import,
1317 &RQF_OST_BRW_WRITE);
1320 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* niocount = number of distinct remote niobufs after merging
 * physically-contiguous adjacent pages (can_merge_pages) */
1325 for (niocount = i = 1; i < page_count; i++) {
1326 if (!can_merge_pages(pga[i - 1], pga[i]))
1330 pill = &req->rq_pill;
1331 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1333 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1334 niocount * sizeof(*niobuf));
1336 for (i = 0; i < page_count; i++)
1337 short_io_size += pga[i]->count;
1339 /* Check if read/write is small enough to be a short io. */
1340 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1341 !imp_connect_shortio(cli->cl_import))
/* short I/O: data travels inside the request/reply buffers, no bulk */
1344 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1345 opc == OST_READ ? 0 : short_io_size);
1346 if (opc == OST_READ)
1347 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1350 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1352 ptlrpc_request_free(req);
1355 osc_set_io_portal(req);
1357 ptlrpc_at_set_req_timeout(req);
1358 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1360 req->rq_no_retry_einprogress = 1;
1362 if (short_io_size != 0) {
1364 short_io_buf = NULL;
/* bulk path: GET_SOURCE for writes (server pulls), PUT_SINK for reads */
1368 desc = ptlrpc_prep_bulk_imp(req, page_count,
1369 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1370 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1371 PTLRPC_BULK_PUT_SINK) |
1372 PTLRPC_BULK_BUF_KIOV,
1374 &ptlrpc_bulk_kiov_pin_ops);
1377 GOTO(out, rc = -ENOMEM);
1378 /* NB request now owns desc and will free it when it gets freed */
1380 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1381 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1382 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1383 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1385 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1387 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1388 * and from_kgid(), because they are asynchronous. Fortunately, variable
1389 * oa contains valid o_uid and o_gid in these two operations.
1390 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1391 * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1392 * other process logic */
1393 body->oa.o_uid = oa->o_uid;
1394 body->oa.o_gid = oa->o_gid;
1396 obdo_to_ioobj(oa, ioobj);
1397 ioobj->ioo_bufcnt = niocount;
1398 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1399 * that might be send for this request. The actual number is decided
1400 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1401 * "max - 1" for old client compatibility sending "0", and also so the
1402 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1404 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1406 ioobj_max_brw_set(ioobj, 0);
1408 if (short_io_size != 0) {
1409 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1410 body->oa.o_valid |= OBD_MD_FLFLAGS;
1411 body->oa.o_flags = 0;
1413 body->oa.o_flags |= OBD_FL_SHORT_IO;
1414 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1416 if (opc == OST_WRITE) {
1417 short_io_buf = req_capsule_client_get(pill,
1419 LASSERT(short_io_buf != NULL);
1423 LASSERT(page_count > 0);
/* walk the (sorted) page array, merging contiguous pages into niobufs
 * and either copying into the short-io buffer or adding kiov frags */
1425 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1426 struct brw_page *pg = pga[i];
1427 int poff = pg->off & ~PAGE_MASK;
1429 LASSERT(pg->count > 0);
1430 /* make sure there is no gap in the middle of page array */
1431 LASSERTF(page_count == 1 ||
1432 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1433 ergo(i > 0 && i < page_count - 1,
1434 poff == 0 && pg->count == PAGE_SIZE) &&
1435 ergo(i == page_count - 1, poff == 0)),
1436 "i: %d/%d pg: %p off: %llu, count: %u\n",
1437 i, page_count, pg, pg->off, pg->count);
1438 LASSERTF(i == 0 || pg->off > pg_prev->off,
1439 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1440 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1442 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1443 pg_prev->pg, page_private(pg_prev->pg),
1444 pg_prev->pg->index, pg_prev->off);
/* all pages in one RPC must agree on the SRVLOCK (lockless) flag */
1445 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1446 (pg->flag & OBD_BRW_SRVLOCK));
1447 if (short_io_size != 0 && opc == OST_WRITE) {
1448 unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1450 LASSERT(short_io_size >= requested_nob + pg->count);
1451 memcpy(short_io_buf + requested_nob,
1454 ll_kunmap_atomic(ptr, KM_USER0);
1455 } else if (short_io_size == 0) {
1456 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1459 requested_nob += pg->count;
1461 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1463 niobuf->rnb_len += pg->count;
1465 niobuf->rnb_offset = pg->off;
1466 niobuf->rnb_len = pg->count;
1467 niobuf->rnb_flags = pg->flag;
/* sanity: we filled exactly niocount niobufs */
1472 LASSERTF((void *)(niobuf - niocount) ==
1473 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1474 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1475 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1477 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
/* tag resends so the server can recognize a recovery replay */
1479 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1480 body->oa.o_valid |= OBD_MD_FLFLAGS;
1481 body->oa.o_flags = 0;
1483 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1486 if (osc_should_shrink_grant(cli))
1487 osc_shrink_grant_local(cli, &body->oa);
1489 /* size[REQ_REC_OFF] still sizeof (*body) */
1490 if (opc == OST_WRITE) {
1491 if (cli->cl_checksum &&
1492 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1493 /* store cl_cksum_type in a local variable since
1494 * it can be changed via lprocfs */
1495 enum cksum_types cksum_type = cli->cl_cksum_type;
1497 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1498 body->oa.o_flags = 0;
1500 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1502 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1504 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1505 requested_nob, page_count,
1509 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1513 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1516 /* save this in 'oa', too, for later checking */
1517 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1518 oa->o_flags |= obd_cksum_type_pack(obd_name,
1521 /* clear out the checksum flag, in case this is a
1522 * resend but cl_checksum is no longer set. b=11238 */
1523 oa->o_valid &= ~OBD_MD_FLCKSUM;
1525 oa->o_cksum = body->oa.o_cksum;
1526 /* 1 RC per niobuf */
1527 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1528 sizeof(__u32) * niocount);
/* read path: just request that the server send a checksum back */
1530 if (cli->cl_checksum &&
1531 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1532 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1533 body->oa.o_flags = 0;
1534 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1535 cli->cl_cksum_type);
1536 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1539 /* Client cksum has been already copied to wire obdo in previous
1540 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1541 * resent due to cksum error, this will allow Server to
1542 * check+dump pages on its side */
1544 ptlrpc_request_set_replen(req);
1546 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1547 aa = ptlrpc_req_async_args(req);
/* remember what we asked for so the interpret callback can verify it */
1549 aa->aa_requested_nob = requested_nob;
1550 aa->aa_nio_count = niocount;
1551 aa->aa_page_count = page_count;
1555 INIT_LIST_HEAD(&aa->aa_oaps);
1558 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1559 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1560 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1561 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path (out:): drop the request reference */
1565 ptlrpc_req_finished(req);
/* file-scope scratch buffer for the checksum-dump pathname below;
 * not static in the visible text — presumably shared with other files,
 * TODO confirm against the rest of the source */
1569 char dbgcksum_file_name[PATH_MAX];
/*
 * dump_all_bulk_pages() - on a checksum mismatch, write the raw pages of
 * the failing bulk to a debug file so the corruption can be analyzed.
 * The file name encodes FID, extent range and both checksums; O_EXCL
 * ensures only the first error for a given range is dumped (resends of
 * the same range hit -EEXIST and are skipped quietly).
 *
 * NOTE(review): interior lines are missing from this excerpt (embedded
 * numbering jumps 1572->1580, 1594->1598, ...), including the opening
 * brace, local declarations, kunmap and several loop-control lines.
 */
1571 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1572 struct brw_page **pga, __u32 server_cksum,
1580 /* will only keep dump of pages on first error for the same range in
1581 * file/fid, not during the resends/retries. */
1582 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1583 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1584 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1585 libcfs_debug_file_path_arr :
1586 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1587 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1588 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1589 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1591 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1592 client_cksum, server_cksum);
1593 filp = filp_open(dbgcksum_file_name,
1594 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
/* open failure: -EEXIST (already dumped) is only D_INFO, else CERROR */
1598 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1599 "checksum error: rc = %d\n", dbgcksum_file_name,
1602 CERROR("%s: can't open to dump pages with checksum "
1603 "error: rc = %d\n", dbgcksum_file_name, rc);
/* write each bulk page verbatim into the dump file */
1607 for (i = 0; i < page_count; i++) {
1608 len = pga[i]->count;
1609 buf = kmap(pga[i]->pg);
1611 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1613 CERROR("%s: wanted to write %u but got %d "
1614 "error\n", dbgcksum_file_name, len, rc);
1619 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1620 dbgcksum_file_name, rc);
/* make sure the dump reaches disk before we return */
1625 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1627 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1628 filp_close(filp, NULL);
/*
 * check_write_checksum() - diagnose a write-checksum mismatch reported by
 * the server.  Recomputes the client-side checksum over the still-held
 * pages (using the checksum type the SERVER used, unpacked from the reply
 * obdo) and classifies the failure: client-side recompute failure, server
 * used a different cksum type, data changed after the original checksum
 * (typical mmap false positive), corruption in transit, or both.  Emits
 * an LCONSOLE error with the full diagnosis; optionally dumps the pages
 * when cl_checksum_dump is set.
 *
 * NOTE(review): excerpt only — the return statements, the T10 sector-size
 * assignments in the switch arms and the default case are among the
 * missing lines (embedded numbering jumps 1635->1637, 1658->1662, ...).
 */
1633 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1634 __u32 client_cksum, __u32 server_cksum,
1635 struct osc_brw_async_args *aa)
1637 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1638 enum cksum_types cksum_type;
1639 obd_dif_csum_fn *fn = NULL;
1640 int sector_size = 0;
/* fast path: checksums agree, nothing to diagnose */
1645 if (server_cksum == client_cksum) {
1646 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1650 if (aa->aa_cli->cl_checksum_dump)
1651 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1652 server_cksum, client_cksum);
/* recompute with the checksum type the server reported back */
1654 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1657 switch (cksum_type) {
1658 case OBD_CKSUM_T10IP512:
1662 case OBD_CKSUM_T10IP4K:
1666 case OBD_CKSUM_T10CRC512:
1667 fn = obd_dif_crc_fn;
1670 case OBD_CKSUM_T10CRC4K:
1671 fn = obd_dif_crc_fn;
1679 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1680 aa->aa_page_count, aa->aa_ppga,
1681 OST_WRITE, fn, sector_size,
1684 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1685 aa->aa_ppga, OST_WRITE, cksum_type,
/* classify the mismatch for the console message */
1689 msg = "failed to calculate the client write checksum";
1690 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1691 msg = "the server did not use the checksum type specified in "
1692 "the original request - likely a protocol problem";
1693 else if (new_cksum == server_cksum)
1694 msg = "changed on the client after we checksummed it - "
1695 "likely false positive due to mmap IO (bug 11742)";
1696 else if (new_cksum == client_cksum)
1697 msg = "changed in transit before arrival at OST";
1699 msg = "changed in transit AND doesn't match the original - "
1700 "likely false positive due to mmap IO (bug 11742)";
1702 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1703 DFID " object "DOSTID" extent [%llu-%llu], original "
1704 "client csum %x (type %x), server csum %x (type %x),"
1705 " client csum now %x\n",
1706 obd_name, msg, libcfs_nid2str(peer->nid),
1707 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1708 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1709 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1710 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1711 aa->aa_ppga[aa->aa_page_count - 1]->off +
1712 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1714 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1715 server_cksum, cksum_type, new_cksum);
/*
 * osc_brw_fini_request() - process the reply of a BRW RPC.
 *
 * Visible responsibilities (from the lines present in this excerpt):
 *  - bail early on hard errors (anything negative except -EDQUOT);
 *  - unpack the reply ost_body, update quota flags and the grant;
 *  - writes: unwrap the bulk, verify the write checksum against the one
 *    saved at prep time (check_write_checksum), then validate per-niobuf
 *    RCs (check_write_rcs);
 *  - reads: unwrap the bulk (or take data from the short-io reply
 *    buffer), verify the transferred byte counts, copy short-io data
 *    into the destination pages, handle short reads, and verify the
 *    server-provided checksum when OBD_MD_FLCKSUM is set;
 *  - finally pull the wire obdo back into aa->aa_oa.
 *
 * NOTE(review): excerpt only — many lines (GOTOs, closing braces, the
 * final RETURN, some increments such as the short-io loop advance) are
 * missing; the embedded original numbering jumps throughout.
 */
1719 /* Note rc enters this function as number of bytes transferred */
1720 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1722 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1723 struct client_obd *cli = aa->aa_cli;
1724 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1725 const struct lnet_process_id *peer =
1726 &req->rq_import->imp_connection->c_peer;
1727 struct ost_body *body;
1728 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags); other errors don't */
1731 if (rc < 0 && rc != -EDQUOT) {
1732 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1736 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1737 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1739 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1743 /* set/clear over quota flag for a uid/gid/projid */
1744 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1745 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1746 unsigned qid[LL_MAXQUOTAS] = {
1747 body->oa.o_uid, body->oa.o_gid,
1748 body->oa.o_projid };
1749 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1750 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1751 body->oa.o_valid, body->oa.o_flags);
1752 osc_quota_setdq(cli, qid, body->oa.o_valid,
1756 osc_update_grant(cli, body);
/* checksum computed at prep time, saved for comparison below */
1761 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1762 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1764 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1766 CERROR("Unexpected +ve rc %d\n", rc);
1770 if (req->rq_bulk != NULL &&
1771 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1774 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1775 check_write_checksum(&body->oa, peer, client_cksum,
1776 body->oa.o_cksum, aa))
1779 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1780 aa->aa_page_count, aa->aa_ppga);
1784 /* The rest of this function executes only for OST_READs */
/* short io: transferred size is the server-side RMF_SHORT_IO size */
1786 if (req->rq_bulk == NULL) {
1787 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1789 LASSERT(rc == req->rq_status);
1791 /* if unwrap_bulk failed, return -EAGAIN to retry */
1792 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1795 GOTO(out, rc = -EAGAIN);
1797 if (rc > aa->aa_requested_nob) {
1798 CERROR("Unexpected rc %d (%d requested)\n", rc,
1799 aa->aa_requested_nob);
1803 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1804 CERROR ("Unexpected rc %d (%d transferred)\n",
1805 rc, req->rq_bulk->bd_nob_transferred);
/* short-io read: copy inline reply data into the destination pages */
1809 if (req->rq_bulk == NULL) {
1811 int nob, pg_count, i = 0;
1814 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1815 pg_count = aa->aa_page_count;
1816 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1819 while (nob > 0 && pg_count > 0) {
1821 int count = aa->aa_ppga[i]->count > nob ?
1822 nob : aa->aa_ppga[i]->count;
1824 CDEBUG(D_CACHE, "page %p count %d\n",
1825 aa->aa_ppga[i]->pg, count);
1826 ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1827 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1829 ll_kunmap_atomic((void *) ptr, KM_USER0);
1838 if (rc < aa->aa_requested_nob)
1839 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* read checksum verification against the server-sent value */
1841 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1842 static int cksum_counter;
1843 u32 server_cksum = body->oa.o_cksum;
1846 enum cksum_types cksum_type;
1847 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1848 body->oa.o_flags : 0;
1850 cksum_type = obd_cksum_type_unpack(o_flags);
1851 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1852 aa->aa_page_count, aa->aa_ppga,
1853 OST_READ, &client_cksum);
/* note the router when the bulk arrived via an LNet router */
1857 if (req->rq_bulk != NULL &&
1858 peer->nid != req->rq_bulk->bd_sender) {
1860 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1863 if (server_cksum != client_cksum) {
1864 struct ost_body *clbody;
1865 u32 page_count = aa->aa_page_count;
1867 clbody = req_capsule_client_get(&req->rq_pill,
1869 if (cli->cl_checksum_dump)
1870 dump_all_bulk_pages(&clbody->oa, page_count,
1871 aa->aa_ppga, server_cksum,
1874 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1875 "%s%s%s inode "DFID" object "DOSTID
1876 " extent [%llu-%llu], client %x, "
1877 "server %x, cksum_type %x\n",
1879 libcfs_nid2str(peer->nid),
1881 clbody->oa.o_valid & OBD_MD_FLFID ?
1882 clbody->oa.o_parent_seq : 0ULL,
1883 clbody->oa.o_valid & OBD_MD_FLFID ?
1884 clbody->oa.o_parent_oid : 0,
1885 clbody->oa.o_valid & OBD_MD_FLFID ?
1886 clbody->oa.o_parent_ver : 0,
1887 POSTID(&body->oa.o_oi),
1888 aa->aa_ppga[0]->off,
1889 aa->aa_ppga[page_count-1]->off +
1890 aa->aa_ppga[page_count-1]->count - 1,
1891 client_cksum, server_cksum,
1894 aa->aa_oa->o_cksum = client_cksum;
1898 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* we asked for a checksum but the server didn't send one back */
1901 } else if (unlikely(client_cksum)) {
1902 static int cksum_missed;
/* power-of-two throttle: log on 1st, 2nd, 4th, 8th... miss */
1905 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1906 CERROR("Checksum %u requested from %s but not sent\n",
1907 cksum_missed, libcfs_nid2str(peer->nid));
1913 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1914 aa->aa_oa, &body->oa);
/*
 * osc_brw_redo_request() - rebuild and resubmit a BRW RPC after a
 * recoverable failure (e.g. -EINPROGRESS).  Builds a fresh request from
 * the saved async args, transfers the oap/extent lists and callbacks to
 * it, caps the resend delay at the request timeout, and hands it to
 * ptlrpcd.
 *
 * NOTE(review): excerpt only — the opening brace, some RETURN/GOTO lines
 * and closing braces are missing (embedded numbering jumps 1924->1927,
 * 1933->1937, ...).
 */
1919 static int osc_brw_redo_request(struct ptlrpc_request *request,
1920 struct osc_brw_async_args *aa, int rc)
1922 struct ptlrpc_request *new_req;
1923 struct osc_brw_async_args *new_aa;
1924 struct osc_async_page *oap;
1927 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1928 "redo for recoverable error %d", rc);
/* rebuild with resend=1 so the new RPC is tagged OBD_FL_RECOV_RESEND */
1930 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1931 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1932 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1933 aa->aa_ppga, &new_req, 1);
/* abort the redo if any page's owner was interrupted */
1937 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1938 if (oap->oap_request != NULL) {
1939 LASSERTF(request == oap->oap_request,
1940 "request %p != oap_request %p\n",
1941 request, oap->oap_request);
1942 if (oap->oap_interrupted) {
1943 ptlrpc_req_finished(new_req);
1949 * New request takes over pga and oaps from old request.
1950 * Note that copying a list_head doesn't work, need to move it...
1953 new_req->rq_interpret_reply = request->rq_interpret_reply;
1954 new_req->rq_async_args = request->rq_async_args;
1955 new_req->rq_commit_cb = request->rq_commit_cb;
1956 /* cap resend delay to the current request timeout, this is similar to
1957 * what ptlrpc does (see after_reply()) */
1958 if (aa->aa_resends > new_req->rq_timeout)
1959 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1961 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1962 new_req->rq_generation_set = 1;
1963 new_req->rq_import_generation = request->rq_import_generation;
1965 new_aa = ptlrpc_req_async_args(new_req);
/* list_heads must be moved with list_splice, not struct-copied */
1967 INIT_LIST_HEAD(&new_aa->aa_oaps);
1968 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1969 INIT_LIST_HEAD(&new_aa->aa_exts);
1970 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1971 new_aa->aa_resends = aa->aa_resends;
/* repoint each oap's request reference at the new RPC */
1973 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1974 if (oap->oap_request) {
1975 ptlrpc_req_finished(oap->oap_request);
1976 oap->oap_request = ptlrpc_request_addref(new_req);
1980 /* XXX: This code will run into problem if we're going to support
1981 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1982 * and wait for all of them to be finished. We should inherit request
1983 * set from old request. */
1984 ptlrpcd_add_req(new_req);
1986 DEBUG_REQ(D_INFO, new_req, "new request");
/* NOTE(review): the "/*" opener of this comment block is on a missing
 * line (original 1990); the comment below is its continuation. */
1991 * ugh, we want disk allocation on the target to happen in offset order. we'll
1992 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1993 * fine for our small page arrays and doesn't require allocation. its an
1994 * insertion sort that swaps elements that are strides apart, shrinking the
1995 * stride down until its '1' and the array is sorted.
/*
 * sort_brw_pages() - shellsort @array of @num brw_page pointers by ->off.
 * NOTE(review): excerpt only — the opening brace, locals, the stride
 * shrink step and the tmp save/restore lines are missing (embedded
 * numbering jumps 2004->2009, 2013->2018).
 */
1997 static void sort_brw_pages(struct brw_page **array, int num)
2000 struct brw_page *tmp;
/* grow stride through the Knuth sequence 1, 4, 13, 40, ... */
2004 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2009 for (i = stride ; i < num ; i++) {
/* insertion step: shift larger elements one stride right */
2012 while (j >= stride && array[j - stride]->off > tmp->off) {
2013 array[j] = array[j - stride];
2018 } while (stride > 1);
/*
 * osc_release_ppga() - free the brw_page pointer array allocated by
 * osc_build_rpc() (the array only; the pages themselves are owned by
 * their osc_async_page holders).
 * NOTE(review): the function's braces are on lines missing from this
 * excerpt (embedded numbering 2021 -> 2023 skips 2022).
 */
2021 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2023 LASSERT(ppga != NULL);
2024 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * brw_interpret() - ptlrpc interpret callback for a BRW RPC built by
 * osc_build_rpc().
 *
 * Visible responsibilities:
 *  - finish the reply (osc_brw_fini_request) and, for recoverable errors
 *    on a non-no-delay request, either redo the RPC (osc_brw_redo_request)
 *    or give up after too many resends;
 *  - on success, propagate size/blocks/times from the reply obdo into the
 *    cl_object attributes, extending file size and KMS for writes;
 *  - account unstable pages for committed writes;
 *  - finish every extent attached to the RPC, release the page array,
 *    update brw stats, and decrement the in-flight read/write counter
 *    before waking cache waiters and re-plugging the IO queue.
 *
 * NOTE(review): excerpt only — braces, several RETURN/GOTO lines and the
 * valid-flag updates for mtime/atime/ctime are among the missing lines
 * (embedded numbering jumps 2034->2038, 2083->2086, ...).
 */
2027 static int brw_interpret(const struct lu_env *env,
2028 struct ptlrpc_request *req, void *args, int rc)
2030 struct osc_brw_async_args *aa = args;
2031 struct osc_extent *ext;
2032 struct osc_extent *tmp;
2033 struct client_obd *cli = aa->aa_cli;
2034 unsigned long transferred = 0;
2038 rc = osc_brw_fini_request(req, rc);
2039 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2041 * When server returns -EINPROGRESS, client should always retry
2042 * regardless of the number of times the bulk was resent already.
2044 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
/* an import generation change means we were evicted mid-flight */
2045 if (req->rq_import_generation !=
2046 req->rq_import->imp_generation) {
2047 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2048 ""DOSTID", rc = %d.\n",
2049 req->rq_import->imp_obd->obd_name,
2050 POSTID(&aa->aa_oa->o_oi), rc);
2051 } else if (rc == -EINPROGRESS ||
2052 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2053 rc = osc_brw_redo_request(req, aa, rc);
2055 CERROR("%s: too many resent retries for object: "
2056 "%llu:%llu, rc = %d.\n",
2057 req->rq_import->imp_obd->obd_name,
2058 POSTID(&aa->aa_oa->o_oi), rc);
2063 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* success path: push reply attributes into the cl_object */
2068 struct obdo *oa = aa->aa_oa;
2069 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2070 unsigned long valid = 0;
2071 struct cl_object *obj;
2072 struct osc_async_page *last;
2074 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2075 obj = osc2cl(last->oap_obj);
2077 cl_object_attr_lock(obj);
2078 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2079 attr->cat_blocks = oa->o_blocks;
2080 valid |= CAT_BLOCKS;
2082 if (oa->o_valid & OBD_MD_FLMTIME) {
2083 attr->cat_mtime = oa->o_mtime;
2086 if (oa->o_valid & OBD_MD_FLATIME) {
2087 attr->cat_atime = oa->o_atime;
2090 if (oa->o_valid & OBD_MD_FLCTIME) {
2091 attr->cat_ctime = oa->o_ctime;
2095 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2096 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2097 loff_t last_off = last->oap_count + last->oap_obj_off +
2100 /* Change file size if this is an out of quota or
2101 * direct IO write and it extends the file size */
2102 if (loi->loi_lvb.lvb_size < last_off) {
2103 attr->cat_size = last_off;
2106 /* Extend KMS if it's not a lockless write */
2107 if (loi->loi_kms < last_off &&
2108 oap2osc_page(last)->ops_srvlock == 0) {
2109 attr->cat_kms = last_off;
2115 cl_object_attr_update(env, obj, attr, valid);
2116 cl_object_attr_unlock(obj);
2118 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2120 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2121 osc_inc_unstable_pages(req);
/* complete every extent carried by this RPC */
2123 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2124 list_del_init(&ext->oe_link);
2125 osc_extent_finish(env, ext, 1,
2126 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2128 LASSERT(list_empty(&aa->aa_exts));
2129 LASSERT(list_empty(&aa->aa_oaps));
2131 transferred = (req->rq_bulk == NULL ? /* short io */
2132 aa->aa_requested_nob :
2133 req->rq_bulk->bd_nob_transferred);
2135 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2136 ptlrpc_lprocfs_brw(req, transferred);
2138 spin_lock(&cli->cl_loi_list_lock);
2139 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2140 * is called so we know whether to go to sync BRWs or wait for more
2141 * RPCs to complete */
2142 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2143 cli->cl_w_in_flight--;
2145 cli->cl_r_in_flight--;
2146 osc_wake_cache_waiters(cli);
2147 spin_unlock(&cli->cl_loi_list_lock);
2149 osc_io_unplug(env, cli, NULL);
/*
 * brw_commit() - rq_commit_cb for BRW write requests: once the server
 * commits the transaction, move the request's pages out of the
 * "unstable" accounting.  rq_lock serializes against the
 * osc_inc_unstable_pages() done from brw_interpret()/osc_extent_finish:
 * if rq_unstable was set, clear it and decrement; otherwise just mark
 * the request committed so the racing path knows.
 * NOTE(review): excerpt only — the opening brace and closing lines are
 * missing (embedded numbering 2153 -> 2155, 2162 -> 2164).
 */
2153 static void brw_commit(struct ptlrpc_request *req)
2155 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2156 * this called via the rq_commit_cb, I need to ensure
2157 * osc_dec_unstable_pages is still called. Otherwise unstable
2158 * pages may be leaked. */
2159 spin_lock(&req->rq_lock)
2160 if (likely(req->rq_unstable)) {
2161 req->rq_unstable = 0;
2162 spin_unlock(&req->rq_lock);
2164 osc_dec_unstable_pages(req);
2166 req->rq_committed = 1;
2167 spin_unlock(&req->rq_lock);
/* NOTE(review): the "/*" opener of this comment is on a missing line
 * (original ~2171); the text below is its continuation. */
2172 * Build an RPC by the list of extent @ext_list. The caller must ensure
2173 * that the total pages in this list are NOT over max pages per RPC.
2174 * Extents in the list must be in OES_RPC state.
/*
 * osc_build_rpc() - assemble one BRW RPC covering all extents on
 * @ext_list (read or write per @cmd), attach commit/interpret callbacks,
 * update in-flight stats and submit it via ptlrpcd.
 *
 * Visible flow: gather pages from the extents into a brw_page array,
 * allocate the wire obdo, set request attributes (jobid, timestamps),
 * sort pages by offset, build the request with osc_brw_prep_request(),
 * splice the page/extent lists into the request's async args, bump
 * read/write in-flight counters and histograms, then queue the request.
 * On failure every extent is finished with the error.
 *
 * NOTE(review): excerpt only — braces, several locals (i, mpflag, rc,
 * grant, page_count declarations), GOTO/RETURN lines and some list
 * bookkeeping are missing (embedded numbering jumps 2188->2192,
 * 2210->2215, ...).
 */
2176 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2177 struct list_head *ext_list, int cmd)
2179 struct ptlrpc_request *req = NULL;
2180 struct osc_extent *ext;
2181 struct brw_page **pga = NULL;
2182 struct osc_brw_async_args *aa = NULL;
2183 struct obdo *oa = NULL;
2184 struct osc_async_page *oap;
2185 struct osc_object *obj = NULL;
2186 struct cl_req_attr *crattr = NULL;
2187 loff_t starting_offset = OBD_OBJECT_EOF;
2188 loff_t ending_offset = 0;
2192 bool soft_sync = false;
2193 bool interrupted = false;
2194 bool ndelay = false;
2198 __u32 layout_version = 0;
2199 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2200 struct ost_body *body;
2202 LASSERT(!list_empty(ext_list));
2204 /* add pages into rpc_list to build BRW rpc */
2205 list_for_each_entry(ext, ext_list, oe_link) {
2206 LASSERT(ext->oe_state == OES_RPC);
2207 mem_tight |= ext->oe_memalloc;
2208 grant += ext->oe_grants;
2209 page_count += ext->oe_nr_pages;
2210 layout_version = MAX(layout_version, ext->oe_layout_version);
2215 soft_sync = osc_over_unstable_soft_limit(cli);
/* under memory pressure, allocate with the memalloc flag set */
2217 mpflag = cfs_memory_pressure_get_and_set();
2219 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2221 GOTO(out, rc = -ENOMEM);
2223 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2225 GOTO(out, rc = -ENOMEM);
/* flatten all extents' pages into pga[] and track the I/O range */
2228 list_for_each_entry(ext, ext_list, oe_link) {
2229 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2231 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2233 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2234 pga[i] = &oap->oap_brw_page;
2235 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2238 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2239 if (starting_offset == OBD_OBJECT_EOF ||
2240 starting_offset > oap->oap_obj_off)
2241 starting_offset = oap->oap_obj_off;
2243 LASSERT(oap->oap_page_off == 0);
2244 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2245 ending_offset = oap->oap_obj_off +
2248 LASSERT(oap->oap_page_off + oap->oap_count ==
2250 if (oap->oap_interrupted)
2257 /* first page in the list */
2258 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item)
2260 crattr = &osc_env_info(env)->oti_req_attr;
2261 memset(crattr, 0, sizeof(*crattr));
2262 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2263 crattr->cra_flags = ~0ULL;
2264 crattr->cra_page = oap2cl_page(oap);
2265 crattr->cra_oa = oa;
2266 cl_req_attr_set(env, osc2cl(obj), crattr);
2268 if (cmd == OBD_BRW_WRITE) {
2269 oa->o_grant_used = grant;
2270 if (layout_version > 0) {
2271 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2272 PFID(&oa->o_oi.oi_fid), layout_version);
2274 oa->o_layout_version = layout_version;
2275 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
/* offset order helps the OST allocate disk blocks sequentially */
2279 sort_brw_pages(pga, page_count);
2280 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2282 CERROR("prep_req failed: %d\n", rc);
2286 req->rq_commit_cb = brw_commit;
2287 req->rq_interpret_reply = brw_interpret;
2288 req->rq_memalloc = mem_tight != 0;
2289 oap->oap_request = ptlrpc_request_addref(req);
2290 if (interrupted && !req->rq_intr)
2291 ptlrpc_mark_interrupted(req);
2293 req->rq_no_resend = req->rq_no_delay = 1;
2294 /* probably set a shorter timeout value.
2295 * to handle ETIMEDOUT in brw_interpret() correctly. */
2296 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2299 /* Need to update the timestamps after the request is built in case
2300 * we race with setattr (locally or in queue at OST). If OST gets
2301 * later setattr before earlier BRW (as determined by the request xid),
2302 * the OST will not use BRW timestamps. Sadly, there is no obvious
2303 * way to do this in a single call. bug 10150 */
2304 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2305 crattr->cra_oa = &body->oa;
2306 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2307 cl_req_attr_set(env, osc2cl(obj), crattr);
2308 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2310 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2311 aa = ptlrpc_req_async_args(req);
2312 INIT_LIST_HEAD(&aa->aa_oaps);
2313 list_splice_init(&rpc_list, &aa->aa_oaps);
2314 INIT_LIST_HEAD(&aa->aa_exts);
2315 list_splice_init(ext_list, &aa->aa_exts);
2317 spin_lock(&cli->cl_loi_list_lock);
2318 starting_offset >>= PAGE_SHIFT;
2319 if (cmd == OBD_BRW_READ) {
2320 cli->cl_r_in_flight++;
2321 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2322 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2323 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2324 starting_offset + 1);
2326 cli->cl_w_in_flight++;
2327 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2328 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2329 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2330 starting_offset + 1);
2332 spin_unlock(&cli->cl_loi_list_lock);
2334 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2335 page_count, aa, cli->cl_r_in_flight,
2336 cli->cl_w_in_flight);
2337 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2339 ptlrpcd_add_req(req);
/* error path (out:): undo memory-pressure flag and free what we built */
2345 cfs_memory_pressure_restore(mpflag);
2348 LASSERT(req == NULL);
2351 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2353 OBD_FREE(pga, sizeof(*pga) * page_count);
2354 /* this should happen rarely and is pretty bad, it makes the
2355 * pending list not follow the dirty order */
2356 while (!list_empty(ext_list)) {
2357 ext = list_entry(ext_list->next, struct osc_extent,
2359 list_del_init(&ext->oe_link);
2360 osc_extent_finish(env, ext, 0, rc);
/*
 * osc_set_lock_data() - attach @data to @lock->l_ast_data if it is not
 * already owned by someone else.  Under lock_res_and_lock: set it when
 * empty; the subsequent equality test (l_ast_data == data) determines
 * success whether we just set it or it already matched.
 * NOTE(review): excerpt only — the opening brace, the return-value
 * handling between the equality test and unlock, and the final RETURN
 * are on missing lines (embedded numbering jumps 2366->2370, 2376->2379).
 */
2366 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2370 LASSERT(lock != NULL);
2372 lock_res_and_lock(lock);
2374 if (lock->l_ast_data == NULL)
2375 lock->l_ast_data = data;
2376 if (lock->l_ast_data == data)
2379 unlock_res_and_lock(lock);
/*
 * osc_enqueue_fini() - finish an OSC lock enqueue: translate an intent
 * (glimpse) ELDLM_LOCK_ABORTED reply into its real status, mark the LVB
 * ready where appropriate, invoke the caller's @upcall with the final
 * errcode, and drop the enqueue reference taken in ldlm_cli_enqueue().
 * NOTE(review): excerpt only — the opening brace, some locals and the
 * final RETURN are on missing lines (embedded numbering jumps 2386->2389,
 * 2414->2416).
 */
2384 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2385 void *cookie, struct lustre_handle *lockh,
2386 enum ldlm_mode mode, __u64 *flags, bool speculative,
2389 bool intent = *flags & LDLM_FL_HAS_INTENT;
2393 /* The request was created before ldlm_cli_enqueue call. */
2394 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2395 struct ldlm_reply *rep;
2397 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2398 LASSERT(rep != NULL);
/* lock_policy_res1 carries the glimpse status in network byte order */
2400 rep->lock_policy_res1 =
2401 ptlrpc_status_ntoh(rep->lock_policy_res1);
2402 if (rep->lock_policy_res1)
2403 errcode = rep->lock_policy_res1;
2405 *flags |= LDLM_FL_LVB_READY;
2406 } else if (errcode == ELDLM_OK) {
2407 *flags |= LDLM_FL_LVB_READY;
2410 /* Call the update callback. */
2411 rc = (*upcall)(cookie, lockh, errcode);
2413 /* release the reference taken in ldlm_cli_enqueue() */
2414 if (errcode == ELDLM_LOCK_MATCHED)
2416 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2417 ldlm_lock_decref(lockh, mode);
/*
 * osc_enqueue_interpret() - interpret callback for an asynchronous lock
 * enqueue.  Takes an extra lock reference so a blocking AST posted for a
 * failed lock cannot run before the upcall, completes the enqueue with
 * ldlm_cli_enqueue_fini(), then runs osc_enqueue_fini() and drops the
 * references.  Speculative (AGL-style) enqueues carry no caller LVB or
 * flags, so a local flags variable is substituted.
 * NOTE(review): excerpt only — the opening brace, several locals (rc,
 * flags) and the final RETURN are on missing lines (embedded numbering
 * jumps 2430->2435, 2466->2468).
 */
2422 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2425 struct osc_enqueue_args *aa = args;
2426 struct ldlm_lock *lock;
2427 struct lustre_handle *lockh = &aa->oa_lockh;
2428 enum ldlm_mode mode = aa->oa_mode;
2429 struct ost_lvb *lvb = aa->oa_lvb;
2430 __u32 lvb_len = sizeof(*lvb);
2435 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2437 lock = ldlm_handle2lock(lockh);
2438 LASSERTF(lock != NULL,
2439 "lockh %#llx, req %p, aa %p - client evicted?\n",
2440 lockh->cookie, req, aa);
2442 /* Take an additional reference so that a blocking AST that
2443 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2444 * to arrive after an upcall has been executed by
2445 * osc_enqueue_fini(). */
2446 ldlm_lock_addref(lockh, mode);
2448 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2449 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2451 /* Let CP AST to grant the lock first. */
2452 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2454 if (aa->oa_speculative) {
2455 LASSERT(aa->oa_lvb == NULL);
2456 LASSERT(aa->oa_flags == NULL);
2457 aa->oa_flags = &flags;
2460 /* Complete obtaining the lock procedure. */
2461 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2462 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2464 /* Complete osc stuff. */
2465 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2466 aa->oa_flags, aa->oa_speculative, rc);
2468 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above, then the handle2lock ref */
2470 ldlm_lock_decref(lockh, mode);
2471 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer (magic value 1, never dereferenced):
 * callers pass it to osc_enqueue_base() to mean "hand the request to
 * ptlrpcd" rather than to a real caller-owned set. */
2475 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2477 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2478 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2479 * other synchronous requests, however keeping some locks and trying to obtain
2480 * others may take a considerable amount of time in a case of ost failure; and
2481 * when other sync requests do not get released lock from a client, the client
2482 * is evicted from the cluster -- such scenarious make the life difficult, so
2483 * release locks just after they are obtained. */
/*
 * Enqueue (or match) an extent DLM lock on \a res_id for this OSC.
 * First tries ldlm_lock_match() against cached locks; on a miss, for
 * intent requests allocates an RQF_LDLM_ENQUEUE_LVB request and calls
 * ldlm_cli_enqueue().  \a upcall/\a cookie receive the final result,
 * either via osc_enqueue_interpret (async) or osc_enqueue_fini (sync).
 * NOTE(review): lines including the final parameter(s), several braces
 * and RETURN statements are elided in this extract.
 */
2484 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2485		     __u64 *flags, union ldlm_policy_data *policy,
2486		     struct ost_lvb *lvb, int kms_valid,
2487		     osc_enqueue_upcall_f upcall, void *cookie,
2488		     struct ldlm_enqueue_info *einfo,
2489		     struct ptlrpc_request_set *rqset, int async,
2492	struct obd_device *obd = exp->exp_obd;
2493	struct lustre_handle lockh = { 0 };
2494	struct ptlrpc_request *req = NULL;
2495	int intent = *flags & LDLM_FL_HAS_INTENT;
2496	__u64 match_flags = *flags;
2497	enum ldlm_mode mode;
2501	/* Filesystem lock extents are extended to page boundaries so that
2502	 * dealing with the page cache is a little smoother. */
2503	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2504	policy->l_extent.end |= ~PAGE_MASK;
2507	 * kms is not valid when either object is completely fresh (so that no
2508	 * locks are cached), or object was evicted. In the latter case cached
2509	 * lock cannot be used, because it would prime inode state with
2510	 * potentially stale LVB.
2515	/* Next, search for already existing extent locks that will cover us */
2516	/* If we're trying to read, we also search for an existing PW lock. The
2517	 * VFS and page cache already protect us locally, so lots of readers/
2518	 * writers can share a single PW lock.
2520	 * There are problems with conversion deadlocks, so instead of
2521	 * converting a read lock to a write lock, we'll just enqueue a new
2524	 * At some point we should cancel the read lock instead of making them
2525	 * send us a blocking callback, but there are problems with canceling
2526	 * locks out from other users right now, too. */
2527	mode = einfo->ei_mode;
2528	if (einfo->ei_mode == LCK_PR)
2530	/* Normal lock requests must wait for the LVB to be ready before
2531	 * matching a lock; speculative lock requests do not need to,
2532	 * because they will not actually use the lock. */
2534		match_flags |= LDLM_FL_LVB_READY;
2536		match_flags |= LDLM_FL_BLOCK_GRANTED;
2537	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2538			       einfo->ei_type, policy, mode, &lockh, 0);
2540		struct ldlm_lock *matched;
2542		if (*flags & LDLM_FL_TEST_LOCK)
2545		matched = ldlm_handle2lock(&lockh);
2547			/* This DLM lock request is speculative, and does not
2548			 * have an associated IO request. Therefore if there
2549			 * is already a DLM lock, it wll just inform the
2550			 * caller to cancel the request for this stripe.*/
2551			lock_res_and_lock(matched);
2552			if (ldlm_extent_equal(&policy->l_extent,
2553					      &matched->l_policy_data.l_extent))
2557			unlock_res_and_lock(matched);
2559			ldlm_lock_decref(&lockh, mode);
2560			LDLM_LOCK_PUT(matched);
2562		} else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2563			*flags |= LDLM_FL_LVB_READY;
2565			/* We already have a lock, and it's referenced. */
2566			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2568			ldlm_lock_decref(&lockh, mode);
2569			LDLM_LOCK_PUT(matched);
2572			ldlm_lock_decref(&lockh, mode);
2573			LDLM_LOCK_PUT(matched);
2578	if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2582		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2583					   &RQF_LDLM_ENQUEUE_LVB);
2587		rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2589			ptlrpc_request_free(req);
2593		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2595		ptlrpc_request_set_replen(req);
2598	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2599	*flags &= ~LDLM_FL_BLOCK_GRANTED;
2601	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2602			      sizeof(*lvb), LVB_T_OST, &lockh, async);
2605		struct osc_enqueue_args *aa;
2606		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2607		aa = ptlrpc_req_async_args(req);
		/* stash everything the interpret callback will need */
2609		aa->oa_mode = einfo->ei_mode;
2610		aa->oa_type = einfo->ei_type;
2611		lustre_handle_copy(&aa->oa_lockh, &lockh);
2612		aa->oa_upcall = upcall;
2613		aa->oa_cookie = cookie;
2614		aa->oa_speculative = speculative;
2616			aa->oa_flags = flags;
2619			/* speculative locks are essentially to enqueue
2620			 * a DLM lock in advance, so we don't care
2621			 * about the result of the enqueue. */
2623			aa->oa_flags = NULL;
2626		req->rq_interpret_reply = osc_enqueue_interpret;
2627		if (rqset == PTLRPCD_SET)
2628			ptlrpcd_add_req(req);
2630			ptlrpc_set_add_req(rqset, req);
2631	} else if (intent) {
2632		ptlrpc_req_finished(req);
	/* synchronous path: run the upcall directly */
2637	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2638			      flags, speculative, rc);
2640		ptlrpc_req_finished(req);
/*
 * Match an existing cached DLM extent lock without enqueueing a new one.
 * Extents are widened to page boundaries (as in osc_enqueue_base) before
 * calling ldlm_lock_match().  On a successful, non-TEST match the lock's
 * l_ast_data is bound to \a data via osc_set_lock_data(); if that fails
 * the reference is dropped again.
 * NOTE(review): declarations of rc and some braces/RETURNs are elided here.
 */
2645 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2646		   enum ldlm_type type, union ldlm_policy_data *policy,
2647		   enum ldlm_mode mode, __u64 *flags, void *data,
2648		   struct lustre_handle *lockh, int unref)
2650	struct obd_device *obd = exp->exp_obd;
2651	__u64 lflags = *flags;
2655	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2658	/* Filesystem lock extents are extended to page boundaries so that
2659	 * dealing with the page cache is a little smoother */
2660	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2661	policy->l_extent.end |= ~PAGE_MASK;
2663	/* Next, search for already existing extent locks that will cover us */
2664	/* If we're trying to read, we also search for an existing PW lock. The
2665	 * VFS and page cache already protect us locally, so lots of readers/
2666	 * writers can share a single PW lock. */
2670	rc = ldlm_lock_match(obd->obd_namespace, lflags,
2671			     res_id, type, policy, rc, lockh, unref);
2672	if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2676		struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2678		LASSERT(lock != NULL);
2679		if (!osc_set_lock_data(lock, data)) {
			/* lock already belongs to another object: release it */
2680			ldlm_lock_decref(lockh, rc);
2683		LDLM_LOCK_PUT(lock);
/*
 * Reply-interpret callback for async OST_STATFS: copies the server's
 * obd_statfs into the caller's buffer and runs the oi_cb_up callback.
 * -ENOTCONN/-EAGAIN with OBD_STATFS_NODELAY is treated as "request never
 * sent" and skipped.  NOTE(review): some braces/GOTO targets are elided.
 */
2688 static int osc_statfs_interpret(const struct lu_env *env,
2689				struct ptlrpc_request *req, void *args, int rc)
2691	struct osc_async_args *aa = args;
2692	struct obd_statfs *msfs;
2697	 * The request has in fact never been sent due to issues at
2698	 * a higher level (LOV). Exit immediately since the caller
2699	 * is aware of the problem and takes care of the clean up.
2703	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2704	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2710	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2712		GOTO(out, rc = -EPROTO);
2714	*aa->aa_oi->oi_osfs = *msfs;
2716	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Issue an asynchronous OST_STATFS RPC on \a rqset; the reply is handled
 * by osc_statfs_interpret() which fills oinfo->oi_osfs and fires
 * oinfo->oi_cb_up.  OBD_STATFS_NODELAY marks the request no-resend/
 * no-delay so procfs readers cannot deadlock on a stuck import.
 */
2721 static int osc_statfs_async(struct obd_export *exp,
2722			    struct obd_info *oinfo, time64_t max_age,
2723			    struct ptlrpc_request_set *rqset)
2725	struct obd_device *obd = class_exp2obd(exp);
2726	struct ptlrpc_request *req;
2727	struct osc_async_args *aa;
2731	/* We could possibly pass max_age in the request (as an absolute
2732	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2733	 * extra calls into the filesystem if that isn't necessary (e.g.
2734	 * during mount that would help a bit). Having relative timestamps
2735	 * is not so great if request processing is slow, while absolute
2736	 * timestamps are not ideal because they need time synchronization. */
2737	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2741	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2743		ptlrpc_request_free(req);
2746	ptlrpc_request_set_replen(req);
2747	req->rq_request_portal = OST_CREATE_PORTAL;
2748	ptlrpc_at_set_req_timeout(req);
2750	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2751		/* procfs requests not want stat in wait for avoid deadlock */
2752		req->rq_no_resend = 1;
2753		req->rq_no_delay = 1;
2756	req->rq_interpret_reply = osc_statfs_interpret;
2757	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2758	aa = ptlrpc_req_async_args(req);
2761	ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: sends the RPC with ptlrpc_queue_wait() and
 * copies the reply into \a osfs.  The import is referenced under
 * cl_sem to synchronize with client_disconnect_export (bug 15684).
 * NOTE(review): error-path braces/RETURNs and the *osfs copy line
 * appear elided from this extract.
 */
2765 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2766		      struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2768	struct obd_device *obd = class_exp2obd(exp);
2769	struct obd_statfs *msfs;
2770	struct ptlrpc_request *req;
2771	struct obd_import *imp = NULL;
2776	/*Since the request might also come from lprocfs, so we need
2777	 *sync this with client_disconnect_export Bug15684*/
2778	down_read(&obd->u.cli.cl_sem);
2779	if (obd->u.cli.cl_import)
2780		imp = class_import_get(obd->u.cli.cl_import);
2781	up_read(&obd->u.cli.cl_sem);
2785	/* We could possibly pass max_age in the request (as an absolute
2786	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2787	 * extra calls into the filesystem if that isn't necessary (e.g.
2788	 * during mount that would help a bit). Having relative timestamps
2789	 * is not so great if request processing is slow, while absolute
2790	 * timestamps are not ideal because they need time synchronization. */
2791	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2793	class_import_put(imp);
2798	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2800		ptlrpc_request_free(req);
2803	ptlrpc_request_set_replen(req);
2804	req->rq_request_portal = OST_CREATE_PORTAL;
2805	ptlrpc_at_set_req_timeout(req);
2807	if (flags & OBD_STATFS_NODELAY) {
2808		/* procfs requests not want stat in wait for avoid deadlock */
2809		req->rq_no_resend = 1;
2810		req->rq_no_delay = 1;
2813	rc = ptlrpc_queue_wait(req);
2817	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2819		GOTO(out, rc = -EPROTO);
2825	ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call,
 * then handles client recovery, import activation and target ping;
 * anything else returns -ENOTTY.
 */
2829 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2830			 void *karg, void __user *uarg)
2832	struct obd_device *obd = exp->exp_obd;
2833	struct obd_ioctl_data *data = karg;
2837	if (!try_module_get(THIS_MODULE)) {
2838		CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2839		       module_name(THIS_MODULE));
2843	case OBD_IOC_CLIENT_RECOVER:
2844		err = ptlrpc_recover_import(obd->u.cli.cl_import,
2845					    data->ioc_inlbuf1, 0);
2849	case IOC_OSC_SET_ACTIVE:
2850		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2853	case OBD_IOC_PING_TARGET:
2854		err = ptlrpc_obd_ping(obd);
2857		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2858		       cmd, current_comm());
2859		GOTO(out, err = -ENOTTY);
2862	module_put(THIS_MODULE);
/*
 * Handle set_info requests.  Several keys are handled locally without an
 * RPC (checksum toggle, sptlrpc config/ctx flush, shared LRU cache setup,
 * LRU shrink); all other keys are packed into an OST_SET_INFO (or, for
 * KEY_GRANT_SHRINK, RQF_OST_SET_GRANT_INFO) request.  Grant-shrink
 * requests go to ptlrpcd with osc_shrink_grant_interpret; everything
 * else is added to the caller's \a set.
 * NOTE(review): RETURN statements and some braces are elided here.
 */
2866 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2867		       u32 keylen, void *key, u32 vallen, void *val,
2868		       struct ptlrpc_request_set *set)
2870	struct ptlrpc_request *req;
2871	struct obd_device *obd = exp->exp_obd;
2872	struct obd_import *imp = class_exp2cliimp(exp);
2877	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2879	if (KEY_IS(KEY_CHECKSUM)) {
2880		if (vallen != sizeof(int))
2882		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2886	if (KEY_IS(KEY_SPTLRPC_CONF)) {
2887		sptlrpc_conf_client_adapt(obd);
2891	if (KEY_IS(KEY_FLUSH_CTX)) {
2892		sptlrpc_import_flush_my_ctx(imp);
2896	if (KEY_IS(KEY_CACHE_SET)) {
2897		struct client_obd *cli = &obd->u.cli;
2899		LASSERT(cli->cl_cache == NULL); /* only once */
2900		cli->cl_cache = (struct cl_client_cache *)val;
2901		cl_cache_incref(cli->cl_cache);
2902		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2904		/* add this osc into entity list */
2905		LASSERT(list_empty(&cli->cl_lru_osc));
2906		spin_lock(&cli->cl_cache->ccc_lru_lock);
2907		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2908		spin_unlock(&cli->cl_cache->ccc_lru_lock);
2913	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2914		struct client_obd *cli = &obd->u.cli;
		/* shrink at most half of what is currently on the LRU */
2915		long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2916		long target = *(long *)val;
2918		nr = osc_lru_shrink(env, cli, min(nr, target), true);
2923	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2926	/* We pass all other commands directly to OST. Since nobody calls osc
2927	   methods directly and everybody is supposed to go through LOV, we
2928	   assume lov checked invalid values for us.
2929	   The only recognised values so far are evict_by_nid and mds_conn.
2930	   Even if something bad goes through, we'd get a -EINVAL from OST
2933	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2934						&RQF_OST_SET_GRANT_INFO :
2939	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2940			     RCL_CLIENT, keylen);
2941	if (!KEY_IS(KEY_GRANT_SHRINK))
2942		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2943				     RCL_CLIENT, vallen);
2944	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2946		ptlrpc_request_free(req);
2950	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2951	memcpy(tmp, key, keylen);
2952	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2955	memcpy(tmp, val, vallen);
2957	if (KEY_IS(KEY_GRANT_SHRINK)) {
2958		struct osc_grant_args *aa;
2961		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2962		aa = ptlrpc_req_async_args(req);
2963		OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2965			ptlrpc_req_finished(req);
2968		*oa = ((struct ost_body *)val)->oa;
2970		req->rq_interpret_reply = osc_shrink_grant_interpret;
2973	ptlrpc_request_set_replen(req);
2974	if (!KEY_IS(KEY_GRANT_SHRINK)) {
2975		LASSERT(set != NULL);
2976		ptlrpc_set_add_req(set, req);
2977		ptlrpc_check_set(NULL, set);
2979		ptlrpcd_add_req(req);
/*
 * Reconnect hook: recompute the grant this client asks the OST for
 * (available + reserved + dirty, in bytes or pages depending on
 * OBD_CONNECT_GRANT_PARAM) and reset cl_lost_grant under
 * cl_loi_list_lock.  Falls back to 2 * cli_brw_size(obd) when zero.
 */
2986 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2987		  struct obd_device *obd, struct obd_uuid *cluuid,
2988		  struct obd_connect_data *data, void *localdata)
2990	struct client_obd *cli = &obd->u.cli;
2992	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2996		spin_lock(&cli->cl_loi_list_lock);
2997		grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2998		if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2999			grant += cli->cl_dirty_grant;
3001			grant += cli->cl_dirty_pages << PAGE_SHIFT;
3002		data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3003		lost_grant = cli->cl_lost_grant;
3004		cli->cl_lost_grant = 0;
3005		spin_unlock(&cli->cl_loi_list_lock);
3007		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3008		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3009		       data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect hook: disconnect the export first, then remove this client
 * from the grant-shrink list.  The ordering matters — see the BUG18662
 * race description in the comment below.
 */
3016 int osc_disconnect(struct obd_export *exp)
3018	struct obd_device *obd = class_exp2obd(exp);
3021	rc = client_disconnect_export(exp);
3023	 * Initially we put del_shrink_grant before disconnect_export, but it
3024	 * causes the following problem if setup (connect) and cleanup
3025	 * (disconnect) are tangled together.
3026	 *      connect p1                     disconnect p2
3027	 *   ptlrpc_connect_import
3028	 *     ...............               class_manual_cleanup
3031	 *   ptlrpc_connect_interrupt
3033	 *     add this client to shrink list
3035	 * Bang! grant shrink thread trigger the shrink. BUG18662
3037	osc_del_grant_list(&obd->u.cli);
/*
 * cfs_hash iterator callback used on import invalidation: for each LDLM
 * resource, find the osc_object attached via l_ast_data of any granted
 * lock, clear LDLM_FL_CLEANED on the locks so the second namespace-clean
 * pass cancels them, then invalidate the object.
 */
3042 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3043				 struct hlist_node *hnode, void *arg)
3045	struct lu_env *env = arg;
3046	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3047	struct ldlm_lock *lock;
3048	struct osc_object *osc = NULL;
3052	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3053		if (lock->l_ast_data != NULL && osc == NULL) {
3054			osc = lock->l_ast_data;
3055			cl_object_get(osc2cl(osc));
3058		/* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3059		 * by the 2nd round of ldlm_namespace_clean() call in
3060		 * osc_import_event(). */
3061		ldlm_clear_cleaned(lock);
3066		osc_object_invalidate(env, osc);
3067		cl_object_put(env, osc2cl(osc));
/*
 * Import state-change handler.  DISCON zeroes grant accounting;
 * INVALIDATE cleans the namespace twice around flushing cached pages and
 * invalidating objects (via osc_ldlm_resource_invalidate); OCD applies
 * negotiated connect flags (grant init, request portal); the remaining
 * events are forwarded to the observer.
 */
3074 static int osc_import_event(struct obd_device *obd,
3075			    struct obd_import *imp,
3076			    enum obd_import_event event)
3078	struct client_obd *cli;
3082	LASSERT(imp->imp_obd == obd);
3085	case IMP_EVENT_DISCON: {
3087		spin_lock(&cli->cl_loi_list_lock);
3088		cli->cl_avail_grant = 0;
3089		cli->cl_lost_grant = 0;
3090		spin_unlock(&cli->cl_loi_list_lock);
3093	case IMP_EVENT_INACTIVE: {
3094		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3097	case IMP_EVENT_INVALIDATE: {
3098		struct ldlm_namespace *ns = obd->obd_namespace;
3102		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3104		env = cl_env_get(&refcheck);
3106			osc_io_unplug(env, &obd->u.cli, NULL);
3108			cfs_hash_for_each_nolock(ns->ns_rs_hash,
3109						 osc_ldlm_resource_invalidate,
3111			cl_env_put(env, &refcheck);
3113		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3118	case IMP_EVENT_ACTIVE: {
3119		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3122	case IMP_EVENT_OCD: {
3123		struct obd_connect_data *ocd = &imp->imp_connect_data;
3125		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3126			osc_init_grant(&obd->u.cli, ocd);
3129		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3130			imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3132		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3135	case IMP_EVENT_DEACTIVATE: {
3136		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3139	case IMP_EVENT_ACTIVATE: {
3140		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3144		CERROR("Unknown import event %d\n", event);
3151 * Determine whether the lock can be canceled before replaying the lock
3152 * during recovery, see bug16774 for detailed information.
3154 * \retval zero the lock can't be canceled
3155 * \retval other ok to cancel
3157 static int osc_cancel_weight(struct ldlm_lock *lock)
	/* cancelable: granted extent lock with zero weight (unused) */
3160	 * Cancel all unused and granted extent lock.
3162	if (lock->l_resource->lr_type == LDLM_EXTENT &&
3163	    ldlm_is_granted(lock) &&
3164	    osc_ldlm_weigh_ast(lock) == 0)
/* ptlrpcd work callback: flush pending writeback for this client obd. */
3170 static int brw_queue_work(const struct lu_env *env, void *data)
3172	struct client_obd *cli = data;
3174	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3176	osc_io_unplug(env, cli, NULL);
3180 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3182 struct client_obd *cli = &obd->u.cli;
3188 rc = ptlrpcd_addref();
3192 rc = client_obd_setup(obd, lcfg);
3194 GOTO(out_ptlrpcd, rc);
3197 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3198 if (IS_ERR(handler))
3199 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3200 cli->cl_writeback_work = handler;
3202 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3203 if (IS_ERR(handler))
3204 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3205 cli->cl_lru_work = handler;
3207 rc = osc_quota_setup(obd);
3209 GOTO(out_ptlrpcd_work, rc);
3211 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3212 osc_update_next_shrink(cli);
3217 if (cli->cl_writeback_work != NULL) {
3218 ptlrpcd_destroy_work(cli->cl_writeback_work);
3219 cli->cl_writeback_work = NULL;
3221 if (cli->cl_lru_work != NULL) {
3222 ptlrpcd_destroy_work(cli->cl_lru_work);
3223 cli->cl_lru_work = NULL;
3225 client_obd_cleanup(obd);
3230 EXPORT_SYMBOL(osc_setup_common);
/*
 * OSC device setup: common setup + tunables, then opportunistically grow
 * the shared request pool (bounded by osc_reqpool_maxreqcount; a benign
 * race may overshoot slightly), register the cancel-weight callback,
 * join the global shrink list and configure idle disconnect.
 */
3232 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3234	struct client_obd *cli = &obd->u.cli;
3242	rc = osc_setup_common(obd, lcfg);
3246	rc = osc_tunables_init(obd);
3251	 * We try to control the total number of requests with a upper limit
3252	 * osc_reqpool_maxreqcount. There might be some race which will cause
3253	 * over-limit allocation, but it is fine.
3255	req_count = atomic_read(&osc_pool_req_count);
3256	if (req_count < osc_reqpool_maxreqcount) {
3257		adding = cli->cl_max_rpcs_in_flight + 2;
3258		if (req_count + adding > osc_reqpool_maxreqcount)
3259			adding = osc_reqpool_maxreqcount - req_count;
3261		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3262		atomic_add(added, &osc_pool_req_count);
3265	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3267	spin_lock(&osc_shrink_lock);
3268	list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3269	spin_unlock(&osc_shrink_lock);
3270	cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3271	cli->cl_import->imp_idle_debug = D_HA;
/*
 * Common pre-cleanup: wait for zombie exports (echo client case, see the
 * call chain below), destroy the writeback and LRU ptlrpcd work items,
 * and clean up the client import.
 */
3276 int osc_precleanup_common(struct obd_device *obd)
3278	struct client_obd *cli = &obd->u.cli;
3282	 * for echo client, export may be on zombie list, wait for
3283	 * zombie thread to cull it, because cli.cl_import will be
3284	 * cleared in client_disconnect_export():
3285	 *   class_export_destroy() -> obd_cleanup() ->
3286	 *   echo_device_free() -> echo_client_cleanup() ->
3287	 *   obd_disconnect() -> osc_disconnect() ->
3288	 *   client_disconnect_export()
3290	obd_zombie_barrier();
3291	if (cli->cl_writeback_work) {
3292		ptlrpcd_destroy_work(cli->cl_writeback_work);
3293		cli->cl_writeback_work = NULL;
3296	if (cli->cl_lru_work) {
3297		ptlrpcd_destroy_work(cli->cl_lru_work);
3298		cli->cl_lru_work = NULL;
3301	obd_cleanup_client_import(obd);
/* OSC o_precleanup: common pre-cleanup plus lprocfs unregistration. */
3306 static int osc_precleanup(struct obd_device *obd)
3310	osc_precleanup_common(obd);
3312	ptlrpc_lprocfs_unregister_obd(obd);
/*
 * Final OBD cleanup: leave the global shrink list, detach from the
 * shared cl_client_cache (LRU list + refcount), free the quota cache and
 * run client_obd_cleanup().
 */
3316 int osc_cleanup_common(struct obd_device *obd)
3318	struct client_obd *cli = &obd->u.cli;
3323	spin_lock(&osc_shrink_lock);
3324	list_del(&cli->cl_shrink_list);
3325	spin_unlock(&osc_shrink_lock);
3328	if (cli->cl_cache != NULL) {
3329		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3330		spin_lock(&cli->cl_cache->ccc_lru_lock);
3331		list_del_init(&cli->cl_lru_osc);
3332		spin_unlock(&cli->cl_cache->ccc_lru_lock);
3333		cli->cl_lru_left = NULL;
3334		cl_cache_decref(cli->cl_cache);
3335		cli->cl_cache = NULL;
3338	/* free memory of osc quota cache */
3339	osc_quota_cleanup(obd);
3341	rc = client_obd_cleanup(obd);
/* OBD method table for the OSC device type (registered in osc_init). */
3348 static struct obd_ops osc_obd_ops = {
3349	.o_owner		= THIS_MODULE,
3350	.o_setup		= osc_setup,
3351	.o_precleanup		= osc_precleanup,
3352	.o_cleanup		= osc_cleanup_common,
3353	.o_add_conn		= client_import_add_conn,
3354	.o_del_conn		= client_import_del_conn,
3355	.o_connect		= client_connect_import,
3356	.o_reconnect		= osc_reconnect,
3357	.o_disconnect		= osc_disconnect,
3358	.o_statfs		= osc_statfs,
3359	.o_statfs_async		= osc_statfs_async,
3360	.o_create		= osc_create,
3361	.o_destroy		= osc_destroy,
3362	.o_getattr		= osc_getattr,
3363	.o_setattr		= osc_setattr,
3364	.o_iocontrol		= osc_iocontrol,
3365	.o_set_info_async	= osc_set_info_async,
3366	.o_import_event		= osc_import_event,
3367	.o_quotactl		= osc_quotactl,
/* Memory-shrinker handle plus the global list of client_obds it walks,
 * protected by osc_shrink_lock. */
3370 static struct shrinker *osc_cache_shrinker;
3371 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3372 DEFINE_SPINLOCK(osc_shrink_lock);
3374 #ifndef HAVE_SHRINKER_COUNT
/* Compat shim for old kernels with a single-callback shrinker API:
 * scan then return the count, as the combined interface expects. */
3375 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3377	struct shrink_control scv = {
3378		.nr_to_scan = shrink_param(sc, nr_to_scan),
3379		.gfp_mask   = shrink_param(sc, gfp_mask)
3381 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3382	struct shrinker *shrinker = NULL;
3385	(void)osc_cache_shrink_scan(shrinker, &scv);
3387	return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: init slab caches, register the OSC obd type (procfs
 * disabled when OSP provides the symlink), install the cache shrinker,
 * size and create the shared request pool, and start grant work.
 * Error paths unwind in reverse order.
 * NOTE(review): some braces/GOTO labels and the reqsize seed value are
 * elided in this extract.
 */
3391 static int __init osc_init(void)
3393	bool enable_proc = true;
3394	struct obd_type *type;
3395	unsigned int reqpool_size;
3396	unsigned int reqsize;
3398	DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3399			 osc_cache_shrink_count, osc_cache_shrink_scan);
3402	/* print an address of _any_ initialized kernel symbol from this
3403	 * module, to allow debugging with gdb that doesn't support data
3404	 * symbols from modules.*/
3405	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3407	rc = lu_kmem_init(osc_caches);
3411	type = class_search_type(LUSTRE_OSP_NAME);
3412	if (type != NULL && type->typ_procsym != NULL)
3413		enable_proc = false;
3415	rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3416				 LUSTRE_OSC_NAME, &osc_device_type);
3420	osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3422	/* This is obviously too much memory, only prevent overflow here */
3423	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3424		GOTO(out_type, rc = -EINVAL);
3426	reqpool_size = osc_reqpool_mem_max << 20;
	/* round reqsize up to the next power of two >= OST_IO_MAXREQSIZE */
3429	while (reqsize < OST_IO_MAXREQSIZE)
3430		reqsize = reqsize << 1;
3433	 * We don't enlarge the request count in OSC pool according to
3434	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3435	 * tried after normal allocation failed. So a small OSC pool won't
3436	 * cause much performance degression in most of cases.
3438	osc_reqpool_maxreqcount = reqpool_size / reqsize;
3440	atomic_set(&osc_pool_req_count, 0);
3441	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3442					  ptlrpc_add_rqs_to_pool);
3444	if (osc_rq_pool == NULL)
3445		GOTO(out_type, rc = -ENOMEM);
3447	rc = osc_start_grant_work();
3449		GOTO(out_req_pool, rc);
3454	ptlrpc_free_rq_pool(osc_rq_pool);
3456	class_unregister_type(LUSTRE_OSC_NAME);
3458	lu_kmem_fini(osc_caches);
/* Module exit: tear down in reverse order of osc_init(). */
3463 static void __exit osc_exit(void)
3465	osc_stop_grant_work();
3466	remove_shrinker(osc_cache_shrinker);
3467	class_unregister_type(LUSTRE_OSC_NAME);
3468	lu_kmem_fini(osc_caches);
3469	ptlrpc_free_rq_pool(osc_rq_pool);
/* Standard kernel module metadata and entry/exit registration. */
3472 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3473 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3474 MODULE_VERSION(LUSTRE_VERSION_STRING);
3475 MODULE_LICENSE("GPL");
3477 module_init(osc_init);
3478 module_exit(osc_exit);