4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
49 #include "osc_internal.h"
/* Module-wide state for the shared OST BRW request pool. */
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
/* idle timeout (seconds); runtime-tunable via module parameter (0644) */
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
/* Per-request async argument blocks stored in rq_async_args.  Each
 * carries the completion upcall and its opaque cookie for the matching
 * *_interpret callback.  NOTE(review): additional struct fields are
 * elided in this view of the file. */
62 #define osc_grant_args osc_brw_async_args
64 struct osc_setattr_args {
66 obd_enqueue_update_f sa_upcall;
70 struct osc_fsync_args {
71 struct osc_object *fa_obj;
73 obd_enqueue_update_f fa_upcall;
77 struct osc_ladvise_args {
79 obd_enqueue_update_f la_upcall;
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/* Pack the in-memory obdo @oa into the wire-format OST body of @req.
 * NOTE(review): lines are elided in this view (no NULL check of body
 * visible); comments only added here. */
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 struct ost_body *body;
/* client-side OST body buffer inside the request capsule */
91 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
/* convert to wire format according to the import's connect data */
94 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/* Synchronous OST_GETATTR: fetch attributes of the object described by
 * the caller's obdo from the OST behind @exp and copy them back.
 * NOTE(review): error-path lines (alloc failure, rc checks) are elided
 * in this view. */
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
100 struct ptlrpc_request *req;
101 struct ost_body *body;
105 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* packing failed: release the never-sent request */
111 ptlrpc_request_free(req);
115 osc_pack_req_body(req, oa);
117 ptlrpc_request_set_replen(req);
/* send and wait for the reply synchronously */
119 rc = ptlrpc_queue_wait(req);
123 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply body missing/short: protocol error */
125 GOTO(out, rc = -EPROTO);
127 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
/* copy returned attributes into the caller's obdo */
128 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* blksize is derived client-side from the BRW size, not from server */
130 oa->o_blksize = cli_brw_size(exp->exp_obd);
131 oa->o_valid |= OBD_MD_FLBLKSZ;
135 ptlrpc_req_finished(req);
/* Synchronous OST_SETATTR: push attributes in the caller's obdo to the
 * OST and read the updated attributes back from the reply.
 * NOTE(review): error-path lines are elided in this view. */
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
143 struct ptlrpc_request *req;
144 struct ost_body *body;
/* the group must always be set on objects sent to the OST */
148 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
/* packing failed: release the never-sent request */
156 ptlrpc_request_free(req);
160 osc_pack_req_body(req, oa);
162 ptlrpc_request_set_replen(req);
164 rc = ptlrpc_queue_wait(req);
168 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
/* reply body missing/short: protocol error */
170 GOTO(out, rc = -EPROTO);
172 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176 ptlrpc_req_finished(req);
/* Reply-interpret callback for async setattr/punch requests: unpack the
 * server's obdo into sa_oa and invoke the caller's upcall with rc.
 * NOTE(review): surrounding lines are elided in this view. */
181 static int osc_setattr_interpret(const struct lu_env *env,
182 struct ptlrpc_request *req, void *args, int rc)
184 struct osc_setattr_args *sa = args;
185 struct ost_body *body;
192 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194 GOTO(out, rc = -EPROTO);
196 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* hand the final status to the caller's completion upcall */
199 rc = sa->sa_upcall(sa->sa_cookie, rc);
/* Asynchronous OST_SETATTR.  Packs @oa, arms osc_setattr_interpret as
 * the reply handler, and hands the request either to ptlrpcd (when
 * rqset is NULL/PTLRPCD_SET) or to the caller's request set.
 * NOTE(review): branch structure around the two ptlrpcd_add_req()
 * calls is partially elided in this view — confirm against the full
 * file before relying on the exact dispatch conditions. */
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204 obd_enqueue_update_f upcall, void *cookie,
205 struct ptlrpc_request_set *rqset)
207 struct ptlrpc_request *req;
208 struct osc_setattr_args *sa;
213 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
219 ptlrpc_request_free(req);
223 osc_pack_req_body(req, oa);
225 ptlrpc_request_set_replen(req);
227 /* do mds to ost setattr asynchronously */
229 /* Do not wait for response. */
230 ptlrpcd_add_req(req);
232 req->rq_interpret_reply = osc_setattr_interpret;
/* async-args area must be large enough to hold our state */
234 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
235 sa = ptlrpc_req_async_args(req);
237 sa->sa_upcall = upcall;
238 sa->sa_cookie = cookie;
240 if (rqset == PTLRPCD_SET)
241 ptlrpcd_add_req(req);
243 ptlrpc_set_add_req(rqset, req);
/* Reply-interpret callback for OST_LADVISE: copy the returned obdo back
 * to the caller and fire the upcall.  NOTE(review): lines elided. */
249 static int osc_ladvise_interpret(const struct lu_env *env,
250 struct ptlrpc_request *req,
253 struct osc_ladvise_args *la = arg;
254 struct ost_body *body;
260 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
262 GOTO(out, rc = -EPROTO);
/* struct copy of the server's obdo into the caller's buffer */
264 *la->la_oa = body->oa;
266 rc = la->la_upcall(la->la_cookie, rc);
271 * If rqset is NULL, do not wait for response. Upcall and cookie could also
272 * be NULL in this case
/* Send an OST_LADVISE RPC carrying @num_advise lu_ladvise entries after
 * the ladvise header.  Uses the OST I/O portal.  NOTE(review): error
 * paths and the exact dispatch branches are elided in this view. */
274 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
275 struct ladvise_hdr *ladvise_hdr,
276 obd_enqueue_update_f upcall, void *cookie,
277 struct ptlrpc_request_set *rqset)
279 struct ptlrpc_request *req;
280 struct ost_body *body;
281 struct osc_ladvise_args *la;
283 struct lu_ladvise *req_ladvise;
284 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
285 int num_advise = ladvise_hdr->lah_count;
286 struct ladvise_hdr *req_ladvise_hdr;
289 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* size the variable-length advice array before packing */
293 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
294 num_advise * sizeof(*ladvise));
295 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
297 ptlrpc_request_free(req);
/* ladvise is served on the I/O portal, with adaptive timeouts */
300 req->rq_request_portal = OST_IO_PORTAL;
301 ptlrpc_at_set_req_timeout(req);
303 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
305 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
308 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
309 &RMF_OST_LADVISE_HDR);
310 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
312 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
313 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
314 ptlrpc_request_set_replen(req);
317 /* Do not wait for response. */
318 ptlrpcd_add_req(req);
322 req->rq_interpret_reply = osc_ladvise_interpret;
323 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
324 la = ptlrpc_req_async_args(req);
326 la->la_upcall = upcall;
327 la->la_cookie = cookie;
329 if (rqset == PTLRPCD_SET)
330 ptlrpcd_add_req(req);
332 ptlrpc_set_add_req(rqset, req);
/* Synchronous OST_CREATE; only used for echo objects here (see the
 * fid_seq_is_echo assertion).  NOTE(review): error paths elided. */
337 static int osc_create(const struct lu_env *env, struct obd_export *exp,
340 struct ptlrpc_request *req;
341 struct ost_body *body;
346 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
/* create via this path is restricted to the echo sequence */
347 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
349 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
351 GOTO(out, rc = -ENOMEM);
353 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
355 ptlrpc_request_free(req);
359 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
362 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
364 ptlrpc_request_set_replen(req);
366 rc = ptlrpc_queue_wait(req);
370 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
372 GOTO(out_req, rc = -EPROTO);
374 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
375 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* blksize is derived client-side from the BRW size */
377 oa->o_blksize = cli_brw_size(exp->exp_obd);
378 oa->o_valid |= OBD_MD_FLBLKSZ;
380 CDEBUG(D_HA, "transno: %lld\n",
381 lustre_msg_get_transno(req->rq_repmsg));
383 ptlrpc_req_finished(req);
/* Send an asynchronous OST_PUNCH (truncate/hole-punch) via ptlrpcd.
 * Completion is reported through osc_setattr_interpret -> @upcall.
 * NOTE(review): error paths elided in this view. */
388 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
389 obd_enqueue_update_f upcall, void *cookie)
391 struct ptlrpc_request *req;
392 struct osc_setattr_args *sa;
393 struct obd_import *imp = class_exp2cliimp(exp);
394 struct ost_body *body;
399 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
403 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405 ptlrpc_request_free(req);
/* punch is served on the I/O portal, with adaptive timeouts */
409 osc_set_io_portal(req);
411 ptlrpc_at_set_req_timeout(req);
413 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
415 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
417 ptlrpc_request_set_replen(req);
/* reuse the setattr interpret path: same async-args layout */
419 req->rq_interpret_reply = osc_setattr_interpret;
420 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
421 sa = ptlrpc_req_async_args(req);
423 sa->sa_upcall = upcall;
424 sa->sa_cookie = cookie;
426 ptlrpcd_add_req(req);
430 EXPORT_SYMBOL(osc_punch_send);
/* Reply-interpret callback for OST_SYNC: copy the returned obdo back,
 * refresh the osc object's cached blocks attribute under the attr
 * lock, then fire the caller's upcall.  NOTE(review): lines elided. */
432 static int osc_sync_interpret(const struct lu_env *env,
433 struct ptlrpc_request *req, void *args, int rc)
435 struct osc_fsync_args *fa = args;
436 struct ost_body *body;
437 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
438 unsigned long valid = 0;
439 struct cl_object *obj;
445 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
447 CERROR("can't unpack ost_body\n");
448 GOTO(out, rc = -EPROTO);
451 *fa->fa_oa = body->oa;
452 obj = osc2cl(fa->fa_obj);
454 /* Update osc object's blocks attribute */
455 cl_object_attr_lock(obj);
456 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
457 attr->cat_blocks = body->oa.o_blocks;
462 cl_object_attr_update(env, obj, attr, valid);
463 cl_object_attr_unlock(obj);
466 rc = fa->fa_upcall(fa->fa_cookie, rc);
/* Build and dispatch an OST_SYNC request for @obj.  The oa's size and
 * blocks fields are overloaded to carry the start/end of the range (see
 * comment below).  NOTE(review): error paths elided in this view. */
470 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
471 obd_enqueue_update_f upcall, void *cookie,
472 struct ptlrpc_request_set *rqset)
474 struct obd_export *exp = osc_export(obj);
475 struct ptlrpc_request *req;
476 struct ost_body *body;
477 struct osc_fsync_args *fa;
481 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
485 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
487 ptlrpc_request_free(req);
491 /* overload the size and blocks fields in the oa with start/end */
492 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
494 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
496 ptlrpc_request_set_replen(req);
497 req->rq_interpret_reply = osc_sync_interpret;
499 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
500 fa = ptlrpc_req_async_args(req);
503 fa->fa_upcall = upcall;
504 fa->fa_cookie = cookie;
/* PTLRPCD_SET means "let ptlrpcd drive it"; else use caller's set */
506 if (rqset == PTLRPCD_SET)
507 ptlrpcd_add_req(req);
509 ptlrpc_set_add_req(rqset, req);
514 /* Find and cancel locally locks matched by @mode in the resource found by
515 * @objid. Found locks are added into @cancel list. Returns the amount of
516 * locks added to @cancels list. */
517 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
518 struct list_head *cancels,
519 enum ldlm_mode mode, __u64 lock_flags)
521 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
522 struct ldlm_res_id res_id;
523 struct ldlm_resource *res;
527 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
528 * export) but disabled through procfs (flag in NS).
530 * This distinguishes from a case when ELC is not supported originally,
531 * when we still want to cancel locks in advance and just cancel them
532 * locally, without sending any RPC. */
533 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* map the object id to its LDLM resource name */
536 ostid_build_res_name(&oa->o_oi, &res_id);
537 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* hold a ref across the local cancel scan */
541 LDLM_RESOURCE_ADDREF(res);
542 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
543 lock_flags, 0, NULL);
544 LDLM_RESOURCE_DELREF(res);
545 ldlm_resource_putref(res);
/* Reply-interpret for OST_DESTROY: drop the in-flight destroy counter
 * and wake any sender throttled in osc_destroy(). */
