/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/workqueue.h>
#include <lprocfs_status.h>
#include <lustre_debug.h>
#include <lustre_dlm.h>
#include <lustre_fid.h>
#include <lustre_ha.h>
#include <uapi/linux/lustre/lustre_ioctl.h>
#include <lustre_net.h>
#include <lustre_obdo.h>
#include <obd_cksum.h>
#include <obd_class.h>
#include <lustre_osc.h>

#include "osc_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

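/* Seconds an OSC connection may stay idle before the client disconnects
 * it; runtime-tunable through the module parameter below. */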
static unsigned int osc_idle_timeout = 20;
module_param(osc_idle_timeout, uint, 0644);

#define osc_grant_args osc_brw_async_args

struct osc_setattr_args {
	struct obdo		*sa_oa;
	obd_enqueue_update_f	 sa_upcall;
	void			*sa_cookie;
};

struct osc_fsync_args {
	struct osc_object	*fa_obj;
	struct obdo		*fa_oa;
	obd_enqueue_update_f	 fa_upcall;
	void			*fa_cookie;
};

struct osc_ladvise_args {
	struct obdo		*la_oa;
	obd_enqueue_update_f	 la_upcall;
	void			*la_cookie;
};

static void osc_release_ppga(struct brw_page **ppga, size_t count);
static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
			 void *args, int rc);

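/* Pack @oa into the request body, converting it to the wire format
 * negotiated for this connection. */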
void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
}

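/* Fetch the attributes of an OST object with a synchronous OST_GETATTR
 * RPC and unpack the reply back into @oa. */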
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}

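/* Synchronously push the attributes in @oa to the OST and read back the
 * server's view of them. */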
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_setattr_args *sa = args;
	struct ost_body *body;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}

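/* Asynchronous setattr: the RPC completes through osc_setattr_interpret(),
 * which invokes @upcall with @cookie. */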
int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
		      obd_enqueue_update_f upcall, void *cookie,
		      struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oa);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply = osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}

static int osc_ladvise_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 void *arg, int rc)
{
	struct osc_ladvise_args *la = arg;
	struct ost_body *body;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	*la->la_oa = body->oa;
out:
	rc = la->la_upcall(la->la_cookie, rc);
	RETURN(rc);
}

/**
 * If rqset is NULL, do not wait for the response. Upcall and cookie may also
 * be NULL in this case.
 */
int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
		     struct ladvise_hdr *ladvise_hdr,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_ladvise_args *la;
	int rc;
	struct lu_ladvise *req_ladvise;
	struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
	int num_advise = ladvise_hdr->lah_count;
	struct ladvise_hdr *req_ladvise_hdr;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
	if (req == NULL)
		RETURN(-ENOMEM);

	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
			     num_advise * sizeof(*ladvise));
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
	if (rc != 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oa);

	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
						 &RMF_OST_LADVISE_HDR);
	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));

	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
	ptlrpc_request_set_replen(req);

	if (rqset == NULL) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
		RETURN(0);
	}

	req->rq_interpret_reply = osc_ladvise_interpret;
	CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
	la = ptlrpc_req_async_args(req);
	la->la_oa = oa;
	la->la_upcall = upcall;
	la->la_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}

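/* Create an OST object synchronously. Only echo-client objects are created
 * this way (note the fid_seq_is_echo() assertion below); regular objects
 * are precreated on behalf of the MDS. */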
static int osc_create(const struct lu_env *env, struct obd_export *exp,
		      struct obdo *oa)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	LASSERT(oa != NULL);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	RETURN(rc);
}

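/* Send an asynchronous OST_PUNCH to truncate or punch a range in an OST
 * object; completion is reported via osc_setattr_interpret(). */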
int osc_punch_send(struct obd_export *exp, struct obdo *oa,
		   obd_enqueue_update_f upcall, void *cookie)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct obd_import *imp = class_exp2cliimp(exp);
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc < 0) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);

	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;

	ptlrpcd_add_req(req);

	RETURN(0);
}
EXPORT_SYMBOL(osc_punch_send);

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_fsync_args *fa = args;
	struct ost_body *body;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	unsigned long valid = 0;
	struct cl_object *obj;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oa = body->oa;
	obj = osc2cl(fa->fa_obj);

	/* Update osc object's blocks attribute */
	cl_object_attr_lock(obj);
	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
		attr->cat_blocks = body->oa.o_blocks;
		valid |= CAT_BLOCKS;
	}

	if (valid != 0)
		cl_object_attr_update(env, obj, attr, valid);
	cl_object_attr_unlock(obj);

out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}

int osc_sync_base(struct osc_object *obj, struct obdo *oa,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct obd_export *exp = osc_export(obj);
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_obj = obj;
	fa->fa_oa = oa;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to the list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		RETURN(0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *args, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);

	return 0;
}

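/* Limit the number of OST_DESTROY RPCs in flight to cl_max_rpcs_in_flight.
 * Returns 1 when another destroy may be sent now; otherwise it backs out
 * its increment, waking any waiter that raced with a completing destroy. */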
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}

static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct list_head cancels = LIST_HEAD_INIT(cancels);
	int rc, count;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = osc_destroy_interpret;
	if (!osc_can_send_destroy(cli)) {
		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);

		/*
		 * Wait until the number of on-going destroy RPCs drops
		 * under cl_max_rpcs_in_flight.
		 */
		rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
					    osc_can_send_destroy(cli), &lwi);
		if (rc) {
			ptlrpc_req_finished(req);
			RETURN(rc);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	RETURN(0);
}

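/* Advertise the client's cache state to the server in @oa: o_dirty is the
 * dirty data held, o_undirty how much more grant we could use, o_dropped
 * the grant that was lost. With GRANT_PARAM the request also budgets a
 * per-extent "tax": for nrpages worth of dirty data, roughly
 * ceil(nrpages / cl_max_extent_pages) extents are each charged
 * cl_grant_extent_tax bytes on top of the raw page bytes. */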
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
		oa->o_dirty = cli->cl_dirty_grant;
	else
		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
		     cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages, cli->cl_dirty_transit,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
			    atomic_long_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
		       atomic_long_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		unsigned long nrpages;
		unsigned long undirty;

		nrpages = cli->cl_max_pages_per_rpc;
		nrpages *= cli->cl_max_rpcs_in_flight + 1;
		nrpages = max(nrpages, cli->cl_dirty_max_pages);
		undirty = nrpages << PAGE_SHIFT;
		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
				 GRANT_PARAM)) {
			int nrextents;

			/* take extent tax into account when asking for more
			 * grant space */
			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
				     cli->cl_max_extent_pages;
			undirty += nrextents * cli->cl_grant_extent_tax;
		}
		/* Do not ask for more than OBD_MAX_GRANT - a margin for server
		 * to add extent tax, etc.
		 */
		oa->o_undirty = min(undirty, OBD_MAX_GRANT &
				    ~(PTLRPC_MAX_BRW_SIZE * 4UL));
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant = ktime_get_seconds() +
				    cli->cl_grant_shrink_interval;

	CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

/**
 * grant thread data for shrinking space.
 */
struct grant_thread_data {
	struct list_head	gtd_clients;
	struct mutex		gtd_mutex;
	unsigned long		gtd_stopped:1;
};
static struct grant_thread_data client_gtd;

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *args, int rc)
{
	struct osc_grant_args *aa = args;
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, aa->aa_oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);

	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	if (target_bytes >= cli->cl_avail_grant) {
		/* available grant has changed since target calculation */
		spin_unlock(&cli->cl_loi_list_lock);
		GOTO(out_free, rc = 0);
	}
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
out_free:
	OBD_FREE_PTR(body);
	RETURN(rc);
}

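/* Decide whether to hand grant back to the server: only when the import is
 * fully connected, the shrink interval has (nearly) expired, and we hold
 * more grant than a single full-sized RPC needs. */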
static int osc_should_shrink_grant(struct client_obd *client)
{
	time64_t next_shrink = client->cl_next_shrink_grant;

	if (client->cl_import == NULL)
		return 0;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (ktime_get_seconds() >= next_shrink - 5) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;

		osc_update_next_shrink(client);
	}
	return 0;
}

#define GRANT_SHRINK_RPC_BATCH	100

static struct delayed_work work;

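/* Periodic worker: scan all registered clients, shrink the grant of those
 * that qualify (at most GRANT_SHRINK_RPC_BATCH RPCs per pass), then re-arm
 * the delayed work for the earliest upcoming shrink time. */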
static void osc_grant_work_handler(struct work_struct *data)
{
	struct client_obd *cli;
	int rpc_sent;
	bool init_next_shrink = true;
	time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;

	rpc_sent = 0;
	mutex_lock(&client_gtd.gtd_mutex);
	list_for_each_entry(cli, &client_gtd.gtd_clients,
			    cl_grant_chain) {
		if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
		    osc_should_shrink_grant(cli)) {
			osc_shrink_grant(cli);
			rpc_sent++;
		}

		if (!init_next_shrink) {
			if (cli->cl_next_shrink_grant < next_shrink &&
			    cli->cl_next_shrink_grant > ktime_get_seconds())
				next_shrink = cli->cl_next_shrink_grant;
		} else {
			init_next_shrink = false;
			next_shrink = cli->cl_next_shrink_grant;
		}
	}
	mutex_unlock(&client_gtd.gtd_mutex);

	if (client_gtd.gtd_stopped == 1)
		return;

	if (next_shrink > ktime_get_seconds()) {
		time64_t delay = next_shrink - ktime_get_seconds();

		schedule_delayed_work(&work, cfs_time_seconds(delay));
	} else {
		schedule_work(&work.work);
	}
}

void osc_schedule_grant_work(void)
{
	cancel_delayed_work_sync(&work);
	schedule_work(&work.work);
}

/**
 * Start grant work for returning grant to the server for idle clients.
 */
static int osc_start_grant_work(void)
{
	client_gtd.gtd_stopped = 0;
	mutex_init(&client_gtd.gtd_mutex);
	INIT_LIST_HEAD(&client_gtd.gtd_clients);

	INIT_DELAYED_WORK(&work, osc_grant_work_handler);
	schedule_work(&work.work);

	return 0;
}

static void osc_stop_grant_work(void)
{
	client_gtd.gtd_stopped = 1;
	cancel_delayed_work_sync(&work);
}

static void osc_add_grant_list(struct client_obd *client)
{
	mutex_lock(&client_gtd.gtd_mutex);
	list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
	mutex_unlock(&client_gtd.gtd_mutex);
}

static void osc_del_grant_list(struct client_obd *client)
{
	if (list_empty(&client->cl_grant_chain))
		return;

	mutex_lock(&client_gtd.gtd_mutex);
	list_del_init(&client->cl_grant_chain);
	mutex_unlock(&client_gtd.gtd_mutex);
}

void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expected to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty_pages must be 0 already.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant = ocd->ocd_grant;
	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
		cli->cl_avail_grant -= cli->cl_reserved_grant;
		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
			cli->cl_avail_grant -= cli->cl_dirty_grant;
		else
			cli->cl_avail_grant -=
					cli->cl_dirty_pages << PAGE_SHIFT;
	}

	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
		u64 size;
		int chunk_mask;

		/* overhead for each extent insertion */
		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
		/* determine the appropriate chunk size used by osc_extent. */
		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
					  ocd->ocd_grant_blkbits);
		/* max_pages_per_rpc must be chunk aligned */
		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
					     ~chunk_mask) & chunk_mask;
		/* determine maximum extent size, in #pages */
		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
		cli->cl_max_extent_pages = size >> PAGE_SHIFT;
		if (cli->cl_max_extent_pages == 0)
			cli->cl_max_extent_pages = 1;
	} else {
		cli->cl_grant_extent_tax = 0;
		cli->cl_chunkbits = PAGE_SHIFT;
		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
	}
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE,
	       "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d cl_max_extent_pages: %d\n",
	       cli_name(cli),
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
	       cli->cl_max_extent_pages);

	if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
		osc_add_grant_list(cli);
}
EXPORT_SYMBOL(osc_init_grant);

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, size_t page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
				(pga[i]->off & ~PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   size_t page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0) {
			CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
			       i, remote_rcs[i], req);
			return remote_rcs[i];
		}

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}
	if (req->rq_bulk != NULL &&
	    req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

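/* Two brw_pages may share a single niobuf only if they are contiguous in
 * the file and carry identical flags; differing flag bits outside the
 * known-safe set additionally produce a warning. */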
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at https://jira.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

#if IS_ENABLED(CONFIG_CRC_T10DIF)
static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
				   size_t pg_count, struct brw_page **pga,
				   int opc, obd_dif_csum_fn *fn,
				   int sector_size,
				   u32 *check_sum)
{
	struct ahash_request *req;
	/* Adler is used as the default checksum type on top of the DIF tags */
	unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
	struct page *__page;
	unsigned char *buffer;
	__u16 *guard_start;
	unsigned int bufsize;
	int guard_number;
	int used_number = 0;
	int used;
	u32 cksum;
	int rc = 0;
	int i = 0;

	LASSERT(pg_count > 0);

	__page = alloc_page(GFP_KERNEL);
	if (__page == NULL)
		return -ENOMEM;

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(req)) {
		rc = PTR_ERR(req);
		CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
		       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
		GOTO(out, rc);
	}

	buffer = kmap(__page);
	guard_start = (__u16 *)buffer;
	guard_number = PAGE_SIZE / sizeof(*guard_start);
	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (unlikely(i == 0 && opc == OST_READ &&
			     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}

		/*
		 * The left guard number should be able to hold checksums of a
		 * whole page.
		 */
		rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
						  pga[i]->off & ~PAGE_MASK,
						  count,
						  guard_start + used_number,
						  guard_number - used_number,
						  &used, sector_size, fn);
		if (rc)
			break;

		used_number += used;
		if (used_number == guard_number) {
			cfs_crypto_hash_update_page(req, __page, 0,
				used_number * sizeof(*guard_start));
			used_number = 0;
		}

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}
	kunmap(__page);
	if (rc)
		GOTO(out, rc);

	if (used_number != 0)
		cfs_crypto_hash_update_page(req, __page, 0,
			used_number * sizeof(*guard_start));

	bufsize = sizeof(cksum);
	cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	*check_sum = cksum;
out:
	__free_page(__page);
	return rc;
}
#else /* !CONFIG_CRC_T10DIF */
#define obd_dif_ip_fn NULL
#define obd_dif_crc_fn NULL
#define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
	-EOPNOTSUPP
#endif /* CONFIG_CRC_T10DIF */

static int osc_checksum_bulk(int nob, size_t pg_count,
			     struct brw_page **pga, int opc,
			     enum cksum_types cksum_type,
			     u32 *cksum)
{
	int i = 0;
	struct ahash_request *req;
	unsigned int bufsize;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(req)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(req);
	}

	while (nob > 0 && pg_count > 0) {
		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(req, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(*cksum);
	cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		(*cksum)++;

	return 0;
}

static int osc_checksum_bulk_rw(const char *obd_name,
				enum cksum_types cksum_type,
				int nob, size_t pg_count,
				struct brw_page **pga, int opc,
				u32 *check_sum)
{
	obd_dif_csum_fn *fn = NULL;
	int sector_size = 0;
	int rc;

	obd_t10_cksum2dif(cksum_type, &fn, &sector_size);

	if (fn)
		rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
					     opc, fn, sector_size, check_sum);
	else
		rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
				       check_sum);

	RETURN(rc);
}

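/* Assemble a bulk read/write RPC: count niobufs by merging contiguous
 * pages, choose between an inline short-io buffer and a bulk descriptor,
 * pack the obdo/ioobj/niobufs, checksum write data when enabled, and prime
 * the async args used by the interpret callback. */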
static int
osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
		     u32 page_count, struct brw_page **pga,
		     struct ptlrpc_request **reqp, int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc, short_io_size = 0;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;
	char *short_io_buf;
	const char *obd_name = cli->cl_import->imp_obd->obd_name;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	for (i = 0; i < page_count; i++)
		short_io_size += pga[i]->count;

	/* Check if read/write is small enough to be a short io. */
	if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
	    !imp_connect_shortio(cli->cl_import))
		short_io_size = 0;

	req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
			     opc == OST_READ ? 0 : short_io_size);
	if (opc == OST_READ)
		req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
				     short_io_size);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	osc_set_io_portal(req);

	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	if (short_io_size != 0) {
		desc = NULL;
		short_io_buf = NULL;
		goto no_bulk;
	}

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
			PTLRPC_BULK_PUT_SINK) |
			PTLRPC_BULK_BUF_KIOV,
		OST_BULK_PORTAL,
		&ptlrpc_bulk_kiov_pin_ops);
	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */
no_bulk:
	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	/* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
	 * and from_kgid(), because they are asynchronous. Fortunately, variable
	 * oa contains valid o_uid and o_gid in these two operations.
	 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
	 * OBD_MD_FLUID and OBD_MD_FLGID are not set in order to avoid breaking
	 * other process logic */
	body->oa.o_uid = oa->o_uid;
	body->oa.o_gid = oa->o_gid;

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number of
	 * bulks that might be sent for this request. The actual number is
	 * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
	 * sends "max - 1" for old client compatibility sending "0", and also
	 * so the actual maximum is a power-of-two number, not one less. LU-1431 */
	if (desc != NULL)
		ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	else /* short io */
		ioobj_max_brw_set(ioobj, 0);

	if (short_io_size != 0) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_SHORT_IO;
		CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
		       short_io_size);
		if (opc == OST_WRITE) {
			short_io_buf = req_capsule_client_get(pill,
							      &RMF_SHORT_IO);
			LASSERT(short_io_buf != NULL);
		}
	}

	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));
		if (short_io_size != 0 && opc == OST_WRITE) {
			unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);

			LASSERT(short_io_size >= requested_nob + pg->count);
			memcpy(short_io_buf + requested_nob,
			       ptr + poff,
			       pg->count);
			ll_kunmap_atomic(ptr, KM_USER0);
		} else if (short_io_size == 0) {
			desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
							 pg->count);
		}
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->rnb_len += pg->count;
		} else {
			niobuf->rnb_offset = pg->off;
			niobuf->rnb_len = pg->count;
			niobuf->rnb_flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			enum cksum_types cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;

			body->oa.o_flags |= obd_cksum_type_pack(obd_name,
								cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;

			rc = osc_checksum_bulk_rw(obd_name, cksum_type,
						  requested_nob, page_count,
						  pga, OST_WRITE,
						  &body->oa.o_cksum);
			if (rc < 0) {
				CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
				       rc);
				GOTO(out, rc);
			}
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);

			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= obd_cksum_type_pack(obd_name,
							   cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}

		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= obd_cksum_type_pack(obd_name,
							cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}

		/* Client cksum has been already copied to wire obdo in previous
		 * lustre_set_wire_obdo(), and in the case a bulk-read is being
		 * resent due to cksum error, this will allow the Server to
		 * check+dump pages on its side */
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
	       req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
	       niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
	RETURN(0);

out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}

char dbgcksum_file_name[PATH_MAX];

static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
				struct brw_page **pga, __u32 server_cksum,
				__u32 client_cksum)
{
	struct file *filp;
	int rc, i;
	unsigned int len;
	char *buf;

	/* will only keep dump of pages on first error for the same range in
	 * file/fid, not during the resends/retries. */
	snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
		 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
		 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
		  libcfs_debug_file_path_arr :
		  LIBCFS_DEBUG_FILE_PATH_DEFAULT),
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
		 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
		 pga[0]->off,
		 pga[page_count-1]->off + pga[page_count-1]->count - 1,
		 client_cksum, server_cksum);
	filp = filp_open(dbgcksum_file_name,
			 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
	if (IS_ERR(filp)) {
		rc = PTR_ERR(filp);
		if (rc == -EEXIST)
			CDEBUG(D_INFO, "%s: can't open to dump pages with checksum error: rc = %d\n",
			       dbgcksum_file_name, rc);
		else
			CERROR("%s: can't open to dump pages with checksum error: rc = %d\n",
			       dbgcksum_file_name, rc);
		return;
	}

	for (i = 0; i < page_count; i++) {
		len = pga[i]->count;
		buf = kmap(pga[i]->pg);
		while (len != 0) {
			rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
			if (rc < 0) {
				CERROR("%s: wanted to write %u but got %d error\n",
				       dbgcksum_file_name, len, rc);
				break;
			}
			len -= rc;
			buf += rc;
			CDEBUG(D_INFO, "%s: wrote %d bytes\n",
			       dbgcksum_file_name, rc);
		}
		kunmap(pga[i]->pg);
	}

	rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
	if (rc)
		CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
	filp_close(filp, NULL);
}

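/* The server reported a write checksum mismatch. Recompute the checksum
 * over the pages still held locally to infer where the data changed
 * (client memory, in transit, or a benign mmap rewrite) and log the
 * verdict. Returns 0 if the checksums actually match, 1 otherwise so the
 * caller can resend. */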
static int
check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
		     __u32 client_cksum, __u32 server_cksum,
		     struct osc_brw_async_args *aa)
{
	const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
	enum cksum_types cksum_type;
	obd_dif_csum_fn *fn = NULL;
	int sector_size = 0;
	__u32 new_cksum;
	char *msg;
	int rc;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	if (aa->aa_cli->cl_checksum_dump)
		dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
				    server_cksum, client_cksum);

	cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
					   oa->o_flags : 0);

	switch (cksum_type) {
	case OBD_CKSUM_T10IP512:
		fn = obd_dif_ip_fn;
		sector_size = 512;
		break;
	case OBD_CKSUM_T10IP4K:
		fn = obd_dif_ip_fn;
		sector_size = 4096;
		break;
	case OBD_CKSUM_T10CRC512:
		fn = obd_dif_crc_fn;
		sector_size = 512;
		break;
	case OBD_CKSUM_T10CRC4K:
		fn = obd_dif_crc_fn;
		sector_size = 4096;
		break;
	default:
		break;
	}

	if (fn)
		rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
					     aa->aa_page_count, aa->aa_ppga,
					     OST_WRITE, fn, sector_size,
					     &new_cksum);
	else
		rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
				       aa->aa_ppga, OST_WRITE, cksum_type,
				       &new_cksum);

	if (rc < 0)
		msg = "failed to calculate the client write checksum";
	else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
			   DFID" object "DOSTID" extent [%llu-%llu], original "
			   "client csum %x (type %x), server csum %x (type %x), "
			   "client csum now %x\n",
			   obd_name, msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
			   aa->aa_ppga[aa->aa_page_count - 1]->off +
			   aa->aa_ppga[aa->aa_page_count - 1]->count - 1,
			   client_cksum,
			   obd_cksum_type_unpack(aa->aa_oa->o_flags),
			   server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	struct client_obd *cli = aa->aa_cli;
	const char *obd_name = cli->cl_import->imp_obd->obd_name;
	const struct lnet_process_id *peer =
		&req->rq_import->imp_connection->c_peer;
	struct ost_body *body;
	u32 client_cksum = 0;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		RETURN(rc);
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		RETURN(-EPROTO);
	}

	/* set/clear over quota flag for a uid/gid/projid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
		unsigned qid[LL_MAXQUOTAS] = {
					 body->oa.o_uid, body->oa.o_gid,
					 body->oa.o_projid };
		CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
		       body->oa.o_valid, body->oa.o_flags);
		osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
				body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		RETURN(rc);

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			RETURN(-EPROTO);
		}

		if (req->rq_bulk != NULL &&
		    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			RETURN(-EAGAIN);

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa))
			RETURN(-EAGAIN);

		rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		GOTO(out, rc);
	}

	/* The rest of this function executes only for OST_READs */

	if (req->rq_bulk == NULL) {
		rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
					  RCL_SERVER);
		LASSERT(rc == req->rq_status);
	} else {
		/* if unwrap_bulk failed, return -EAGAIN to retry */
		rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	}
	if (rc < 0)
		GOTO(out, rc = -EAGAIN);

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		RETURN(-EPROTO);
	}

	if (req->rq_bulk == NULL) {
		/* short io */
		int nob, pg_count, i = 0;
		unsigned char *buf;

		CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
		pg_count = aa->aa_page_count;
		buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
						   rc);
		nob = rc;
		while (nob > 0 && pg_count > 0) {
			unsigned char *ptr;
			int count = aa->aa_ppga[i]->count > nob ?
				    nob : aa->aa_ppga[i]->count;

			CDEBUG(D_CACHE, "page %p count %d\n",
			       aa->aa_ppga[i]->pg, count);
			ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
			memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
			       count);
			ll_kunmap_atomic((void *) ptr, KM_USER0);

			buf += count;
			nob -= count;
			i++;
			pg_count--;
		}
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		u32 server_cksum = body->oa.o_cksum;
		char *via = "";
		char *router = "";
		enum cksum_types cksum_type;
		u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
			      body->oa.o_flags : 0;

		cksum_type = obd_cksum_type_unpack(o_flags);
		rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
					  aa->aa_page_count, aa->aa_ppga,
					  OST_READ, &client_cksum);
		if (rc < 0)
			GOTO(out, rc);

		if (req->rq_bulk != NULL &&
		    peer->nid != req->rq_bulk->bd_sender) {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			struct ost_body *clbody;
			u32 page_count = aa->aa_page_count;

			clbody = req_capsule_client_get(&req->rq_pill,
							&RMF_OST_BODY);
			if (cli->cl_checksum_dump)
				dump_all_bulk_pages(&clbody->oa, page_count,
						    aa->aa_ppga, server_cksum,
						    client_cksum);

			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent [%llu-%llu], client %x, "
					   "server %x, cksum_type %x\n",
					   obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_seq : 0ULL,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_oid : 0,
					   clbody->oa.o_valid & OBD_MD_FLFID ?
						clbody->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[page_count-1]->off +
					   aa->aa_ppga[page_count-1]->count - 1,
					   client_cksum, server_cksum,
					   cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc == 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	RETURN(rc);
}

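/* Resend a BRW that failed with a recoverable error: build a fresh request
 * that takes over the pages, extents and async args of the old one, capping
 * the resend delay at the request timeout. */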
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
				  OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa, aa->aa_page_count,
				  aa->aa_ppga, &new_req, 1);
	if (rc)
		RETURN(rc);

	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				RETURN(-EINTR);
			}
		}
	}
	/*
	 * New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it...
	 */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	new_req->rq_commit_cb = request->rq_commit_cb;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	RETURN(0);
}

/*
 * ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll
 * do fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride ; i < num ; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}

static void osc_release_ppga(struct brw_page **ppga, size_t count)
{
	LASSERT(ppga != NULL);
	OBD_FREE(ppga, sizeof(*ppga) * count);
}

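/* Completion callback for BRW RPCs: redo recoverable failures, merge
 * returned size/blocks/time attributes into the cl_object on success,
 * finish the extents, drop the in-flight counter and replug the IO queue. */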
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *args, int rc)
{
	struct osc_brw_async_args *aa = args;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;
	unsigned long transferred = 0;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/*
	 * When server returns -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already.
	 */
	if (osc_recoverable_error(rc) && !req->rq_no_delay) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
			   client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: "
			       "%llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			RETURN(0);
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off +
				last->oap_page_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_update(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1,
				  rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	transferred = (req->rq_bulk == NULL ? /* short io */
		       aa->aa_requested_nob :
		       req->rq_bulk->bd_nob_transferred);

	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	RETURN(rc);
}

static void brw_commit(struct ptlrpc_request *req)
{
	/* If osc_inc_unstable_pages (via osc_extent_finish) races with
	 * this called via the rq_commit_cb, I need to ensure
	 * osc_dec_unstable_pages is still called. Otherwise unstable
	 * pages may be leaked. */
	spin_lock(&req->rq_lock);
	if (likely(req->rq_unstable)) {
		req->rq_unstable = 0;
		spin_unlock(&req->rq_lock);

		osc_dec_unstable_pages(req);
	} else {
		req->rq_committed = 1;
		spin_unlock(&req->rq_lock);
	}
}

2178 * Build an RPC by the list of extent @ext_list. The caller must ensure
2179 * that the total pages in this list are NOT over max pages per RPC.
2180 * Extents in the list must be in OES_RPC state.
2182 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2183 struct list_head *ext_list, int cmd)
2185 struct ptlrpc_request *req = NULL;
2186 struct osc_extent *ext;
2187 struct brw_page **pga = NULL;
2188 struct osc_brw_async_args *aa = NULL;
2189 struct obdo *oa = NULL;
2190 struct osc_async_page *oap;
2191 struct osc_object *obj = NULL;
2192 struct cl_req_attr *crattr = NULL;
2193 loff_t starting_offset = OBD_OBJECT_EOF;
2194 loff_t ending_offset = 0;
2198 bool soft_sync = false;
2199 bool interrupted = false;
2200 bool ndelay = false;
2204 __u32 layout_version = 0;
2205 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2206 struct ost_body *body;
2208 LASSERT(!list_empty(ext_list));
2210 /* add pages into rpc_list to build BRW rpc */
2211 list_for_each_entry(ext, ext_list, oe_link) {
2212 LASSERT(ext->oe_state == OES_RPC);
2213 mem_tight |= ext->oe_memalloc;
2214 grant += ext->oe_grants;
2215 page_count += ext->oe_nr_pages;
2216 layout_version = MAX(layout_version, ext->oe_layout_version);
2221 soft_sync = osc_over_unstable_soft_limit(cli);
2223 mpflag = cfs_memory_pressure_get_and_set();
2225 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2227 GOTO(out, rc = -ENOMEM);
2229 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2231 GOTO(out, rc = -ENOMEM);
2234 list_for_each_entry(ext, ext_list, oe_link) {
2235 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2237 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2239 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2240 pga[i] = &oap->oap_brw_page;
2241 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2244 list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset == OBD_OBJECT_EOF ||
			    starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_SIZE);
			if (oap->oap_interrupted)
				interrupted = true;
2263 /* first page in the list */
2264 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
2266 crattr = &osc_env_info(env)->oti_req_attr;
2267 memset(crattr, 0, sizeof(*crattr));
2268 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2269 crattr->cra_flags = ~0ULL;
2270 crattr->cra_page = oap2cl_page(oap);
2271 crattr->cra_oa = oa;
2272 cl_req_attr_set(env, osc2cl(obj), crattr);
2274 if (cmd == OBD_BRW_WRITE) {
2275 oa->o_grant_used = grant;
2276 if (layout_version > 0) {
2277 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2278 PFID(&oa->o_oi.oi_fid), layout_version);
2280 oa->o_layout_version = layout_version;
2281 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
2285 sort_brw_pages(pga, page_count);
2286 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2288 CERROR("prep_req failed: %d\n", rc);
2292 req->rq_commit_cb = brw_commit;
2293 req->rq_interpret_reply = brw_interpret;
2294 req->rq_memalloc = mem_tight != 0;
2295 oap->oap_request = ptlrpc_request_addref(req);
2296 if (interrupted && !req->rq_intr)
2297 ptlrpc_mark_interrupted(req);
2299 req->rq_no_resend = req->rq_no_delay = 1;
		/* Probably we should set a shorter timeout value here to
		 * handle ETIMEDOUT in brw_interpret() correctly. */
		/* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
	/* We need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at the OST). If the OST
	 * gets the later setattr before the earlier BRW (as determined by the
	 * request xid), it will not use the BRW timestamps. Sadly, there is
	 * no obvious way to do this in a single call. bug 10150 */
2310 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2311 crattr->cra_oa = &body->oa;
2312 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2313 cl_req_attr_set(env, osc2cl(obj), crattr);
2314 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2316 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2317 aa = ptlrpc_req_async_args(req);
2318 INIT_LIST_HEAD(&aa->aa_oaps);
2319 list_splice_init(&rpc_list, &aa->aa_oaps);
2320 INIT_LIST_HEAD(&aa->aa_exts);
2321 list_splice_init(ext_list, &aa->aa_exts);
2323 spin_lock(&cli->cl_loi_list_lock);
2324 starting_offset >>= PAGE_SHIFT;
2325 if (cmd == OBD_BRW_READ) {
2326 cli->cl_r_in_flight++;
2327 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2328 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2329 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2330 starting_offset + 1);
2332 cli->cl_w_in_flight++;
2333 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2334 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2335 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2336 starting_offset + 1);
2338 spin_unlock(&cli->cl_loi_list_lock);
2340 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2341 page_count, aa, cli->cl_r_in_flight,
2342 cli->cl_w_in_flight);
2343 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2345 ptlrpcd_add_req(req);
2351 cfs_memory_pressure_restore(mpflag);
2354 LASSERT(req == NULL);
2357 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2359 OBD_FREE(pga, sizeof(*pga) * page_count);
	/* This should happen rarely and is pretty bad; it makes the
	 * pending list not follow the dirty order. */
2362 while (!list_empty(ext_list)) {
2363 ext = list_entry(ext_list->next, struct osc_extent,
2365 list_del_init(&ext->oe_link);
2366 osc_extent_finish(env, ext, 0, rc);
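/*
 * Set @data as the AST data of @lock if the lock does not carry any yet;
 * return whether @lock's AST data now points to @data.
 */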
2372 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2376 LASSERT(lock != NULL);
2378 lock_res_and_lock(lock);
2380 if (lock->l_ast_data == NULL)
2381 lock->l_ast_data = data;
2382 if (lock->l_ast_data == data)
2385 unlock_res_and_lock(lock);
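/*
 * Finish an enqueue: for an intent enqueue, extract the resulting error
 * code from the intent reply; then invoke the caller's upcall with the
 * result and drop the lock reference taken in ldlm_cli_enqueue().
 */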
2390 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2391 void *cookie, struct lustre_handle *lockh,
2392 enum ldlm_mode mode, __u64 *flags, bool speculative,
2395 bool intent = *flags & LDLM_FL_HAS_INTENT;
2399 /* The request was created before ldlm_cli_enqueue call. */
2400 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2401 struct ldlm_reply *rep;
2403 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2404 LASSERT(rep != NULL);
2406 rep->lock_policy_res1 =
2407 ptlrpc_status_ntoh(rep->lock_policy_res1);
2408 if (rep->lock_policy_res1)
2409 errcode = rep->lock_policy_res1;
2411 *flags |= LDLM_FL_LVB_READY;
2412 } else if (errcode == ELDLM_OK) {
2413 *flags |= LDLM_FL_LVB_READY;
2416 /* Call the update callback. */
2417 rc = (*upcall)(cookie, lockh, errcode);
2419 /* release the reference taken in ldlm_cli_enqueue() */
2420 if (errcode == ELDLM_LOCK_MATCHED)
2422 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2423 ldlm_lock_decref(lockh, mode);
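/*
 * Interpret callback for an asynchronous enqueue: complete the DLM side
 * through ldlm_cli_enqueue_fini() and the OSC side through
 * osc_enqueue_fini(), holding an extra lock reference so that a blocking
 * AST cannot overtake the upcall.
 */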
2428 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2431 struct osc_enqueue_args *aa = args;
2432 struct ldlm_lock *lock;
2433 struct lustre_handle *lockh = &aa->oa_lockh;
2434 enum ldlm_mode mode = aa->oa_mode;
2435 struct ost_lvb *lvb = aa->oa_lvb;
2436 __u32 lvb_len = sizeof(*lvb);
	/* ldlm_cli_enqueue() is holding a reference on the lock, so it must
	 * still be valid. */
2443 lock = ldlm_handle2lock(lockh);
2444 LASSERTF(lock != NULL,
2445 "lockh %#llx, req %p, aa %p - client evicted?\n",
2446 lockh->cookie, req, aa);
	/* Take an additional reference so that a blocking AST, which
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
2452 ldlm_lock_addref(lockh, mode);
2454 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2455 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
	/* Let the CP AST grant the lock first. */
2458 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2460 if (aa->oa_speculative) {
2461 LASSERT(aa->oa_lvb == NULL);
2462 LASSERT(aa->oa_flags == NULL);
2463 aa->oa_flags = &flags;
2466 /* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
				   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
				   lockh, rc);
2470 /* Complete osc stuff. */
2471 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2472 aa->oa_flags, aa->oa_speculative, rc);
2474 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2476 ldlm_lock_decref(lockh, mode);
2477 LDLM_LOCK_PUT(lock);
2481 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
/* When enqueuing asynchronously, locks are not ordered, so we can obtain a
 * lock from the 2nd OSC before a lock from the 1st one. This does not deadlock
 * with other synchronous requests; however, keeping some locks while trying to
 * obtain others may take a considerable amount of time in case of OST failure,
 * and when other sync requests cannot get the released lock from a client, the
 * client is evicted from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained. */
2490 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2491 __u64 *flags, union ldlm_policy_data *policy,
2492 struct ost_lvb *lvb, int kms_valid,
2493 osc_enqueue_upcall_f upcall, void *cookie,
2494 struct ldlm_enqueue_info *einfo,
2495 struct ptlrpc_request_set *rqset, int async,
2498 struct obd_device *obd = exp->exp_obd;
2499 struct lustre_handle lockh = { 0 };
2500 struct ptlrpc_request *req = NULL;
2501 int intent = *flags & LDLM_FL_HAS_INTENT;
2502 __u64 match_flags = *flags;
2503 enum ldlm_mode mode;
2507 /* Filesystem lock extents are extended to page boundaries so that
2508 * dealing with the page cache is a little smoother. */
2509 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2510 policy->l_extent.end |= ~PAGE_MASK;
 * kms is not valid when either the object is completely fresh (so that no
 * locks are cached), or the object was evicted. In the latter case a cached
 * lock cannot be used, because it would prime the inode state with a
 * potentially stale LVB.
/* Next, search for already existing extent locks that will cover us */
/* If we're trying to read, we also search for an existing PW lock. The
 * VFS and page cache already protect us locally, so lots of readers/
 * writers can share a single PW lock.
 *
 * There are problems with conversion deadlocks, so instead of
 * converting a read lock to a write lock, we'll just enqueue a new
 * one.
 *
 * At some point we should cancel the read lock instead of making them
 * send us a blocking callback, but there are problems with canceling
 * locks out from other users right now, too. */
2533 mode = einfo->ei_mode;
2534 if (einfo->ei_mode == LCK_PR)
2536 /* Normal lock requests must wait for the LVB to be ready before
2537 * matching a lock; speculative lock requests do not need to,
2538 * because they will not actually use the lock. */
2540 match_flags |= LDLM_FL_LVB_READY;
2542 match_flags |= LDLM_FL_BLOCK_GRANTED;
2543 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2544 einfo->ei_type, policy, mode, &lockh, 0);
2546 struct ldlm_lock *matched;
2548 if (*flags & LDLM_FL_TEST_LOCK)
2551 matched = ldlm_handle2lock(&lockh);
		/* This DLM lock request is speculative, and does not
		 * have an associated IO request. Therefore if there
		 * is already a DLM lock, it will just inform the
		 * caller to cancel the request for this stripe. */
2557 lock_res_and_lock(matched);
2558 if (ldlm_extent_equal(&policy->l_extent,
2559 &matched->l_policy_data.l_extent))
2563 unlock_res_and_lock(matched);
2565 ldlm_lock_decref(&lockh, mode);
2566 LDLM_LOCK_PUT(matched);
2568 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2569 *flags |= LDLM_FL_LVB_READY;
2571 /* We already have a lock, and it's referenced. */
2572 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2574 ldlm_lock_decref(&lockh, mode);
2575 LDLM_LOCK_PUT(matched);
2578 ldlm_lock_decref(&lockh, mode);
2579 LDLM_LOCK_PUT(matched);
2584 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2588 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2589 &RQF_LDLM_ENQUEUE_LVB);
2593 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2595 ptlrpc_request_free(req);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     sizeof(*lvb));
2601 ptlrpc_request_set_replen(req);
2604 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2605 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2607 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2608 sizeof(*lvb), LVB_T_OST, &lockh, async);
2611 struct osc_enqueue_args *aa;
2612 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2613 aa = ptlrpc_req_async_args(req);
2615 aa->oa_mode = einfo->ei_mode;
2616 aa->oa_type = einfo->ei_type;
2617 lustre_handle_copy(&aa->oa_lockh, &lockh);
2618 aa->oa_upcall = upcall;
2619 aa->oa_cookie = cookie;
2620 aa->oa_speculative = speculative;
2622 aa->oa_flags = flags;
			/* Speculative locks are essentially a way to
			 * enqueue a DLM lock in advance, so we don't
			 * care about the result of the enqueue. */
2629 aa->oa_flags = NULL;
2632 req->rq_interpret_reply = osc_enqueue_interpret;
2633 if (rqset == PTLRPCD_SET)
2634 ptlrpcd_add_req(req);
2636 ptlrpc_set_add_req(rqset, req);
2637 } else if (intent) {
2638 ptlrpc_req_finished(req);
2643 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2644 flags, speculative, rc);
2646 ptlrpc_req_finished(req);
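/*
 * Look for a client-side lock that already covers @policy. Extents are
 * extended to page boundaries first; on a match, the lock's AST data is
 * set to @data.
 */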
2651 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2652 enum ldlm_type type, union ldlm_policy_data *policy,
2653 enum ldlm_mode mode, __u64 *flags, void *data,
2654 struct lustre_handle *lockh, int unref)
2656 struct obd_device *obd = exp->exp_obd;
2657 __u64 lflags = *flags;
2661 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2664 /* Filesystem lock extents are extended to page boundaries so that
2665 * dealing with the page cache is a little smoother */
2666 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2667 policy->l_extent.end |= ~PAGE_MASK;
2669 /* Next, search for already existing extent locks that will cover us */
2670 /* If we're trying to read, we also search for an existing PW lock. The
2671 * VFS and page cache already protect us locally, so lots of readers/
2672 * writers can share a single PW lock. */
2676 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2677 res_id, type, policy, rc, lockh, unref);
2678 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2682 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2684 LASSERT(lock != NULL);
2685 if (!osc_set_lock_data(lock, data)) {
2686 ldlm_lock_decref(lockh, rc);
2689 LDLM_LOCK_PUT(lock);
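/*
 * Interpret callback for an asynchronous OST_STATFS request: copy the
 * server's reply into the caller's obd_statfs and run the completion
 * callback.
 */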
2694 static int osc_statfs_interpret(const struct lu_env *env,
2695 struct ptlrpc_request *req, void *args, int rc)
2697 struct osc_async_args *aa = args;
2698 struct obd_statfs *msfs;
	 * The request has in fact never been sent due to issues at
	 * a higher level (LOV). Exit immediately since the caller
	 * is aware of the problem and takes care of the cleanup.
2709 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2710 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2716 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2718 GOTO(out, rc = -EPROTO);
2720 *aa->aa_oi->oi_osfs = *msfs;
2722 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
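/*
 * Asynchronous statfs: answer from the locally cached obd_osfs when it is
 * newer than @max_age, otherwise send an OST_STATFS request and let
 * osc_statfs_interpret() deliver the result.
 */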
2727 static int osc_statfs_async(struct obd_export *exp,
2728 struct obd_info *oinfo, time64_t max_age,
2729 struct ptlrpc_request_set *rqset)
2731 struct obd_device *obd = class_exp2obd(exp);
2732 struct ptlrpc_request *req;
2733 struct osc_async_args *aa;
2737 if (obd->obd_osfs_age >= max_age) {
		CDEBUG(D_SUPER,
		       "%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
2740 obd->obd_name, &obd->obd_osfs,
2741 obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
2742 obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
2743 spin_lock(&obd->obd_osfs_lock);
2744 memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
2745 spin_unlock(&obd->obd_osfs_lock);
2746 oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
2747 if (oinfo->oi_cb_up)
2748 oinfo->oi_cb_up(oinfo, 0);
2753 /* We could possibly pass max_age in the request (as an absolute
2754 * timestamp or a "seconds.usec ago") so the target can avoid doing
2755 * extra calls into the filesystem if that isn't necessary (e.g.
2756 * during mount that would help a bit). Having relative timestamps
2757 * is not so great if request processing is slow, while absolute
2758 * timestamps are not ideal because they need time synchronization. */
2759 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2763 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2765 ptlrpc_request_free(req);
2768 ptlrpc_request_set_replen(req);
2769 req->rq_request_portal = OST_CREATE_PORTAL;
2770 ptlrpc_at_set_req_timeout(req);
2772 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not wait for the stat reply,
		 * to avoid a deadlock */
2774 req->rq_no_resend = 1;
2775 req->rq_no_delay = 1;
2778 req->rq_interpret_reply = osc_statfs_interpret;
2779 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2780 aa = ptlrpc_req_async_args(req);
2783 ptlrpc_set_add_req(rqset, req);
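/*
 * Synchronous statfs: send an OST_STATFS request and wait for the
 * server's reply.
 */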
2787 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2788 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2790 struct obd_device *obd = class_exp2obd(exp);
2791 struct obd_statfs *msfs;
2792 struct ptlrpc_request *req;
2793 struct obd_import *imp = NULL;
	/* Since the request might also come from lprocfs, we need to
	 * sync this with client_disconnect_export() (bug 15684). */
2800 down_read(&obd->u.cli.cl_sem);
2801 if (obd->u.cli.cl_import)
2802 imp = class_import_get(obd->u.cli.cl_import);
2803 up_read(&obd->u.cli.cl_sem);
2807 /* We could possibly pass max_age in the request (as an absolute
2808 * timestamp or a "seconds.usec ago") so the target can avoid doing
2809 * extra calls into the filesystem if that isn't necessary (e.g.
2810 * during mount that would help a bit). Having relative timestamps
2811 * is not so great if request processing is slow, while absolute
2812 * timestamps are not ideal because they need time synchronization. */
2813 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2815 class_import_put(imp);
2820 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2822 ptlrpc_request_free(req);
2825 ptlrpc_request_set_replen(req);
2826 req->rq_request_portal = OST_CREATE_PORTAL;
2827 ptlrpc_at_set_req_timeout(req);
2829 if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not wait for the stat reply,
		 * to avoid a deadlock */
2831 req->rq_no_resend = 1;
2832 req->rq_no_delay = 1;
2835 rc = ptlrpc_queue_wait(req);
2839 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2841 GOTO(out, rc = -EPROTO);
2847 ptlrpc_req_finished(req);
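/*
 * Handle OSC-specific ioctls: client recovery and import (de)activation;
 * anything else is reported as unrecognised.
 */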
2851 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2852 void *karg, void __user *uarg)
2854 struct obd_device *obd = exp->exp_obd;
2855 struct obd_ioctl_data *data = karg;
2859 if (!try_module_get(THIS_MODULE)) {
2860 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2861 module_name(THIS_MODULE));
2865 case OBD_IOC_CLIENT_RECOVER:
2866 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2867 data->ioc_inlbuf1, 0);
2871 case IOC_OSC_SET_ACTIVE:
2872 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2877 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2878 obd->obd_name, cmd, current_comm(), rc);
2882 module_put(THIS_MODULE);
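/*
 * Handle "set info" requests. Several keys (checksum, sptlrpc config,
 * context flush, LRU shrink) are handled locally; all other keys are
 * packed into an OST_SET_INFO RPC and sent to the server.
 */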
2886 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2887 u32 keylen, void *key, u32 vallen, void *val,
2888 struct ptlrpc_request_set *set)
2890 struct ptlrpc_request *req;
2891 struct obd_device *obd = exp->exp_obd;
2892 struct obd_import *imp = class_exp2cliimp(exp);
2897 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2899 if (KEY_IS(KEY_CHECKSUM)) {
2900 if (vallen != sizeof(int))
2902 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2906 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2907 sptlrpc_conf_client_adapt(obd);
2911 if (KEY_IS(KEY_FLUSH_CTX)) {
2912 sptlrpc_import_flush_my_ctx(imp);
2916 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2917 struct client_obd *cli = &obd->u.cli;
2918 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2919 long target = *(long *)val;
2921 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2926 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
	/* We pass all other commands directly to OST. Since nobody calls osc
	 * methods directly and everybody is supposed to go through LOV, we
	 * assume LOV checked invalid values for us.
	 * The only recognised values so far are evict_by_nid and mds_conn.
	 * Even if something bad goes through, we'd get a -EINVAL from OST
	 * anyway. */
2936 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2937 &RQF_OST_SET_GRANT_INFO :
2942 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2943 RCL_CLIENT, keylen);
2944 if (!KEY_IS(KEY_GRANT_SHRINK))
2945 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2946 RCL_CLIENT, vallen);
2947 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2949 ptlrpc_request_free(req);
2953 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2954 memcpy(tmp, key, keylen);
2955 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2958 memcpy(tmp, val, vallen);
2960 if (KEY_IS(KEY_GRANT_SHRINK)) {
2961 struct osc_grant_args *aa;
2964 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2965 aa = ptlrpc_req_async_args(req);
2966 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2968 ptlrpc_req_finished(req);
2971 *oa = ((struct ost_body *)val)->oa;
2973 req->rq_interpret_reply = osc_shrink_grant_interpret;
2976 ptlrpc_request_set_replen(req);
2977 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2978 LASSERT(set != NULL);
2979 ptlrpc_set_add_req(set, req);
2980 ptlrpc_check_set(NULL, set);
2982 ptlrpcd_add_req(req);
2987 EXPORT_SYMBOL(osc_set_info_async);
2989 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2990 struct obd_device *obd, struct obd_uuid *cluuid,
2991 struct obd_connect_data *data, void *localdata)
2993 struct client_obd *cli = &obd->u.cli;
2995 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2999 spin_lock(&cli->cl_loi_list_lock);
3000 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
3001 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
3002 /* restore ocd_grant_blkbits as client page bits */
3003 data->ocd_grant_blkbits = PAGE_SHIFT;
3004 grant += cli->cl_dirty_grant;
3006 grant += cli->cl_dirty_pages << PAGE_SHIFT;
3008 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
3009 lost_grant = cli->cl_lost_grant;
3010 cli->cl_lost_grant = 0;
3011 spin_unlock(&cli->cl_loi_list_lock);
3013 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3014 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3015 data->ocd_version, data->ocd_grant, lost_grant);
3020 EXPORT_SYMBOL(osc_reconnect);
3022 int osc_disconnect(struct obd_export *exp)
3024 struct obd_device *obd = class_exp2obd(exp);
3027 rc = client_disconnect_export(exp);
3029 * Initially we put del_shrink_grant before disconnect_export, but it
3030 * causes the following problem if setup (connect) and cleanup
3031 * (disconnect) are tangled together.
3032 * connect p1 disconnect p2
3033 * ptlrpc_connect_import
3034 * ............... class_manual_cleanup
3037 * ptlrpc_connect_interrupt
3039 * add this client to shrink list
 * Bang! The grant shrink thread triggers the shrink. BUG18662
3043 osc_del_grant_list(&obd->u.cli);
3046 EXPORT_SYMBOL(osc_disconnect);
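/*
 * cfs_hash iterator callback: for each LDLM resource, clear the "cleaned"
 * flag on its granted locks so that a second ldlm_namespace_cleanup()
 * pass will cancel them, and invalidate the osc object the locks are
 * attached to.
 */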
3048 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3049 struct hlist_node *hnode, void *arg)
3051 struct lu_env *env = arg;
3052 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3053 struct ldlm_lock *lock;
3054 struct osc_object *osc = NULL;
3058 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3059 if (lock->l_ast_data != NULL && osc == NULL) {
3060 osc = lock->l_ast_data;
3061 cl_object_get(osc2cl(osc));
3064 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3065 * by the 2nd round of ldlm_namespace_clean() call in
3066 * osc_import_event(). */
3067 ldlm_clear_cleaned(lock);
3072 osc_object_invalidate(env, osc);
3073 cl_object_put(env, osc2cl(osc));
3078 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
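/*
 * React to import state changes: reset grant on disconnect, flush caches
 * and locks on invalidation, re-initialize grant from the connect data,
 * and notify the observer (LOV) of each event.
 */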
3080 static int osc_import_event(struct obd_device *obd,
3081 struct obd_import *imp,
3082 enum obd_import_event event)
3084 struct client_obd *cli;
3088 LASSERT(imp->imp_obd == obd);
3091 case IMP_EVENT_DISCON: {
3093 spin_lock(&cli->cl_loi_list_lock);
3094 cli->cl_avail_grant = 0;
3095 cli->cl_lost_grant = 0;
3096 spin_unlock(&cli->cl_loi_list_lock);
3099 case IMP_EVENT_INACTIVE: {
3100 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3103 case IMP_EVENT_INVALIDATE: {
3104 struct ldlm_namespace *ns = obd->obd_namespace;
3108 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3110 env = cl_env_get(&refcheck);
3112 osc_io_unplug(env, &obd->u.cli, NULL);
3114 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3115 osc_ldlm_resource_invalidate,
3117 cl_env_put(env, &refcheck);
3119 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3124 case IMP_EVENT_ACTIVE: {
3125 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3128 case IMP_EVENT_OCD: {
3129 struct obd_connect_data *ocd = &imp->imp_connect_data;
3131 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3132 osc_init_grant(&obd->u.cli, ocd);
3135 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3138 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3141 case IMP_EVENT_DEACTIVATE: {
3142 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3145 case IMP_EVENT_ACTIVATE: {
3146 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3150 CERROR("Unknown import event %d\n", event);
3157 * Determine whether the lock can be canceled before replaying the lock
3158 * during recovery, see bug16774 for detailed information.
3160 * \retval zero the lock can't be canceled
3161 * \retval other ok to cancel
3163 static int osc_cancel_weight(struct ldlm_lock *lock)
	 * Cancel all unused and granted extent locks.
3168 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3169 ldlm_is_granted(lock) &&
3170 osc_ldlm_weigh_ast(lock) == 0)
3176 static int brw_queue_work(const struct lu_env *env, void *data)
3178 struct client_obd *cli = data;
3180 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3182 osc_io_unplug(env, cli, NULL);
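/*
 * Common part of OSC device setup: take a ptlrpcd reference, perform the
 * generic client setup, allocate the writeback and LRU work items, and
 * initialize quota and the grant shrink interval.
 */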
3186 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3188 struct client_obd *cli = &obd->u.cli;
3194 rc = ptlrpcd_addref();
3198 rc = client_obd_setup(obd, lcfg);
3200 GOTO(out_ptlrpcd, rc);
3203 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3204 if (IS_ERR(handler))
3205 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3206 cli->cl_writeback_work = handler;
3208 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3209 if (IS_ERR(handler))
3210 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3211 cli->cl_lru_work = handler;
3213 rc = osc_quota_setup(obd);
3215 GOTO(out_ptlrpcd_work, rc);
3217 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3218 osc_update_next_shrink(cli);
3223 if (cli->cl_writeback_work != NULL) {
3224 ptlrpcd_destroy_work(cli->cl_writeback_work);
3225 cli->cl_writeback_work = NULL;
3227 if (cli->cl_lru_work != NULL) {
3228 ptlrpcd_destroy_work(cli->cl_lru_work);
3229 cli->cl_lru_work = NULL;
3231 client_obd_cleanup(obd);
3236 EXPORT_SYMBOL(osc_setup_common);
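/*
 * Complete OSC device setup: run the common setup, register tunables,
 * pre-fill the shared request pool, register the lock cancel weight
 * callback, and join the global osc_shrink_list used by the cache
 * shrinker.
 */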
3238 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3240 struct client_obd *cli = &obd->u.cli;
3248 rc = osc_setup_common(obd, lcfg);
3252 rc = osc_tunables_init(obd);
	 * We try to control the total number of requests with an upper limit
3258 * osc_reqpool_maxreqcount. There might be some race which will cause
3259 * over-limit allocation, but it is fine.
3261 req_count = atomic_read(&osc_pool_req_count);
3262 if (req_count < osc_reqpool_maxreqcount) {
3263 adding = cli->cl_max_rpcs_in_flight + 2;
3264 if (req_count + adding > osc_reqpool_maxreqcount)
3265 adding = osc_reqpool_maxreqcount - req_count;
3267 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3268 atomic_add(added, &osc_pool_req_count);
3271 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3273 spin_lock(&osc_shrink_lock);
3274 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3275 spin_unlock(&osc_shrink_lock);
3276 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3277 cli->cl_import->imp_idle_debug = D_HA;
3282 int osc_precleanup_common(struct obd_device *obd)
3284 struct client_obd *cli = &obd->u.cli;
3288 * for echo client, export may be on zombie list, wait for
3289 * zombie thread to cull it, because cli.cl_import will be
3290 * cleared in client_disconnect_export():
3291 * class_export_destroy() -> obd_cleanup() ->
3292 * echo_device_free() -> echo_client_cleanup() ->
3293 * obd_disconnect() -> osc_disconnect() ->
3294 * client_disconnect_export()
3296 obd_zombie_barrier();
3297 if (cli->cl_writeback_work) {
3298 ptlrpcd_destroy_work(cli->cl_writeback_work);
3299 cli->cl_writeback_work = NULL;
3302 if (cli->cl_lru_work) {
3303 ptlrpcd_destroy_work(cli->cl_lru_work);
3304 cli->cl_lru_work = NULL;
3307 obd_cleanup_client_import(obd);
3310 EXPORT_SYMBOL(osc_precleanup_common);
3312 static int osc_precleanup(struct obd_device *obd)
3316 osc_precleanup_common(obd);
3318 ptlrpc_lprocfs_unregister_obd(obd);
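/*
 * Final teardown: leave the cache shrinker list, drop this client's share
 * of the global LRU cache, free the quota cache, and clean up the generic
 * client obd.
 */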
3322 int osc_cleanup_common(struct obd_device *obd)
3324 struct client_obd *cli = &obd->u.cli;
3329 spin_lock(&osc_shrink_lock);
3330 list_del(&cli->cl_shrink_list);
3331 spin_unlock(&osc_shrink_lock);
3334 if (cli->cl_cache != NULL) {
3335 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3336 spin_lock(&cli->cl_cache->ccc_lru_lock);
3337 list_del_init(&cli->cl_lru_osc);
3338 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3339 cli->cl_lru_left = NULL;
3340 cl_cache_decref(cli->cl_cache);
3341 cli->cl_cache = NULL;
3344 /* free memory of osc quota cache */
3345 osc_quota_cleanup(obd);
3347 rc = client_obd_cleanup(obd);
3352 EXPORT_SYMBOL(osc_cleanup_common);
3354 static struct obd_ops osc_obd_ops = {
3355 .o_owner = THIS_MODULE,
3356 .o_setup = osc_setup,
3357 .o_precleanup = osc_precleanup,
3358 .o_cleanup = osc_cleanup_common,
3359 .o_add_conn = client_import_add_conn,
3360 .o_del_conn = client_import_del_conn,
3361 .o_connect = client_connect_import,
3362 .o_reconnect = osc_reconnect,
3363 .o_disconnect = osc_disconnect,
3364 .o_statfs = osc_statfs,
3365 .o_statfs_async = osc_statfs_async,
3366 .o_create = osc_create,
3367 .o_destroy = osc_destroy,
3368 .o_getattr = osc_getattr,
3369 .o_setattr = osc_setattr,
3370 .o_iocontrol = osc_iocontrol,
3371 .o_set_info_async = osc_set_info_async,
3372 .o_import_event = osc_import_event,
3373 .o_quotactl = osc_quotactl,
3376 static struct shrinker *osc_cache_shrinker;
3377 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3378 DEFINE_SPINLOCK(osc_shrink_lock);
3380 #ifndef HAVE_SHRINKER_COUNT
3381 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3383 struct shrink_control scv = {
3384 .nr_to_scan = shrink_param(sc, nr_to_scan),
3385 .gfp_mask = shrink_param(sc, gfp_mask)
3387 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3388 struct shrinker *shrinker = NULL;
3391 (void)osc_cache_shrink_scan(shrinker, &scv);
3393 return osc_cache_shrink_count(shrinker, &scv);
3397 static int __init osc_init(void)
3399 unsigned int reqpool_size;
3400 unsigned int reqsize;
3402 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3403 osc_cache_shrink_count, osc_cache_shrink_scan);
	/* Print the address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with a gdb that doesn't support data
	 * symbols from modules. */
3409 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3411 rc = lu_kmem_init(osc_caches);
3415 rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
3416 LUSTRE_OSC_NAME, &osc_device_type);
3420 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3422 /* This is obviously too much memory, only prevent overflow here */
3423 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3424 GOTO(out_type, rc = -EINVAL);
3426 reqpool_size = osc_reqpool_mem_max << 20;
3429 while (reqsize < OST_IO_MAXREQSIZE)
3430 reqsize = reqsize << 1;
3433 * We don't enlarge the request count in OSC pool according to
3434 * cl_max_rpcs_in_flight. The allocation from the pool will only be
 * tried after normal allocation fails, so a small OSC pool won't
 * cause much performance degradation in most cases.
3438 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3440 atomic_set(&osc_pool_req_count, 0);
3441 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3442 ptlrpc_add_rqs_to_pool);
3444 if (osc_rq_pool == NULL)
3445 GOTO(out_type, rc = -ENOMEM);
3447 rc = osc_start_grant_work();
3449 GOTO(out_req_pool, rc);
3454 ptlrpc_free_rq_pool(osc_rq_pool);
3456 class_unregister_type(LUSTRE_OSC_NAME);
3458 lu_kmem_fini(osc_caches);
3463 static void __exit osc_exit(void)
3465 osc_stop_grant_work();
3466 remove_shrinker(osc_cache_shrinker);
3467 class_unregister_type(LUSTRE_OSC_NAME);
3468 lu_kmem_fini(osc_caches);
3469 ptlrpc_free_rq_pool(osc_rq_pool);
3472 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3473 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3474 MODULE_VERSION(LUSTRE_VERSION_STRING);
3475 MODULE_LICENSE("GPL");
3477 module_init(osc_init);
3478 module_exit(osc_exit);