549 static int osc_destroy_interpret(const struct lu_env *env,
550 struct ptlrpc_request *req, void *args, int rc)
552 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
554 atomic_dec(&cli->cl_destroy_in_flight);
555 wake_up(&cli->cl_destroy_waitq);
/* Reserve a destroy-RPC slot: optimistically bump the in-flight count
 * and back it out if over cl_max_rpcs_in_flight.  The dec/re-check and
 * wake_up below handle racing callers.  NOTE(review): return
 * statements are elided in this view. */
560 static int osc_can_send_destroy(struct client_obd *cli)
562 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
563 cli->cl_max_rpcs_in_flight) {
564 /* The destroy request can be sent */
/* over the limit: undo the reservation */
567 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
568 cli->cl_max_rpcs_in_flight) {
570 * The counter has been modified between the two atomic
573 wake_up(&cli->cl_destroy_waitq);
/* OST_DESTROY with early lock cancellation (ELC): gather matching PW
 * locks into @cancels, piggy-back them on the destroy RPC, throttle to
 * cl_max_rpcs_in_flight destroys, then dispatch via ptlrpcd without
 * waiting.  NOTE(review): error paths elided in this view. */
578 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
581 struct client_obd *cli = &exp->exp_obd->u.cli;
582 struct ptlrpc_request *req;
583 struct ost_body *body;
584 struct list_head cancels = LIST_HEAD_INIT(cancels);
589 CDEBUG(D_INFO, "oa NULL\n");
/* discard cached data under the locks being cancelled */
593 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
594 LDLM_FL_DISCARD_DATA);
596 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: put back the gathered lock references */
598 ldlm_lock_list_put(&cancels, l_bl_ast, count);
602 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
605 ptlrpc_request_free(req);
609 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
610 ptlrpc_at_set_req_timeout(req);
612 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
616 ptlrpc_request_set_replen(req);
618 req->rq_interpret_reply = osc_destroy_interpret;
619 if (!osc_can_send_destroy(cli)) {
620 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
623 * Wait until the number of on-going destroy RPCs drops
624 * under max_rpc_in_flight
626 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
627 osc_can_send_destroy(cli), &lwi);
629 ptlrpc_req_finished(req);
634 /* Do not wait for response */
635 ptlrpcd_add_req(req);
/* Fill the dirty/undirty/grant accounting fields of @oa under
 * cl_loi_list_lock so the server learns how much cache/grant this
 * client is using and how much more it wants.  NOTE(review): several
 * lines (else branches, o_valid updates) are elided in this view. */
639 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
642 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* caller must not have set the fields we are about to fill */
644 LASSERT(!(oa->o_valid & bits));
647 spin_lock(&cli->cl_loi_list_lock);
/* with GRANT_PARAM the server wants grant bytes, else dirty bytes */
648 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
649 oa->o_dirty = cli->cl_dirty_grant;
651 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
/* sanity checks on the dirty accounting; only CERROR, not fatal */
652 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
653 cli->cl_dirty_max_pages)) {
654 CERROR("dirty %lu - %lu > dirty_max %lu\n",
655 cli->cl_dirty_pages, cli->cl_dirty_transit,
656 cli->cl_dirty_max_pages);
658 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
659 atomic_long_read(&obd_dirty_transit_pages) >
660 (long)(obd_max_dirty_pages + 1))) {
661 /* The atomic_read() allowing the atomic_inc() are
662 * not covered by a lock thus they may safely race and trip
663 * this CERROR() unless we add in a small fudge factor (+1). */
664 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
665 cli_name(cli), atomic_long_read(&obd_dirty_pages),
666 atomic_long_read(&obd_dirty_transit_pages),
667 obd_max_dirty_pages);
669 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
671 CERROR("dirty %lu - dirty_max %lu too big???\n",
672 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
675 unsigned long nrpages;
676 unsigned long undirty;
/* ask for enough grant to keep max_rpcs_in_flight+1 RPCs busy */
678 nrpages = cli->cl_max_pages_per_rpc;
679 nrpages *= cli->cl_max_rpcs_in_flight + 1;
680 nrpages = max(nrpages, cli->cl_dirty_max_pages);
681 undirty = nrpages << PAGE_SHIFT;
682 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
686 /* take extent tax into account when asking for more
688 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
689 cli->cl_max_extent_pages;
690 undirty += nrextents * cli->cl_grant_extent_tax;
692 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
693 * to add extent tax, etc.
695 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
696 ~(PTLRPC_MAX_BRW_SIZE * 4UL));
698 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* report and reset lost grant in one shot */
699 oa->o_dropped = cli->cl_lost_grant;
700 cli->cl_lost_grant = 0;
701 spin_unlock(&cli->cl_loi_list_lock);
702 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
703 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/* Schedule the next grant-shrink attempt one interval from now. */
706 void osc_update_next_shrink(struct client_obd *cli)
708 cli->cl_next_shrink_grant = ktime_get_seconds() +
709 cli->cl_grant_shrink_interval;
711 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
712 cli->cl_next_shrink_grant);
/* Add @grant bytes back to the client's available grant, under the
 * loi list lock that protects grant accounting. */
715 static void __osc_update_grant(struct client_obd *cli, u64 grant)
717 spin_lock(&cli->cl_loi_list_lock);
718 cli->cl_avail_grant += grant;
719 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server piggy-backed on a reply body. */
722 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
724 if (body->oa.o_valid & OBD_MD_FLGRANT) {
725 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
726 __osc_update_grant(cli, body->oa.o_grant);
731 * grant thread data for shrinking space.
733 struct grant_thread_data {
734 struct list_head gtd_clients;
/* gtd_mutex guards gtd_clients; gtd_stopped signals shutdown */
735 struct mutex gtd_mutex;
736 unsigned long gtd_stopped:1;
/* single module-wide instance driving the grant-shrink work */
738 static struct grant_thread_data client_gtd;
/* Reply-interpret for a grant-shrink set_info RPC: on failure restore
 * the grant we pre-deducted; on success absorb server's grant update.
 * Frees the obdo allocated for the request.  NOTE(review): the rc
 * branch around line 749 is elided in this view. */
740 static int osc_shrink_grant_interpret(const struct lu_env *env,
741 struct ptlrpc_request *req,
744 struct osc_grant_args *aa = args;
745 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
746 struct ost_body *body;
/* failure path: give the locally deducted grant back */
749 __osc_update_grant(cli, aa->aa_oa->o_grant);
753 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
755 osc_update_grant(cli, body);
757 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
/* Give back a quarter of the available grant locally: move it from
 * cl_avail_grant into oa->o_grant and mark the obdo with
 * OBD_FL_SHRINK_GRANT so the server reclaims it. */
762 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
764 spin_lock(&cli->cl_loi_list_lock);
765 oa->o_grant = cli->cl_avail_grant / 4;
766 cli->cl_avail_grant -= oa->o_grant;
767 spin_unlock(&cli->cl_loi_list_lock);
/* make sure o_flags is valid before OR-ing in the shrink flag */
768 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
769 oa->o_valid |= OBD_MD_FLFLAGS;
772 oa->o_flags |= OBD_FL_SHRINK_GRANT;
773 osc_update_next_shrink(cli);
776 /* Shrink the current grant, either from some large amount to enough for a
777 * full set of in-flight RPCs, or if we have already shrunk to that limit
778 * then to enough for a single RPC. This avoids keeping more grant than
779 * needed, and avoids shrinking the grant piecemeal. */
780 static int osc_shrink_grant(struct client_obd *cli)
/* default target: (max_rpcs_in_flight + 1) full-size RPCs */
782 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
783 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
785 spin_lock(&cli->cl_loi_list_lock);
/* already at/below the RPC-set target: fall back to one RPC's worth */
786 if (cli->cl_avail_grant <= target_bytes)
787 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
788 spin_unlock(&cli->cl_loi_list_lock);
790 return osc_shrink_grant_to_target(cli, target_bytes);
/* Shrink available grant down to @target_bytes by sending the excess
 * back to the server via a KEY_GRANT_SHRINK set_info RPC.  The grant is
 * deducted locally first; the interpret callback restores it if the
 * RPC fails.  NOTE(review): allocation of @body and some early-return
 * lines are elided in this view. */
793 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
796 struct ost_body *body;
799 spin_lock(&cli->cl_loi_list_lock);
800 /* Don't shrink if we are already above or below the desired limit
801 * We don't want to shrink below a single RPC, as that will negatively
802 * impact block allocation and long-term performance. */
803 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
804 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
806 if (target_bytes >= cli->cl_avail_grant) {
807 spin_unlock(&cli->cl_loi_list_lock);
810 spin_unlock(&cli->cl_loi_list_lock);
/* snapshot current cache state into the body we are about to send */
816 osc_announce_cached(cli, &body->oa, 0);
818 spin_lock(&cli->cl_loi_list_lock);
819 if (target_bytes >= cli->cl_avail_grant) {
820 /* available grant has changed since target calculation */
821 spin_unlock(&cli->cl_loi_list_lock);
822 GOTO(out_free, rc = 0);
/* deduct the excess locally before asking the server to reclaim it */
824 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
825 cli->cl_avail_grant = target_bytes;
826 spin_unlock(&cli->cl_loi_list_lock);
827 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
828 body->oa.o_valid |= OBD_MD_FLFLAGS;
829 body->oa.o_flags = 0;
831 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
832 osc_update_next_shrink(cli);
834 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
835 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
836 sizeof(*body), body, NULL);
/* RPC submission failed: give the deducted grant back */
838 __osc_update_grant(cli, body->oa.o_grant);
/* Decide whether @client is due for a grant shrink: the import must
 * exist, support OBD_CONNECT_GRANT_SHRINK, be near/past its next-shrink
 * time, be FULL, and hold more grant than one RPC needs.
 * NOTE(review): return statements are elided in this view. */
844 static int osc_should_shrink_grant(struct client_obd *client)
846 time64_t next_shrink = client->cl_next_shrink_grant;
848 if (client->cl_import == NULL)
851 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
852 OBD_CONNECT_GRANT_SHRINK) == 0)
/* allow shrinking up to 5 seconds early */
855 if (ktime_get_seconds() >= next_shrink - 5) {
856 /* Get the current RPC size directly, instead of going via:
857 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
858 * Keep comment here so that it can be found by searching. */
859 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
861 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
862 client->cl_avail_grant > brw_size)
/* not eligible now: push the next-shrink time forward */
865 osc_update_next_shrink(client);
870 #define GRANT_SHRINK_RPC_BATCH 100
872 static struct delayed_work work;
/* Delayed-work handler: walk all registered clients, shrink grant on up
 * to GRANT_SHRINK_RPC_BATCH of them, compute the earliest upcoming
 * shrink time, and re-arm the work accordingly (immediately via
 * schedule_work when overdue).  NOTE(review): some branch/brace lines
 * are elided in this view. */
874 static void osc_grant_work_handler(struct work_struct *data)
876 struct client_obd *cli;
878 bool init_next_shrink = true;
879 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
882 mutex_lock(&client_gtd.gtd_mutex);
883 list_for_each_entry(cli, &client_gtd.gtd_clients,
885 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
886 osc_should_shrink_grant(cli)) {
887 osc_shrink_grant(cli);
/* track the minimum future cl_next_shrink_grant across clients */
891 if (!init_next_shrink) {
892 if (cli->cl_next_shrink_grant < next_shrink &&
893 cli->cl_next_shrink_grant > ktime_get_seconds())
894 next_shrink = cli->cl_next_shrink_grant;
896 init_next_shrink = false;
897 next_shrink = cli->cl_next_shrink_grant;
900 mutex_unlock(&client_gtd.gtd_mutex);
/* module is shutting down: do not re-arm */
902 if (client_gtd.gtd_stopped == 1)
905 if (next_shrink > ktime_get_seconds())
906 schedule_delayed_work(&work, msecs_to_jiffies(
907 (next_shrink - ktime_get_seconds()) *
/* overdue: run again as soon as possible */
910 schedule_work(&work.work);
/* Force an immediate grant-work pass: cancel any pending delayed run
 * (waiting for it to finish) and queue the handler right away. */
913 void osc_schedule_grant_work(void)
915 cancel_delayed_work_sync(&work);
916 schedule_work(&work.work);
920 * Start grant thread for returing grant to server for idle clients.
922 static int osc_start_grant_work(void)
/* initialize shared state before the first work run can observe it */
924 client_gtd.gtd_stopped = 0;
925 mutex_init(&client_gtd.gtd_mutex);
926 INIT_LIST_HEAD(&client_gtd.gtd_clients);
928 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
929 schedule_work(&work.work);
/* Stop the grant work: set the stop flag (handler checks it before
 * re-arming) and synchronously cancel any pending run. */
934 static void osc_stop_grant_work(void)
936 client_gtd.gtd_stopped = 1;
937 cancel_delayed_work_sync(&work);
/* Register @client on the global grant-shrink client list. */
940 static void osc_add_grant_list(struct client_obd *client)
942 mutex_lock(&client_gtd.gtd_mutex);
943 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
944 mutex_unlock(&client_gtd.gtd_mutex);
/* Unregister @client from the grant-shrink list; no-op if never added
 * (list head empty check avoids taking the mutex needlessly). */
947 static void osc_del_grant_list(struct client_obd *client)
949 if (list_empty(&client->cl_grant_chain))
952 mutex_lock(&client_gtd.gtd_mutex);
953 list_del_init(&client->cl_grant_chain);
954 mutex_unlock(&client_gtd.gtd_mutex);
/* Initialize client grant accounting from the server's connect data:
 * set cl_avail_grant from ocd_grant (adjusting for dirty/reserved
 * amounts unless we were evicted), and derive extent-tax, chunk size
 * and max-extent parameters when GRANT_PARAM is negotiated.
 * NOTE(review): some else-branch lines are elided in this view. */
957 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
960 * ocd_grant is the total grant amount we're expect to hold: if we've
961 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
962 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
965 * race is tolerable here: if we're evicted, but imp_state already
966 * left EVICTED state, then cl_dirty_pages must be 0 already.
968 spin_lock(&cli->cl_loi_list_lock);
969 cli->cl_avail_grant = ocd->ocd_grant;
970 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
971 cli->cl_avail_grant -= cli->cl_reserved_grant;
972 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
973 cli->cl_avail_grant -= cli->cl_dirty_grant;
975 cli->cl_avail_grant -=
976 cli->cl_dirty_pages << PAGE_SHIFT;
979 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
983 /* overhead for each extent insertion */
984 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
985 /* determine the appropriate chunk size used by osc_extent. */
986 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
987 ocd->ocd_grant_blkbits);
988 /* max_pages_per_rpc must be chunk aligned */
989 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
990 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
991 ~chunk_mask) & chunk_mask;
992 /* determine maximum extent size, in #pages */
993 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
994 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
995 if (cli->cl_max_extent_pages == 0)
996 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: fall back to page-sized chunks, default extents */
998 cli->cl_grant_extent_tax = 0;
999 cli->cl_chunkbits = PAGE_SHIFT;
1000 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1002 spin_unlock(&cli->cl_loi_list_lock);
1004 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1005 "chunk bits: %d cl_max_extent_pages: %d\n",
1007 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1008 cli->cl_max_extent_pages);
/* join the shrink list once, if the server supports GRANT_SHRINK */
1010 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1011 osc_add_grant_list(cli);
1013 EXPORT_SYMBOL(osc_init_grant);
1015 /* We assume that the reason this OSC got a short read is because it read
1016 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1017 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1018 * this stripe never got written at or beyond this stripe offset yet. */
1019 static void handle_short_read(int nob_read, size_t page_count,
1020 struct brw_page **pga)
/* Zero-fill the unread tail: skip fully-read pages, clear the partial
 * page at EOF, then clear all remaining pages.  NOTE(review): kunmap
 * and loop-increment lines are elided in this view. */
1025 /* skip bytes read OK */
1026 while (nob_read > 0) {
1027 LASSERT (page_count > 0);
1029 if (pga[i]->count > nob_read) {
1030 /* EOF inside this page */
1031 ptr = kmap(pga[i]->pg) +
1032 (pga[i]->off & ~PAGE_MASK);
1033 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1040 nob_read -= pga[i]->count;
1045 /* zero remaining pages */
1046 while (page_count-- > 0) {
1047 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1048 memset(ptr, 0, pga[i]->count);
/* Validate the per-niobuf return codes in a BRW_WRITE reply: the RC
 * vector must be present, each rc must be zero, and the bulk transfer
 * size must match what was requested.  NOTE(review): return statements
 * are elided in this view. */
1054 static int check_write_rcs(struct ptlrpc_request *req,
1055 int requested_nob, int niocount,
1056 size_t page_count, struct brw_page **pga)
1061 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1062 sizeof(*remote_rcs) *
1064 if (remote_rcs == NULL) {
1065 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1069 /* return error if any niobuf was in error */
1070 for (i = 0; i < niocount; i++) {
1071 if ((int)remote_rcs[i] < 0) {
1072 CDEBUG(D_INFO, "rc[%d]: %d req %p\n",
1073 i, remote_rcs[i], req);
1074 return remote_rcs[i];
/* non-zero positive rc is a protocol violation */
1077 if (remote_rcs[i] != 0) {
1078 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1079 i, remote_rcs[i], req);
1083 if (req->rq_bulk != NULL &&
1084 req->rq_bulk->bd_nob_transferred != requested_nob) {
1085 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1086 req->rq_bulk->bd_nob_transferred, requested_nob);
/* Two brw_pages may be merged into one niobuf when they are byte-wise
 * contiguous (p1 ends where p2 starts) and their flags differ only in
 * bits known-safe to combine; unknown flag differences get a CWARN. */
1093 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1095 if (p1->flag != p2->flag) {
1096 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1097 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1098 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1100 /* warn if we try to combine flags that we don't know to be
1101 * safe to combine */
1102 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1103 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1104 "report this at https://jira.whamcloud.com/\n",
1105 p1->flag, p2->flag);
1110 return (p1->off + p1->count == p2->off);
1113 #if IS_ENABLED(CONFIG_CRC_T10DIF)
/* Compute a T10-PI style bulk checksum: generate DIF guard tags for
 * each page via @fn, accumulate batches of tags in a scratch page, and
 * hash the tag stream (default top-level algorithm from
 * OBD_CKSUM_T10_TOP).  Fault-injection hooks corrupt read data or the
 * final write checksum for testing.  NOTE(review): several lines
 * (kunmap, loop increments, returns) are elided in this view. */
1114 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1115 size_t pg_count, struct brw_page **pga,
1116 int opc, obd_dif_csum_fn *fn,
1120 struct ahash_request *req;
1121 /* Used Adler as the default checksum type on top of DIF tags */
1122 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1123 struct page *__page;
1124 unsigned char *buffer;
1126 unsigned int bufsize;
1128 int used_number = 0;
1134 LASSERT(pg_count > 0);
/* scratch page used to batch guard tags before hashing */
1136 __page = alloc_page(GFP_KERNEL);
1140 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1143 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1144 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1148 buffer = kmap(__page);
1149 guard_start = (__u16 *)buffer;
1150 guard_number = PAGE_SIZE / sizeof(*guard_start);
1151 while (nob > 0 && pg_count > 0) {
1152 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1154 /* corrupt the data before we compute the checksum, to
1155 * simulate an OST->client data error */
1156 if (unlikely(i == 0 && opc == OST_READ &&
1157 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1158 unsigned char *ptr = kmap(pga[i]->pg);
1159 int off = pga[i]->off & ~PAGE_MASK;
1161 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1166 * The left guard number should be able to hold checksums of a
1169 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1170 pga[i]->off & ~PAGE_MASK,
1172 guard_start + used_number,
1173 guard_number - used_number,
1179 used_number += used;
/* scratch page full: fold the batched tags into the hash */
1180 if (used_number == guard_number) {
1181 cfs_crypto_hash_update_page(req, __page, 0,
1182 used_number * sizeof(*guard_start));
1186 nob -= pga[i]->count;
/* flush any remaining tags after the loop */
1194 if (used_number != 0)
1195 cfs_crypto_hash_update_page(req, __page, 0,
1196 used_number * sizeof(*guard_start));
1198 bufsize = sizeof(cksum);
1199 cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1201 /* For sending we only compute the wrong checksum instead
1202 * of corrupting the data so it is still correct on a redo */
1203 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1208 __free_page(__page);
1211 #else /* !CONFIG_CRC_T10DIF */
1212 #define obd_dif_ip_fn NULL
1213 #define obd_dif_crc_fn NULL
1214 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
1216 #endif /* CONFIG_CRC_T10DIF */
/* Compute a plain (non-T10) bulk checksum over the pga pages using the
 * hash selected by @cksum_type, writing the result to *cksum.
 * Fault-injection hooks corrupt read data or the final write checksum.
 * NOTE(review): kunmap/loop-increment/return lines are elided in this
 * view. */
1218 static int osc_checksum_bulk(int nob, size_t pg_count,
1219 struct brw_page **pga, int opc,
1220 enum cksum_types cksum_type,
1224 struct ahash_request *req;
1225 unsigned int bufsize;
1226 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1228 LASSERT(pg_count > 0);
1230 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1232 CERROR("Unable to initialize checksum hash %s\n",
1233 cfs_crypto_hash_name(cfs_alg));
1234 return PTR_ERR(req);
1237 while (nob > 0 && pg_count > 0) {
1238 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1240 /* corrupt the data before we compute the checksum, to
1241 * simulate an OST->client data error */
1242 if (i == 0 && opc == OST_READ &&
1243 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1244 unsigned char *ptr = kmap(pga[i]->pg);
1245 int off = pga[i]->off & ~PAGE_MASK;
1247 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
/* hash this page's in-use region directly */
1250 cfs_crypto_hash_update_page(req, pga[i]->pg,
1251 pga[i]->off & ~PAGE_MASK,
1253 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1254 (int)(pga[i]->off & ~PAGE_MASK));
1256 nob -= pga[i]->count;
1261 bufsize = sizeof(*cksum);
1262 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1264 /* For sending we only compute the wrong checksum instead
1265 * of corrupting the data so it is still correct on a redo */
1266 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/* Dispatch to the T10-PI or plain bulk checksum routine depending on
 * whether @cksum_type maps to a DIF function (fn != NULL after
 * obd_t10_cksum2dif).  NOTE(review): branch/return lines are elided in
 * this view. */
1272 static int osc_checksum_bulk_rw(const char *obd_name,
1273 enum cksum_types cksum_type,
1274 int nob, size_t pg_count,
1275 struct brw_page **pga, int opc,
1278 obd_dif_csum_fn *fn = NULL;
1279 int sector_size = 0;
1283 obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1286 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1287 opc, fn, sector_size, check_sum);
1289 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
/*
 * osc_brw_prep_request() - build an OST_READ/OST_WRITE bulk RPC.
 *
 * NOTE(review): this listing is an elided excerpt (the embedded original
 * line numbers are non-contiguous), so several statements and closing
 * braces are not visible here; the comments below describe only what the
 * visible code demonstrates.
 *
 * Allocates the request (writes come from the shared osc_rq_pool so that
 * dirty pages can always be flushed even under memory pressure), counts
 * mergeable niobufs, decides whether the transfer is small enough to be
 * embedded inline as a "short io" instead of a bulk descriptor, packs the
 * obdo/ioobj/niobuf wire structures, computes a client checksum for
 * writes when cl_checksum is enabled, and returns the prepared request
 * through @reqp.  @resend marks a recovery resend (OBD_FL_RECOV_RESEND).
 */
1296 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1297 u32 page_count, struct brw_page **pga,
1298 struct ptlrpc_request **reqp, int resend)
1300 struct ptlrpc_request *req;
1301 struct ptlrpc_bulk_desc *desc;
1302 struct ost_body *body;
1303 struct obd_ioobj *ioobj;
1304 struct niobuf_remote *niobuf;
1305 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1306 struct osc_brw_async_args *aa;
1307 struct req_capsule *pill;
1308 struct brw_page *pg_prev;
1310 const char *obd_name = cli->cl_import->imp_obd->obd_name;
/* fault-injection points used by sanity tests: first is retried by the
 * caller, second is fatal */
1313 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1314 RETURN(-ENOMEM); /* Recoverable */
1315 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1316 RETURN(-EINVAL); /* Fatal */
1318 if ((cmd & OBD_BRW_WRITE) != 0) {
1320 req = ptlrpc_request_alloc_pool(cli->cl_import,
1322 &RQF_OST_BRW_WRITE);
1325 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* adjacent pages are collapsed into a single remote niobuf; niocount is
 * the number of distinct (non-mergeable) regions */
1330 for (niocount = i = 1; i < page_count; i++) {
1331 if (!can_merge_pages(pga[i - 1], pga[i]))
1335 pill = &req->rq_pill;
1336 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1338 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1339 niocount * sizeof(*niobuf));
1341 for (i = 0; i < page_count; i++)
1342 short_io_size += pga[i]->count;
1344 /* Check if read/write is small enough to be a short io. */
1345 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1346 !imp_connect_shortio(cli->cl_import))
1349 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1350 opc == OST_READ ? 0 : short_io_size);
1351 if (opc == OST_READ)
1352 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1355 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1357 ptlrpc_request_free(req);
1360 osc_set_io_portal(req);
1362 ptlrpc_at_set_req_timeout(req);
1363 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1365 req->rq_no_retry_einprogress = 1;
/* short io transfers need no bulk descriptor at all */
1367 if (short_io_size != 0) {
1369 short_io_buf = NULL;
1373 desc = ptlrpc_prep_bulk_imp(req, page_count,
1374 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1375 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1376 PTLRPC_BULK_PUT_SINK) |
1377 PTLRPC_BULK_BUF_KIOV,
1379 &ptlrpc_bulk_kiov_pin_ops);
1382 GOTO(out, rc = -ENOMEM);
1383 /* NB request now owns desc and will free it when it gets freed */
1385 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1386 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1387 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1388 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1390 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1392 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1393 * and from_kgid(), because they are asynchronous. Fortunately, variable
1394 * oa contains valid o_uid and o_gid in these two operations.
1395 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1396 * OBD_MD_FLUID and OBD_MD_FLGID is not set in order to avoid breaking
1397 * other process logic */
1398 body->oa.o_uid = oa->o_uid;
1399 body->oa.o_gid = oa->o_gid;
1401 obdo_to_ioobj(oa, ioobj);
1402 ioobj->ioo_bufcnt = niocount;
1403 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1404 * that might be send for this request. The actual number is decided
1405 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1406 * "max - 1" for old client compatibility sending "0", and also so
1407 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1409 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1411 ioobj_max_brw_set(ioobj, 0);
1413 if (short_io_size != 0) {
1414 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1415 body->oa.o_valid |= OBD_MD_FLFLAGS;
1416 body->oa.o_flags = 0;
1418 body->oa.o_flags |= OBD_FL_SHORT_IO;
1419 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1421 if (opc == OST_WRITE) {
1422 short_io_buf = req_capsule_client_get(pill,
1424 LASSERT(short_io_buf != NULL);
1428 LASSERT(page_count > 0);
/* main packing loop: either copy pages into the inline short-io buffer
 * (writes) or append them to the bulk descriptor, merging contiguous
 * pages into the current niobuf where possible */
1430 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1431 struct brw_page *pg = pga[i];
1432 int poff = pg->off & ~PAGE_MASK;
1434 LASSERT(pg->count > 0);
1435 /* make sure there is no gap in the middle of page array */
1436 LASSERTF(page_count == 1 ||
1437 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1438 ergo(i > 0 && i < page_count - 1,
1439 poff == 0 && pg->count == PAGE_SIZE) &&
1440 ergo(i == page_count - 1, poff == 0)),
1441 "i: %d/%d pg: %p off: %llu, count: %u\n",
1442 i, page_count, pg, pg->off, pg->count);
1443 LASSERTF(i == 0 || pg->off > pg_prev->off,
1444 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1445 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1447 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1448 pg_prev->pg, page_private(pg_prev->pg),
1449 pg_prev->pg->index, pg_prev->off);
1450 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1451 (pg->flag & OBD_BRW_SRVLOCK));
1452 if (short_io_size != 0 && opc == OST_WRITE) {
1453 unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1455 LASSERT(short_io_size >= requested_nob + pg->count);
1456 memcpy(short_io_buf + requested_nob,
1459 ll_kunmap_atomic(ptr, KM_USER0);
1460 } else if (short_io_size == 0) {
1461 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1464 requested_nob += pg->count;
1466 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1468 niobuf->rnb_len += pg->count;
1470 niobuf->rnb_offset = pg->off;
1471 niobuf->rnb_len = pg->count;
1472 niobuf->rnb_flags = pg->flag;
/* sanity: after the loop, niobuf must have advanced exactly niocount
 * entries from the start of the capsule buffer */
1477 LASSERTF((void *)(niobuf - niocount) ==
1478 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1479 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1480 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1482 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1484 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1485 body->oa.o_valid |= OBD_MD_FLFLAGS;
1486 body->oa.o_flags = 0;
1488 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1491 if (osc_should_shrink_grant(cli))
1492 osc_shrink_grant_local(cli, &body->oa);
1494 /* size[REQ_REC_OFF] still sizeof (*body) */
1495 if (opc == OST_WRITE) {
1496 if (cli->cl_checksum &&
1497 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1498 /* store cl_cksum_type in a local variable since
1499 * it can be changed via lprocfs */
1500 enum cksum_types cksum_type = cli->cl_cksum_type;
1502 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1503 body->oa.o_flags = 0;
1505 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1507 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1509 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1510 requested_nob, page_count,
1514 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1518 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1521 /* save this in 'oa', too, for later checking */
1522 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1523 oa->o_flags |= obd_cksum_type_pack(obd_name,
1526 /* clear out the checksum flag, in case this is a
1527 * resend but cl_checksum is no longer set. b=11238 */
1528 oa->o_valid &= ~OBD_MD_FLCKSUM;
1530 oa->o_cksum = body->oa.o_cksum;
1531 /* 1 RC per niobuf */
1532 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1533 sizeof(__u32) * niocount);
/* reads: only advertise the checksum type; server computes and the
 * client verifies in osc_brw_fini_request() */
1535 if (cli->cl_checksum &&
1536 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1537 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1538 body->oa.o_flags = 0;
1539 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1540 cli->cl_cksum_type);
1541 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1544 /* Client cksum has been already copied to wire obdo in previous
1545 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1546 * resent due to cksum error, this will allow Server to
1547 * check+dump pages on its side */
1549 ptlrpc_request_set_replen(req);
1551 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1552 aa = ptlrpc_req_async_args(req);
1554 aa->aa_requested_nob = requested_nob;
1555 aa->aa_nio_count = niocount;
1556 aa->aa_page_count = page_count;
1560 INIT_LIST_HEAD(&aa->aa_oaps);
1563 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1564 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1565 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1566 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* error path: drop the request reference taken at allocation */
1570 ptlrpc_req_finished(req);
/* scratch path buffer shared by checksum-dump invocations; file-scope so
 * it does not burn PATH_MAX bytes of kernel stack */
1574 char dbgcksum_file_name[PATH_MAX];
/*
 * Dump all pages of a BRW transfer to a debug file after a checksum
 * mismatch, so the corrupt data can be inspected offline.
 *
 * NOTE(review): elided excerpt — some statements (fd handling, loop
 * variables, closing braces) are not visible in this listing.
 *
 * The file name encodes FID, extent range and both checksums; O_EXCL
 * ensures only the first error for a given range is kept, not every
 * resend/retry.
 */
1576 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1577 struct brw_page **pga, __u32 server_cksum,
1585 /* will only keep dump of pages on first error for the same range in
1586 * file/fid, not during the resends/retries. */
1587 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1588 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1589 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1590 libcfs_debug_file_path_arr :
1591 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1592 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1593 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1594 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1596 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1597 client_cksum, server_cksum);
1598 filp = filp_open(dbgcksum_file_name,
1599 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
/* EEXIST (dump already present) is expected and only logged at D_INFO;
 * other open failures are real errors */
1603 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1604 "checksum error: rc = %d\n", dbgcksum_file_name,
1607 CERROR("%s: can't open to dump pages with checksum "
1608 "error: rc = %d\n", dbgcksum_file_name, rc);
1612 for (i = 0; i < page_count; i++) {
1613 len = pga[i]->count;
1614 buf = kmap(pga[i]->pg);
1616 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1618 CERROR("%s: wanted to write %u but got %d "
1619 "error\n", dbgcksum_file_name, len, rc);
1624 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1625 dbgcksum_file_name, rc);
/* flush data to stable storage before closing so the dump survives a
 * subsequent crash */
1630 rc = vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1632 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1633 filp_close(filp, NULL);
/*
 * Diagnose a write-checksum mismatch reported by the server.
 *
 * NOTE(review): elided excerpt — the function head (return type), some
 * switch arms and the tail are not visible here.
 *
 * Recomputes the checksum over the still-cached pages with the type the
 * server actually used, then classifies the failure: client bug, server
 * using a different type (protocol problem), data changed under us after
 * checksumming (typically mmap IO — a false positive), or genuine
 * corruption in transit.  Optionally dumps the pages for offline analysis
 * when cl_checksum_dump is set.
 */
1638 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1639 __u32 client_cksum, __u32 server_cksum,
1640 struct osc_brw_async_args *aa)
1642 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1643 enum cksum_types cksum_type;
1644 obd_dif_csum_fn *fn = NULL;
1645 int sector_size = 0;
1650 if (server_cksum == client_cksum) {
1651 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1655 if (aa->aa_cli->cl_checksum_dump)
1656 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1657 server_cksum, client_cksum);
/* use the checksum type the *server* replied with, which may differ
 * from the one this client requested */
1659 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1662 switch (cksum_type) {
1663 case OBD_CKSUM_T10IP512:
1667 case OBD_CKSUM_T10IP4K:
1671 case OBD_CKSUM_T10CRC512:
1672 fn = obd_dif_crc_fn;
1675 case OBD_CKSUM_T10CRC4K:
1676 fn = obd_dif_crc_fn;
1684 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1685 aa->aa_page_count, aa->aa_ppga,
1686 OST_WRITE, fn, sector_size,
1689 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1690 aa->aa_ppga, OST_WRITE, cksum_type,
/* classify the mismatch for the console message below */
1694 msg = "failed to calculate the client write checksum";
1695 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1696 msg = "the server did not use the checksum type specified in "
1697 "the original request - likely a protocol problem";
1698 else if (new_cksum == server_cksum)
1699 msg = "changed on the client after we checksummed it - "
1700 "likely false positive due to mmap IO (bug 11742)";
1701 else if (new_cksum == client_cksum)
1702 msg = "changed in transit before arrival at OST";
1704 msg = "changed in transit AND doesn't match the original - "
1705 "likely false positive due to mmap IO (bug 11742)";
1707 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1708 DFID " object "DOSTID" extent [%llu-%llu], original "
1709 "client csum %x (type %x), server csum %x (type %x),"
1710 " client csum now %x\n",
1711 obd_name, msg, libcfs_nid2str(peer->nid),
1712 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1713 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1714 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1715 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1716 aa->aa_ppga[aa->aa_page_count - 1]->off +
1717 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1719 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1720 server_cksum, cksum_type, new_cksum);
1724 /* Note rc enters this function as number of bytes transferred */
/*
 * Finish a completed BRW RPC: unpack the reply, update quota and grant
 * state, verify checksums, and (for short-io reads) copy the inline
 * reply data back into the destination pages.
 *
 * NOTE(review): elided excerpt — error-handling branches, some locals
 * (buf, ptr, nob, router) and the function tail are not fully visible.
 *
 * Returns 0 on success or a negative errno; -EAGAIN requests a resend
 * (e.g. after a failed bulk unwrap or read-checksum mismatch).
 */
1725 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1727 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1728 struct client_obd *cli = aa->aa_cli;
1729 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1730 const struct lnet_process_id *peer =
1731 &req->rq_import->imp_connection->c_peer;
1732 struct ost_body *body;
1733 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply body (quota flags below) */
1736 if (rc < 0 && rc != -EDQUOT) {
1737 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1741 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1742 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1744 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1748 /* set/clear over quota flag for a uid/gid/projid */
1749 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1750 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1751 unsigned qid[LL_MAXQUOTAS] = {
1752 body->oa.o_uid, body->oa.o_gid,
1753 body->oa.o_projid };
1754 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1755 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1756 body->oa.o_valid, body->oa.o_flags);
1757 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1761 osc_update_grant(cli, body);
1766 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1767 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1769 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
/* writes return status only; a positive byte count is a protocol bug */
1771 CERROR("Unexpected +ve rc %d\n", rc);
1775 if (req->rq_bulk != NULL &&
1776 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1779 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1780 check_write_checksum(&body->oa, peer, client_cksum,
1781 body->oa.o_cksum, aa))
1784 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1785 aa->aa_page_count, aa->aa_ppga);
1789 /* The rest of this function executes only for OST_READs */
/* short io: no bulk descriptor, data length comes from the reply
 * capsule itself */
1791 if (req->rq_bulk == NULL) {
1792 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1794 LASSERT(rc == req->rq_status);
1796 /* if unwrap_bulk failed, return -EAGAIN to retry */
1797 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1800 GOTO(out, rc = -EAGAIN);
1802 if (rc > aa->aa_requested_nob) {
1803 CERROR("Unexpected rc %d (%d requested)\n", rc,
1804 aa->aa_requested_nob);
1808 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1809 CERROR ("Unexpected rc %d (%d transferred)\n",
1810 rc, req->rq_bulk->bd_nob_transferred);
/* short-io read: copy the inline reply buffer into the target pages */
1814 if (req->rq_bulk == NULL) {
1816 int nob, pg_count, i = 0;
1819 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1820 pg_count = aa->aa_page_count;
1821 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1824 while (nob > 0 && pg_count > 0) {
1826 int count = aa->aa_ppga[i]->count > nob ?
1827 nob : aa->aa_ppga[i]->count;
1829 CDEBUG(D_CACHE, "page %p count %d\n",
1830 aa->aa_ppga[i]->pg, count);
1831 ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1832 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1834 ll_kunmap_atomic((void *) ptr, KM_USER0);
1843 if (rc < aa->aa_requested_nob)
1844 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1846 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1847 static int cksum_counter;
1848 u32 server_cksum = body->oa.o_cksum;
1851 enum cksum_types cksum_type;
1852 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1853 body->oa.o_flags : 0;
1855 cksum_type = obd_cksum_type_unpack(o_flags);
1856 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1857 aa->aa_page_count, aa->aa_ppga,
1858 OST_READ, &client_cksum);
/* identify an intermediate LNet router if the bulk came via one */
1862 if (req->rq_bulk != NULL &&
1863 peer->nid != req->rq_bulk->bd_sender) {
1865 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1868 if (server_cksum != client_cksum) {
1869 struct ost_body *clbody;
1870 u32 page_count = aa->aa_page_count;
1872 clbody = req_capsule_client_get(&req->rq_pill,
1874 if (cli->cl_checksum_dump)
1875 dump_all_bulk_pages(&clbody->oa, page_count,
1876 aa->aa_ppga, server_cksum,
1879 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1880 "%s%s%s inode "DFID" object "DOSTID
1881 " extent [%llu-%llu], client %x, "
1882 "server %x, cksum_type %x\n",
1884 libcfs_nid2str(peer->nid),
1886 clbody->oa.o_valid & OBD_MD_FLFID ?
1887 clbody->oa.o_parent_seq : 0ULL,
1888 clbody->oa.o_valid & OBD_MD_FLFID ?
1889 clbody->oa.o_parent_oid : 0,
1890 clbody->oa.o_valid & OBD_MD_FLFID ?
1891 clbody->oa.o_parent_ver : 0,
1892 POSTID(&body->oa.o_oi),
1893 aa->aa_ppga[0]->off,
1894 aa->aa_ppga[page_count-1]->off +
1895 aa->aa_ppga[page_count-1]->count - 1,
1896 client_cksum, server_cksum,
1899 aa->aa_oa->o_cksum = client_cksum;
1903 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1906 } else if (unlikely(client_cksum)) {
1907 static int cksum_missed;
/* power-of-two rate limiting on the "server didn't checksum" error */
1910 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1911 CERROR("Checksum %u requested from %s but not sent\n",
1912 cksum_missed, libcfs_nid2str(peer->nid));
1918 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1919 aa->aa_oa, &body->oa);
/*
 * Rebuild and resubmit a BRW RPC after a recoverable error (-EINPROGRESS,
 * eviction, timeout).
 *
 * NOTE(review): elided excerpt — some early-exit paths and the closing
 * brace are not visible here.
 *
 * A brand-new request is prepared from the same pages/obdo, inherits the
 * old request's interpret/commit callbacks and async args, takes over the
 * oap and extent lists, and is handed to ptlrpcd.  The resend is delayed
 * by aa_resends seconds, capped at the request timeout (mirroring
 * ptlrpc's after_reply() behavior).
 */
1924 static int osc_brw_redo_request(struct ptlrpc_request *request,
1925 struct osc_brw_async_args *aa, int rc)
1927 struct ptlrpc_request *new_req;
1928 struct osc_brw_async_args *new_aa;
1929 struct osc_async_page *oap;
1932 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1933 "redo for recoverable error %d", rc);
/* resend=1: the new RPC is flagged as a recovery resend */
1935 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1936 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1937 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1938 aa->aa_ppga, &new_req, 1);
1942 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1943 if (oap->oap_request != NULL) {
1944 LASSERTF(request == oap->oap_request,
1945 "request %p != oap_request %p\n",
1946 request, oap->oap_request);
1947 if (oap->oap_interrupted) {
1948 ptlrpc_req_finished(new_req);
1954 * New request takes over pga and oaps from old request.
1955 * Note that copying a list_head doesn't work, need to move it...
1958 new_req->rq_interpret_reply = request->rq_interpret_reply;
1959 new_req->rq_async_args = request->rq_async_args;
1960 new_req->rq_commit_cb = request->rq_commit_cb;
1961 /* cap resend delay to the current request timeout, this is similar to
1962 * what ptlrpc does (see after_reply()) */
1963 if (aa->aa_resends > new_req->rq_timeout)
1964 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1966 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1967 new_req->rq_generation_set = 1;
1968 new_req->rq_import_generation = request->rq_import_generation;
1970 new_aa = ptlrpc_req_async_args(new_req);
1972 INIT_LIST_HEAD(&new_aa->aa_oaps);
1973 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1974 INIT_LIST_HEAD(&new_aa->aa_exts);
1975 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1976 new_aa->aa_resends = aa->aa_resends;
/* repoint every oap at the new request, dropping the old reference */
1978 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1979 if (oap->oap_request) {
1980 ptlrpc_req_finished(oap->oap_request);
1981 oap->oap_request = ptlrpc_request_addref(new_req);
1985 /* XXX: This code will run into problem if we're going to support
1986 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1987 * and wait for all of them to be finished. We should inherit request
1988 * set from old request. */
1989 ptlrpcd_add_req(new_req);
1991 DEBUG_REQ(D_INFO, new_req, "new request");
1996 * ugh, we want disk allocation on the target to happen in offset order. we'll
1997 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1998 * fine for our small page arrays and doesn't require allocation. it's an
1999 * insertion sort that swaps elements that are strides apart, shrinking the
2000 * stride down until it's '1' and the array is sorted.
/* sort @array of @num brw_pages in place by ascending file offset
 * (->off), using shellsort (no allocation, O(1) extra space).
 * NOTE(review): elided excerpt — the stride-shrink step and some loop
 * statements are not visible in this listing. */
2002 static void sort_brw_pages(struct brw_page **array, int num)
2005 struct brw_page *tmp;
/* grow the stride via the 3x+1 sequence, then shrink it each pass */
2009 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2014 for (i = stride ; i < num ; i++) {
2017 while (j >= stride && array[j - stride]->off > tmp->off) {
2018 array[j] = array[j - stride];
2023 } while (stride > 1);
/* Free the pointer array built for a BRW RPC (the array of @count
 * brw_page pointers itself, not the pages it refers to). */
2026 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2028 LASSERT(ppga != NULL);
2029 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Completion callback (rq_interpret_reply) for a BRW RPC.
 *
 * NOTE(review): elided excerpt — some branches, labels and closing braces
 * are not visible in this listing.
 *
 * Finishes the request via osc_brw_fini_request(), retries recoverable
 * errors through osc_brw_redo_request(), updates the cl_object
 * attributes (size/blocks/times/KMS) from the returned obdo on success,
 * finishes all extents attached to the RPC, releases the page array,
 * decrements the in-flight counter and wakes cache waiters.
 */
2032 static int brw_interpret(const struct lu_env *env,
2033 struct ptlrpc_request *req, void *args, int rc)
2035 struct osc_brw_async_args *aa = args;
2036 struct osc_extent *ext;
2037 struct osc_extent *tmp;
2038 struct client_obd *cli = aa->aa_cli;
2039 unsigned long transferred = 0;
2043 rc = osc_brw_fini_request(req, rc);
2044 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2046 * When server returns -EINPROGRESS, client should always retry
2047 * regardless of the number of times the bulk was resent already.
2049 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2050 if (req->rq_import_generation !=
2051 req->rq_import->imp_generation) {
2052 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2053 ""DOSTID", rc = %d.\n",
2054 req->rq_import->imp_obd->obd_name,
2055 POSTID(&aa->aa_oa->o_oi), rc);
2056 } else if (rc == -EINPROGRESS ||
2057 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2058 rc = osc_brw_redo_request(req, aa, rc);
2060 CERROR("%s: too many resent retries for object: "
2061 "%llu:%llu, rc = %d.\n",
2062 req->rq_import->imp_obd->obd_name,
2063 POSTID(&aa->aa_oa->o_oi), rc);
2068 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* success: propagate returned attributes to the cl_object */
2073 struct obdo *oa = aa->aa_oa;
2074 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2075 unsigned long valid = 0;
2076 struct cl_object *obj;
2077 struct osc_async_page *last;
2079 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2080 obj = osc2cl(last->oap_obj);
2082 cl_object_attr_lock(obj);
2083 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2084 attr->cat_blocks = oa->o_blocks;
2085 valid |= CAT_BLOCKS;
2087 if (oa->o_valid & OBD_MD_FLMTIME) {
2088 attr->cat_mtime = oa->o_mtime;
2091 if (oa->o_valid & OBD_MD_FLATIME) {
2092 attr->cat_atime = oa->o_atime;
2095 if (oa->o_valid & OBD_MD_FLCTIME) {
2096 attr->cat_ctime = oa->o_ctime;
2100 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2101 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2102 loff_t last_off = last->oap_count + last->oap_obj_off +
2105 /* Change file size if this is an out of quota or
2106 * direct IO write and it extends the file size */
2107 if (loi->loi_lvb.lvb_size < last_off) {
2108 attr->cat_size = last_off;
2111 /* Extend KMS if it's not a lockless write */
2112 if (loi->loi_kms < last_off &&
2113 oap2osc_page(last)->ops_srvlock == 0) {
2114 attr->cat_kms = last_off;
2120 cl_object_attr_update(env, obj, attr, valid);
2121 cl_object_attr_unlock(obj);
2123 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2125 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2126 osc_inc_unstable_pages(req);
/* complete every extent carried by this RPC; no-delay failures are
 * reported to callers as -EWOULDBLOCK */
2128 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2129 list_del_init(&ext->oe_link);
2130 osc_extent_finish(env, ext, 1,
2131 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2133 LASSERT(list_empty(&aa->aa_exts));
2134 LASSERT(list_empty(&aa->aa_oaps));
2136 transferred = (req->rq_bulk == NULL ? /* short io */
2137 aa->aa_requested_nob :
2138 req->rq_bulk->bd_nob_transferred);
2140 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2141 ptlrpc_lprocfs_brw(req, transferred);
2143 spin_lock(&cli->cl_loi_list_lock);
2144 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2145 * is called so we know whether to go to sync BRWs or wait for more
2146 * RPCs to complete */
2147 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2148 cli->cl_w_in_flight--;
2150 cli->cl_r_in_flight--;
2151 osc_wake_cache_waiters(cli);
2152 spin_unlock(&cli->cl_loi_list_lock);
2154 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW writes: called when the server has committed the
 * transaction, so the pages covered by @req are no longer "unstable".
 * The rq_lock dance below resolves the race with osc_inc_unstable_pages()
 * documented in the original comment.
 */
2158 static void brw_commit(struct ptlrpc_request *req)
2160 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2161 * this called via the rq_commit_cb, I need to ensure
2162 * osc_dec_unstable_pages is still called. Otherwise unstable
2163 * pages may be leaked. */
2164 spin_lock(&req->rq_lock);
2165 if (likely(req->rq_unstable)) {
2166 req->rq_unstable = 0;
2167 spin_unlock(&req->rq_lock);
2169 osc_dec_unstable_pages(req);
/* increments had not happened yet: just mark committed so the inc path
 * will observe it and skip the unstable accounting */
2171 req->rq_committed = 1;
2172 spin_unlock(&req->rq_lock);
2177 * Build an RPC by the list of extent @ext_list. The caller must ensure
2178 * that the total pages in this list are NOT over max pages per RPC.
2179 * Extents in the list must be in OES_RPC state.
/*
 * NOTE(review): elided excerpt — the embedded original line numbers are
 * non-contiguous, so locals (mem_tight, grant, page_count, i, mpflag,
 * rc), some branches and the out:/error labels are not visible here.
 *
 * Gathers all pages of the extents into a sorted brw_page array, fills
 * the obdo via cl_req_attr_set(), prepares the BRW request with
 * osc_brw_prep_request(), attaches the oap/extent lists to the request's
 * async args, updates the in-flight/lprocfs statistics, and queues the
 * request on ptlrpcd.  On failure every extent is finished with the
 * error so waiting IO is not stranded.
 */
2181 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2182 struct list_head *ext_list, int cmd)
2184 struct ptlrpc_request *req = NULL;
2185 struct osc_extent *ext;
2186 struct brw_page **pga = NULL;
2187 struct osc_brw_async_args *aa = NULL;
2188 struct obdo *oa = NULL;
2189 struct osc_async_page *oap;
2190 struct osc_object *obj = NULL;
2191 struct cl_req_attr *crattr = NULL;
2192 loff_t starting_offset = OBD_OBJECT_EOF;
2193 loff_t ending_offset = 0;
2197 bool soft_sync = false;
2198 bool interrupted = false;
2199 bool ndelay = false;
2203 __u32 layout_version = 0;
2204 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2205 struct ost_body *body;
2207 LASSERT(!list_empty(ext_list));
2209 /* add pages into rpc_list to build BRW rpc */
2210 list_for_each_entry(ext, ext_list, oe_link) {
2211 LASSERT(ext->oe_state == OES_RPC);
2212 mem_tight |= ext->oe_memalloc;
2213 grant += ext->oe_grants;
2214 page_count += ext->oe_nr_pages;
2215 layout_version = MAX(layout_version, ext->oe_layout_version);
2220 soft_sync = osc_over_unstable_soft_limit(cli);
2222 mpflag = cfs_memory_pressure_get_and_set();
2224 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2226 GOTO(out, rc = -ENOMEM);
2228 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2230 GOTO(out, rc = -ENOMEM);
/* flatten all oaps of all extents into pga[], tracking the overall
 * [starting_offset, ending_offset) byte range of the RPC */
2233 list_for_each_entry(ext, ext_list, oe_link) {
2234 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2236 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2238 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2239 pga[i] = &oap->oap_brw_page;
2240 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2243 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2244 if (starting_offset == OBD_OBJECT_EOF ||
2245 starting_offset > oap->oap_obj_off)
2246 starting_offset = oap->oap_obj_off;
2248 LASSERT(oap->oap_page_off == 0);
2249 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2250 ending_offset = oap->oap_obj_off +
2253 LASSERT(oap->oap_page_off + oap->oap_count ==
2255 if (oap->oap_interrupted)
2262 /* first page in the list */
2263 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item)
2265 crattr = &osc_env_info(env)->oti_req_attr;
2266 memset(crattr, 0, sizeof(*crattr));
2267 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2268 crattr->cra_flags = ~0ULL;
2269 crattr->cra_page = oap2cl_page(oap);
2270 crattr->cra_oa = oa;
2271 cl_req_attr_set(env, osc2cl(obj), crattr);
2273 if (cmd == OBD_BRW_WRITE) {
2274 oa->o_grant_used = grant;
2275 if (layout_version > 0) {
2276 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2277 PFID(&oa->o_oi.oi_fid), layout_version);
2279 oa->o_layout_version = layout_version;
2280 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
/* offset order helps the OST allocate blocks sequentially */
2284 sort_brw_pages(pga, page_count);
2285 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2287 CERROR("prep_req failed: %d\n", rc);
2291 req->rq_commit_cb = brw_commit;
2292 req->rq_interpret_reply = brw_interpret;
2293 req->rq_memalloc = mem_tight != 0;
2294 oap->oap_request = ptlrpc_request_addref(req);
2295 if (interrupted && !req->rq_intr)
2296 ptlrpc_mark_interrupted(req);
2298 req->rq_no_resend = req->rq_no_delay = 1;
2299 /* probably set a shorter timeout value.
2300 * to handle ETIMEDOUT in brw_interpret() correctly. */
2301 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2304 /* Need to update the timestamps after the request is built in case
2305 * we race with setattr (locally or in queue at OST). If OST gets
2306 * later setattr before earlier BRW (as determined by the request xid),
2307 * the OST will not use BRW timestamps. Sadly, there is no obvious
2308 * way to do this in a single call. bug 10150 */
2309 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2310 crattr->cra_oa = &body->oa;
2311 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2312 cl_req_attr_set(env, osc2cl(obj), crattr);
2313 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2315 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2316 aa = ptlrpc_req_async_args(req);
2317 INIT_LIST_HEAD(&aa->aa_oaps);
2318 list_splice_init(&rpc_list, &aa->aa_oaps);
2319 INIT_LIST_HEAD(&aa->aa_exts);
2320 list_splice_init(ext_list, &aa->aa_exts);
2322 spin_lock(&cli->cl_loi_list_lock);
2323 starting_offset >>= PAGE_SHIFT;
2324 if (cmd == OBD_BRW_READ) {
2325 cli->cl_r_in_flight++;
2326 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2327 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2328 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2329 starting_offset + 1);
2331 cli->cl_w_in_flight++;
2332 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2333 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2334 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2335 starting_offset + 1);
2337 spin_unlock(&cli->cl_loi_list_lock);
2339 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2340 page_count, aa, cli->cl_r_in_flight,
2341 cli->cl_w_in_flight);
2342 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2344 ptlrpcd_add_req(req);
2350 cfs_memory_pressure_restore(mpflag);
/* error path: nothing was handed to the request, release it all here */
2353 LASSERT(req == NULL);
2356 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2358 OBD_FREE(pga, sizeof(*pga) * page_count);
2359 /* this should happen rarely and is pretty bad, it makes the
2360 * pending list not follow the dirty order */
2361 while (!list_empty(ext_list)) {
2362 ext = list_entry(ext_list->next, struct osc_extent,
2364 list_del_init(&ext->oe_link);
2365 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data (the osc object) to an LDLM lock's l_ast_data if it is
 * not already owned by another object.  Performed under the lock's
 * resource lock.
 * NOTE(review): elided excerpt — the success-flag assignment and return
 * are not visible in this listing.
 */
2371 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2375 LASSERT(lock != NULL);
2377 lock_res_and_lock(lock);
2379 if (lock->l_ast_data == NULL)
2380 lock->l_ast_data = data;
2381 if (lock->l_ast_data == data)
2384 unlock_res_and_lock(lock);
/*
 * Common completion for an OSC lock enqueue (sync or async path).
 *
 * Translates an intent ELDLM_LOCK_ABORTED reply into the server's
 * lock_policy_res1 status, marks the LVB ready where appropriate, invokes
 * the caller's upcall with the final status, and drops the enqueue
 * reference on matched/granted locks.
 * NOTE(review): elided excerpt — one decref line (for ELDLM_LOCK_MATCHED)
 * and the return are not visible here.
 */
2389 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2390 void *cookie, struct lustre_handle *lockh,
2391 enum ldlm_mode mode, __u64 *flags, bool speculative,
2394 bool intent = *flags & LDLM_FL_HAS_INTENT;
2398 /* The request was created before ldlm_cli_enqueue call. */
2399 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2400 struct ldlm_reply *rep;
2402 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2403 LASSERT(rep != NULL);
2405 rep->lock_policy_res1 =
2406 ptlrpc_status_ntoh(rep->lock_policy_res1);
2407 if (rep->lock_policy_res1)
2408 errcode = rep->lock_policy_res1;
2410 *flags |= LDLM_FL_LVB_READY;
2411 } else if (errcode == ELDLM_OK) {
2412 *flags |= LDLM_FL_LVB_READY;
2415 /* Call the update callback. */
2416 rc = (*upcall)(cookie, lockh, errcode);
2418 /* release the reference taken in ldlm_cli_enqueue() */
2419 if (errcode == ELDLM_LOCK_MATCHED)
2421 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2422 ldlm_lock_decref(lockh, mode);
/*
 * Async-enqueue reply interpreter: completes the LDLM side of the
 * enqueue via ldlm_cli_enqueue_fini(), then the OSC side via
 * osc_enqueue_fini().
 * NOTE(review): elided excerpt — the declarations of rc/flags and the
 * final return are not visible in this listing.
 */
2427 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2430 struct osc_enqueue_args *aa = args;
2431 struct ldlm_lock *lock;
2432 struct lustre_handle *lockh = &aa->oa_lockh;
2433 enum ldlm_mode mode = aa->oa_mode;
2434 struct ost_lvb *lvb = aa->oa_lvb;
2435 __u32 lvb_len = sizeof(*lvb);
2440 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2442 lock = ldlm_handle2lock(lockh);
2443 LASSERTF(lock != NULL,
2444 "lockh %#llx, req %p, aa %p - client evicted?\n",
2445 lockh->cookie, req, aa);
2447 /* Take an additional reference so that a blocking AST that
2448 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2449 * to arrive after an upcall has been executed by
2450 * osc_enqueue_fini(). */
2451 ldlm_lock_addref(lockh, mode);
2453 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2454 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2456 /* Let CP AST to grant the lock first. */
2457 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* speculative (e.g. lockahead) enqueues carry no LVB or flags from the
 * caller; substitute a local flags word for the fini calls */
2459 if (aa->oa_speculative) {
2460 LASSERT(aa->oa_lvb == NULL);
2461 LASSERT(aa->oa_flags == NULL);
2462 aa->oa_flags = &flags;
2465 /* Complete obtaining the lock procedure. */
2466 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2467 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2469 /* Complete osc stuff. */
2470 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2471 aa->oa_flags, aa->oa_speculative, rc);
2473 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* drop the extra reference taken above */
2475 ldlm_lock_decref(lockh, mode);
2476 LDLM_LOCK_PUT(lock);
/* Sentinel rqset value meaning "hand the request to the ptlrpcd daemons"
 * rather than to a caller-owned request set; never dereferenced. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2482 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2483 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2484 * other synchronous requests, however keeping some locks and trying to obtain
2485 * others may take a considerable amount of time in a case of ost failure; and
2486 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make life difficult, so
2488 * release locks just after they are obtained. */
/*
 * Enqueue (or match) a DLM extent lock on an OST object.
 *
 * First tries ldlm_lock_match() to reuse a compatible cached lock; on a
 * match the upcall is invoked with ELDLM_LOCK_MATCHED.  Otherwise a new
 * enqueue RPC is built and either sent asynchronously (reply handled by
 * osc_enqueue_interpret()) or completed through osc_enqueue_fini().
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, union ldlm_policy_data *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     osc_enqueue_upcall_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct ptlrpc_request_set *rqset, int async,
	struct obd_device *obd = exp->exp_obd;
	struct lustre_handle lockh = { 0 };
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_flags = *flags;
	enum ldlm_mode mode;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother. */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)

	/* Normal lock requests must wait for the LVB to be ready before
	 * matching a lock; speculative lock requests do not need to,
	 * because they will not actually use the lock. */
		match_flags |= LDLM_FL_LVB_READY;
		match_flags |= LDLM_FL_BLOCK_GRANTED;
	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
			       einfo->ei_type, policy, mode, &lockh, 0);
		struct ldlm_lock *matched;

		if (*flags & LDLM_FL_TEST_LOCK)

		matched = ldlm_handle2lock(&lockh);
			/* This DLM lock request is speculative, and does not
			 * have an associated IO request. Therefore if there
			 * is already a DLM lock, it will just inform the
			 * caller to cancel the request for this stripe. */
			lock_res_and_lock(matched);
			if (ldlm_extent_equal(&policy->l_extent,
					      &matched->l_policy_data.l_extent))
			unlock_res_and_lock(matched);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
		} else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
			*flags |= LDLM_FL_LVB_READY;

			/* We already have a lock, and it's referenced. */
			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);

	if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))

	/* no cached lock matched: build an enqueue request with room for
	 * the server-supplied LVB in the reply */
	req = ptlrpc_request_alloc(class_exp2cliimp(exp),
				   &RQF_LDLM_ENQUEUE_LVB);
	rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
		ptlrpc_request_free(req);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
	ptlrpc_request_set_replen(req);

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, &lockh, async);
		struct osc_enqueue_args *aa;
		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		aa->oa_mode = einfo->ei_mode;
		aa->oa_type = einfo->ei_type;
		lustre_handle_copy(&aa->oa_lockh, &lockh);
		aa->oa_upcall = upcall;
		aa->oa_cookie = cookie;
		aa->oa_speculative = speculative;
		aa->oa_flags = flags;
			/* speculative locks are essentially to enqueue
			 * a DLM lock in advance, so we don't care
			 * about the result of the enqueue. */
			aa->oa_flags = NULL;

		req->rq_interpret_reply = osc_enqueue_interpret;
		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
			ptlrpc_set_add_req(rqset, req);
	} else if (intent) {
		ptlrpc_req_finished(req);

	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
			      flags, speculative, rc);
		ptlrpc_req_finished(req);
/*
 * Look for an already-granted DLM lock covering the given extent without
 * enqueueing a new one.  On a hit, bind @data to the lock's ast data via
 * osc_set_lock_data(); the match result from ldlm_lock_match() is the
 * return value.
 */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   enum ldlm_type type, union ldlm_policy_data *policy,
		   enum ldlm_mode mode, __u64 *flags, void *data,
		   struct lustre_handle *lockh, int unref)
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;

	/* fault-injection hook to simulate a match failure in testing */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock. */
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
		struct ldlm_lock *lock = ldlm_handle2lock(lockh);

		LASSERT(lock != NULL);
		if (!osc_set_lock_data(lock, data)) {
			/* ast data could not be attached: drop our ref */
			ldlm_lock_decref(lockh, rc);
		LDLM_LOCK_PUT(lock);
/* Interpret callback for an asynchronous OST_STATFS request: copy the
 * reply's statfs data into the caller's buffer and run oi_cb_up. */
static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req, void *args, int rc)
	struct osc_async_args *aa = args;
	struct obd_statfs *msfs;

	/*
	 * The request has in fact never been sent due to issues at
	 * a higher level (LOV). Exit immediately since the caller
	 * is aware of the problem and takes care of the clean up.
	 */

	/* NODELAY statfs must not be retried across connection problems */
	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
		GOTO(out, rc = -EPROTO);

	*aa->aa_oi->oi_osfs = *msfs;
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Non-blocking statfs: serve from the cached obd_osfs when it is fresh
 * enough (obd_osfs_age >= max_age), otherwise queue an OST_STATFS RPC on
 * @rqset with osc_statfs_interpret() as the reply handler.
 */
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, time64_t max_age,
			    struct ptlrpc_request_set *rqset)
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;

	if (obd->obd_osfs_age >= max_age) {
		/* cache hit: copy the saved statfs under obd_osfs_lock */
		"%s: use %p cache blocks %llu/%llu objects %llu/%llu\n",
		obd->obd_name, &obd->obd_osfs,
		obd->obd_osfs.os_bavail, obd->obd_osfs.os_blocks,
		obd->obd_osfs.os_ffree, obd->obd_osfs.os_files);
		spin_lock(&obd->obd_osfs_lock);
		memcpy(oinfo->oi_osfs, &obd->obd_osfs, sizeof(*oinfo->oi_osfs));
		spin_unlock(&obd->obd_osfs_lock);
		oinfo->oi_flags |= OBD_STATFS_FROM_CACHE;
		if (oinfo->oi_cb_up)
			oinfo->oi_cb_up(oinfo, 0);

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
		ptlrpc_request_free(req);
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait or resend, to avoid deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;

	req->rq_interpret_reply = osc_statfs_interpret;
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);

	ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous statfs: send an OST_STATFS RPC and wait for the reply.
 * The import is referenced under cl_sem because this call may race with
 * client_disconnect_export() (bug 15684).
 */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, time64_t max_age, __u32 flags)
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;

	/* Since the request might also come from lprocfs, we need to
	 * sync this with client_disconnect_export() (bug 15684). */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization. */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
		class_import_put(imp);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
		ptlrpc_request_free(req);
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait or resend, to avoid deadlock */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;

	rc = ptlrpc_queue_wait(req);

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
		GOTO(out, rc = -EPROTO);

	ptlrpc_req_finished(req);
/* Handle OSC-level ioctls (import recovery and activation); unrecognised
 * commands are logged.  A module reference is held across the call. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void __user *uarg)
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;

	if (!try_module_get(THIS_MODULE)) {
		CERROR("%s: cannot get module '%s'\n", obd->obd_name,
		       module_name(THIS_MODULE));
	case OBD_IOC_CLIENT_RECOVER:
		/* force recovery/reconnect of the client import */
		rc = ptlrpc_recover_import(obd->u.cli.cl_import,
					   data->ioc_inlbuf1, 0);
	case IOC_OSC_SET_ACTIVE:
		/* administratively (de)activate the import */
		rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
	CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
	       obd->obd_name, cmd, current_comm(), rc);

	module_put(THIS_MODULE);
/*
 * Set a named parameter: checksum, sptlrpc and LRU-shrink keys are
 * handled locally, everything else is forwarded as an OST_SET_INFO RPC
 * (or a grant-shrink RPC for KEY_GRANT_SHRINK, which is sent via
 * ptlrpcd rather than the caller's set).
 */
int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
		       u32 keylen, void *key, u32 vallen, void *val,
		       struct ptlrpc_request_set *set)
	struct ptlrpc_request *req;
	struct obd_device *obd = exp->exp_obd;
	struct obd_import *imp = class_exp2cliimp(exp);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		/* toggle BRW checksumming for this client */
		if (vallen != sizeof(int))
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		/* shrink at most half of the LRU, bounded by the target */
		long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
		long target = *(long *)val;

		nr = osc_lru_shrink(env, cli, min(nr, target), true);

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))

	/* We pass all other commands directly to OST. Since nobody calls osc
	 * methods directly and everybody is supposed to go through LOV, we
	 * assume lov checked invalid values for us.
	 * The only recognised values so far are evict_by_nid and mds_conn.
	 * Even if something bad goes through, we'd get a -EINVAL from OST
	 * anyway. */
	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
				   &RQF_OST_SET_GRANT_INFO :
	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
		ptlrpc_request_free(req);

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_grant_args *aa;

		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
			ptlrpc_req_finished(req);
		*oa = ((struct ost_body *)val)->oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
		ptlrpcd_add_req(req);
EXPORT_SYMBOL(osc_set_info_async);
/*
 * Fill connect data for a (re)connect: request grant equal to available
 * + reserved + dirty grant (falling back to 2x BRW size when zero), and
 * report/clear any grant lost while disconnected.
 */
int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
		  struct obd_device *obd, struct obd_uuid *cluuid,
		  struct obd_connect_data *data, void *localdata)
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		spin_lock(&cli->cl_loi_list_lock);
		grant = cli->cl_avail_grant + cli->cl_reserved_grant;
		if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
			/* restore ocd_grant_blkbits as client page bits */
			data->ocd_grant_blkbits = PAGE_SHIFT;
			grant += cli->cl_dirty_grant;
			grant += cli->cl_dirty_pages << PAGE_SHIFT;
		data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
EXPORT_SYMBOL(osc_reconnect);
/* Disconnect the export, then remove this client from the grant shrink
 * list — in that order, to avoid racing with the shrink thread. */
int osc_disconnect(struct obd_export *exp)
	struct obd_device *obd = class_exp2obd(exp);

	rc = client_disconnect_export(exp);
	/*
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *	connect p1		disconnect p2
	 *	ptlrpc_connect_import
	 *	...............		class_manual_cleanup
	 *	ptlrpc_connect_interrupt
	 *		add this client to shrink list
	 *	Bang! grant shrink thread trigger the shrink. BUG18662
	 */
	osc_del_grant_list(&obd->u.cli);
EXPORT_SYMBOL(osc_disconnect);
/*
 * cfs_hash iterator callback: for one LDLM resource, pick up the osc
 * object from the first granted lock carrying ast data, clear
 * LDLM_FL_CLEANED on every granted lock, then invalidate the object.
 */
int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
				 struct hlist_node *hnode, void *arg)
	struct lu_env *env = arg;
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
	struct ldlm_lock *lock;
	struct osc_object *osc = NULL;

	list_for_each_entry(lock, &res->lr_granted, l_res_link) {
		if (lock->l_ast_data != NULL && osc == NULL) {
			osc = lock->l_ast_data;
			cl_object_get(osc2cl(osc));

		/* clear LDLM_FL_CLEANED flag to make sure it will be canceled
		 * by the 2nd round of ldlm_namespace_clean() call in
		 * osc_import_event(). */
		ldlm_clear_cleaned(lock);

	osc_object_invalidate(env, osc);
	cl_object_put(env, osc2cl(osc));
EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
/*
 * Import event handler: reset grants on disconnect, flush caches and
 * cancel local locks on invalidate, apply negotiated connect data on
 * OCD, and forward state transitions to the observer obd.
 */
static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
	struct client_obd *cli;

	LASSERT(imp->imp_obd == obd);

	case IMP_EVENT_DISCON: {
		/* connection lost: all outstanding grant is void */
		spin_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;

		/* first round: cancel local locks, then flush dirty pages */
		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);

		env = cl_env_get(&refcheck);
		osc_io_unplug(env, &obd->u.cli, NULL);

		cfs_hash_for_each_nolock(ns->ns_rs_hash,
					 osc_ldlm_resource_invalidate,
		cl_env_put(env, &refcheck);

		/* second round: catch locks un-CLEANED by the iterator */
		ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
	CERROR("Unknown import event %d\n", event);
/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
	/*
	 * Cancel all unused and granted extent lock.
	 */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    ldlm_is_granted(lock) &&
	    osc_ldlm_weigh_ast(lock) == 0)
/* ptlrpcd work callback: flush pending BRW writeback for this client. */
static int brw_queue_work(const struct lu_env *env, void *data)
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

	osc_io_unplug(env, cli, NULL);
/*
 * Common obd setup for OSC-like devices: take a ptlrpcd reference, run
 * generic client setup, allocate the writeback and LRU ptlrpcd work
 * items, set up quota and initialize grant shrinking.
 */
int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
	struct client_obd *cli = &obd->u.cli;

	rc = ptlrpcd_addref();

	rc = client_obd_setup(obd, lcfg);
		GOTO(out_ptlrpcd, rc);

	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
	cli->cl_writeback_work = handler;

	handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
	if (IS_ERR(handler))
		GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
	cli->cl_lru_work = handler;

	rc = osc_quota_setup(obd);
		GOTO(out_ptlrpcd_work, rc);

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	osc_update_next_shrink(cli);

	/* error unwinding: destroy any work items already allocated */
	if (cli->cl_writeback_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	if (cli->cl_lru_work != NULL) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	client_obd_cleanup(obd);
/*
 * Full OSC device setup: common setup plus tunables, pre-populating the
 * shared request pool up to osc_reqpool_maxreqcount, registering the
 * cancel-weight callback and joining the global shrink list.
 */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
	struct client_obd *cli = &obd->u.cli;

	rc = osc_setup_common(obd, lcfg);

	rc = osc_tunables_init(obd);

	/*
	 * We try to control the total number of requests with an upper limit
	 * osc_reqpool_maxreqcount. There might be some race which will cause
	 * over-limit allocation, but it is fine.
	 */
	req_count = atomic_read(&osc_pool_req_count);
	if (req_count < osc_reqpool_maxreqcount) {
		adding = cli->cl_max_rpcs_in_flight + 2;
		if (req_count + adding > osc_reqpool_maxreqcount)
			adding = osc_reqpool_maxreqcount - req_count;

		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
		atomic_add(added, &osc_pool_req_count);

	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);

	spin_lock(&osc_shrink_lock);
	list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
	spin_unlock(&osc_shrink_lock);
	cli->cl_import->imp_idle_timeout = osc_idle_timeout;
	cli->cl_import->imp_idle_debug = D_HA;
/*
 * Common pre-cleanup: wait out zombie exports (echo client case),
 * destroy the writeback/LRU work items and release the client import.
 */
int osc_precleanup_common(struct obd_device *obd)
	struct client_obd *cli = &obd->u.cli;

	/*
	 * for echo client, export may be on zombie list, wait for
	 * zombie thread to cull it, because cli.cl_import will be
	 * cleared in client_disconnect_export():
	 *   class_export_destroy() -> obd_cleanup() ->
	 *   echo_device_free() -> echo_client_cleanup() ->
	 *   obd_disconnect() -> osc_disconnect() ->
	 *   client_disconnect_export()
	 */
	obd_zombie_barrier();
	if (cli->cl_writeback_work) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;

	if (cli->cl_lru_work) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;

	obd_cleanup_client_import(obd);
EXPORT_SYMBOL(osc_precleanup_common);
/* obd precleanup method: common teardown then drop lprocfs entries. */
static int osc_precleanup(struct obd_device *obd)
	osc_precleanup_common(obd);

	ptlrpc_lprocfs_unregister_obd(obd);
/*
 * Final cleanup: leave the global shrink list, detach from the shared
 * LRU cache (if attached), free quota state and run generic client
 * obd cleanup.
 */
int osc_cleanup_common(struct obd_device *obd)
	struct client_obd *cli = &obd->u.cli;

	spin_lock(&osc_shrink_lock);
	list_del(&cli->cl_shrink_list);
	spin_unlock(&osc_shrink_lock);

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		cl_cache_decref(cli->cl_cache);
		cli->cl_cache = NULL;

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);
EXPORT_SYMBOL(osc_cleanup_common);
/* OBD method table for the OSC device type; generic client_* helpers
 * are used where no OSC-specific handling is needed. */
static struct obd_ops osc_obd_ops = {
	.o_owner = THIS_MODULE,
	.o_setup = osc_setup,
	.o_precleanup = osc_precleanup,
	.o_cleanup = osc_cleanup_common,
	.o_add_conn = client_import_add_conn,
	.o_del_conn = client_import_del_conn,
	.o_connect = client_connect_import,
	.o_reconnect = osc_reconnect,
	.o_disconnect = osc_disconnect,
	.o_statfs = osc_statfs,
	.o_statfs_async = osc_statfs_async,
	.o_create = osc_create,
	.o_destroy = osc_destroy,
	.o_getattr = osc_getattr,
	.o_setattr = osc_setattr,
	.o_iocontrol = osc_iocontrol,
	.o_set_info_async = osc_set_info_async,
	.o_import_event = osc_import_event,
	.o_quotactl = osc_quotactl,
/* Registered shrinker for the OSC page cache, plus the global list of
 * client_obd instances it scans, protected by osc_shrink_lock. */
static struct shrinker *osc_cache_shrinker;
struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
DEFINE_SPINLOCK(osc_shrink_lock);
#ifndef HAVE_SHRINKER_COUNT
/* Compatibility shim for kernels without separate count/scan shrinker
 * methods: perform a scan, then report the remaining count. */
static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
	struct shrink_control scv = {
		.nr_to_scan = shrink_param(sc, nr_to_scan),
		.gfp_mask = shrink_param(sc, gfp_mask)
#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
	struct shrinker *shrinker = NULL;

	(void)osc_cache_shrink_scan(shrinker, &scv);

	return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: set up lu caches, register the OSC obd type and cache
 * shrinker, size and create the shared request pool, and start grant
 * shrink work.  Error paths unwind in reverse order.
 */
static int __init osc_init(void)
	unsigned int reqpool_size;
	unsigned int reqsize;
	DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
			 osc_cache_shrink_count, osc_cache_shrink_scan);

	/* print an address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with gdb that doesn't support data
	 * symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	rc = class_register_type(&osc_obd_ops, NULL, true, NULL,
				 LUSTRE_OSC_NAME, &osc_device_type);
	osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);

	/* This is obviously too much memory, only prevent overflow here */
	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
		GOTO(out_type, rc = -EINVAL);

	reqpool_size = osc_reqpool_mem_max << 20;

	/* round the per-request size up to a power of two */
	while (reqsize < OST_IO_MAXREQSIZE)
		reqsize = reqsize << 1;

	/*
	 * We don't enlarge the request count in OSC pool according to
	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
	 * tried after normal allocation failed. So a small OSC pool won't
	 * cause much performance degradation in most of cases.
	 */
	osc_reqpool_maxreqcount = reqpool_size / reqsize;

	atomic_set(&osc_pool_req_count, 0);
	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
					  ptlrpc_add_rqs_to_pool);

	if (osc_rq_pool == NULL)
		GOTO(out_type, rc = -ENOMEM);

	rc = osc_start_grant_work();
		GOTO(out_req_pool, rc);

	/* error unwinding */
	ptlrpc_free_rq_pool(osc_rq_pool);

	class_unregister_type(LUSTRE_OSC_NAME);

	lu_kmem_fini(osc_caches);
/* Module unload: tear down resources in reverse order of osc_init(). */
static void __exit osc_exit(void)
	osc_stop_grant_work();
	remove_shrinker(osc_cache_shrinker);
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
3471 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3472 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3473 MODULE_VERSION(LUSTRE_VERSION_STRING);
3474 MODULE_LICENSE("GPL");
3476 module_init(osc_init);
3477 module_exit(osc_exit);