4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <linux/workqueue.h>
36 #include <lprocfs_status.h>
37 #include <lustre_debug.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_ha.h>
41 #include <uapi/linux/lustre/lustre_ioctl.h>
42 #include <lustre_net.h>
43 #include <lustre_obdo.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
49 #include "osc_internal.h"
/* Number of requests currently drawn from the shared OSC request pool. */
51 atomic_t osc_pool_req_count;
/* Upper bound on pool size, derived from osc_reqpool_mem_max at init. */
52 unsigned int osc_reqpool_maxreqcount;
/* Pre-allocated pool of BRW requests, shared by all OSC devices. */
53 struct ptlrpc_request_pool *osc_rq_pool;
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
/* NOTE(review): declared as (signed) int but registered with the "uint"
 * module_param type below — types should agree; confirm and make it
 * "static unsigned int". */
59 static int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
/* Grant RPCs reuse the BRW async-args slot in the request. */
62 #define osc_grant_args osc_brw_async_args
/* Async-completion context for SETATTR/PUNCH RPCs, stored in
 * req->rq_async_args and consumed by osc_setattr_interpret().
 * (Some members are not visible in this dump.) */
64 struct osc_setattr_args {
66 obd_enqueue_update_f sa_upcall;
/* Async-completion context for OST_SYNC RPCs (osc_sync_interpret()). */
70 struct osc_fsync_args {
71 struct osc_object *fa_obj;
73 obd_enqueue_update_f fa_upcall;
/* Async-completion context for OST_LADVISE RPCs (osc_ladvise_interpret()). */
77 struct osc_ladvise_args {
79 obd_enqueue_update_f la_upcall;
/* Forward declarations for the BRW path defined later in this file. */
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Copy @oa into @req's OST body in the request capsule, converting it to
 * wire format according to the import's connect data.
 */
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 struct ost_body *body;
91 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
94 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR: pack @oa, send the RPC with ptlrpc_queue_wait()
 * and copy the returned attributes back into @oa.  On success the server
 * reply is converted from wire format and the client-side BRW size is
 * reported via o_blksize/OBD_MD_FLBLKSZ.  Returns 0 or a negative errno
 * (-EPROTO if the reply body cannot be unpacked).
 */
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
100 struct ptlrpc_request *req;
101 struct ost_body *body;
105 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
/* pack failure: release the request before returning */
111 ptlrpc_request_free(req);
115 osc_pack_req_body(req, oa);
117 ptlrpc_request_set_replen(req);
119 rc = ptlrpc_queue_wait(req);
123 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
125 GOTO(out, rc = -EPROTO);
127 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* advertise the preferred I/O size for this export */
130 oa->o_blksize = cli_brw_size(exp->exp_obd);
131 oa->o_valid |= OBD_MD_FLBLKSZ;
135 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: pack @oa, wait for the reply and copy the
 * server's view of the attributes back into @oa.  The caller must have
 * set OBD_MD_FLGROUP in oa->o_valid (asserted below).  Returns 0 or a
 * negative errno (-EPROTO on an unparseable reply).
 */
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
143 struct ptlrpc_request *req;
144 struct ost_body *body;
148 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
156 ptlrpc_request_free(req);
160 osc_pack_req_body(req, oa);
162 ptlrpc_request_set_replen(req);
164 rc = ptlrpc_queue_wait(req);
168 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
170 GOTO(out, rc = -EPROTO);
172 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176 ptlrpc_req_finished(req);
/*
 * Reply handler shared by async SETATTR and PUNCH: unpack the reply obdo
 * into sa->sa_oa and then run the caller's upcall with its cookie and the
 * RPC result.  @args points at the osc_setattr_args stashed in the
 * request's async-args slot.
 */
181 static int osc_setattr_interpret(const struct lu_env *env,
182 struct ptlrpc_request *req, void *args, int rc)
184 struct osc_setattr_args *sa = args;
185 struct ost_body *body;
192 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
194 GOTO(out, rc = -EPROTO);
196 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
/* hand the final status to the caller's completion callback */
199 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR.  Packs @oa and dispatches the request either
 * fire-and-forget via ptlrpcd (no reply wait), through the shared ptlrpcd
 * set (PTLRPCD_SET), or by adding it to the caller-supplied @rqset.
 * @upcall(@cookie, rc) is invoked from osc_setattr_interpret() on
 * completion.  (The branch selecting the no-rqset path is elided in this
 * dump — TODO confirm against the full source.)
 */
203 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
204 obd_enqueue_update_f upcall, void *cookie,
205 struct ptlrpc_request_set *rqset)
207 struct ptlrpc_request *req;
208 struct osc_setattr_args *sa;
213 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
217 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
219 ptlrpc_request_free(req);
223 osc_pack_req_body(req, oa);
225 ptlrpc_request_set_replen(req);
227 /* do mds to ost setattr asynchronously */
229 /* Do not wait for response. */
230 ptlrpcd_add_req(req);
232 req->rq_interpret_reply = osc_setattr_interpret;
/* osc_setattr_args must fit in the request's fixed async-args buffer */
234 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
235 sa = ptlrpc_req_async_args(req);
237 sa->sa_upcall = upcall;
238 sa->sa_cookie = cookie;
240 if (rqset == PTLRPCD_SET)
241 ptlrpcd_add_req(req);
243 ptlrpc_set_add_req(rqset, req);
/*
 * Reply handler for OST_LADVISE: copy the returned obdo back to the
 * caller's buffer and invoke the completion upcall with the RPC status.
 */
249 static int osc_ladvise_interpret(const struct lu_env *env,
250 struct ptlrpc_request *req,
253 struct osc_ladvise_args *la = arg;
254 struct ost_body *body;
260 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
262 GOTO(out, rc = -EPROTO);
/* struct copy of the reply obdo into the caller-owned obdo */
264 *la->la_oa = body->oa;
266 rc = la->la_upcall(la->la_cookie, rc);
/*
 * Send an OST_LADVISE RPC carrying @ladvise_hdr and its lah_count
 * lu_ladvise entries.  The LADVISE buffer in the capsule is sized to the
 * advice count before packing, the request is routed to the OST I/O
 * portal, and completion is reported through @upcall via
 * osc_ladvise_interpret().
 *
 * If rqset is NULL, do not wait for response. Upcall and cookie could also
 * be NULL in this case
 */
274 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
275 struct ladvise_hdr *ladvise_hdr,
276 obd_enqueue_update_f upcall, void *cookie,
277 struct ptlrpc_request_set *rqset)
279 struct ptlrpc_request *req;
280 struct ost_body *body;
281 struct osc_ladvise_args *la;
283 struct lu_ladvise *req_ladvise;
284 struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
285 int num_advise = ladvise_hdr->lah_count;
286 struct ladvise_hdr *req_ladvise_hdr;
289 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* variable-length advice array: size the buffer before packing */
293 req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
294 num_advise * sizeof(*ladvise));
295 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
297 ptlrpc_request_free(req);
/* ladvise is served by the OST I/O service, not the regular portal */
300 req->rq_request_portal = OST_IO_PORTAL;
301 ptlrpc_at_set_req_timeout(req);
303 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
305 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
308 req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
309 &RMF_OST_LADVISE_HDR);
310 memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
312 req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
313 memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
314 ptlrpc_request_set_replen(req);
317 /* Do not wait for response. */
318 ptlrpcd_add_req(req);
322 req->rq_interpret_reply = osc_ladvise_interpret;
323 CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
324 la = ptlrpc_req_async_args(req);
326 la->la_upcall = upcall;
327 la->la_cookie = cookie;
329 if (rqset == PTLRPCD_SET)
330 ptlrpcd_add_req(req);
332 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE.  Only used for echo-client objects here: the
 * FID sequence is asserted to be an echo sequence.  On success the
 * server-assigned attributes are copied back into @oa and o_blksize is
 * set to the client BRW size.
 */
337 static int osc_create(const struct lu_env *env, struct obd_export *exp,
340 struct ptlrpc_request *req;
341 struct ost_body *body;
346 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
/* regular object creation goes through OSP; OSC only creates echo objects */
347 LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
349 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
351 GOTO(out, rc = -ENOMEM);
353 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
355 ptlrpc_request_free(req);
359 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
362 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
364 ptlrpc_request_set_replen(req);
366 rc = ptlrpc_queue_wait(req);
370 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
372 GOTO(out_req, rc = -EPROTO);
374 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
375 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
377 oa->o_blksize = cli_brw_size(exp->exp_obd);
378 oa->o_valid |= OBD_MD_FLBLKSZ;
380 CDEBUG(D_HA, "transno: %lld\n",
381 lustre_msg_get_transno(req->rq_repmsg));
383 ptlrpc_req_finished(req);
/*
 * Fire-and-forget OST_PUNCH (truncate/fallocate range).  The request is
 * sent to the OST I/O portal via ptlrpcd; completion reuses
 * osc_setattr_interpret(), which runs @upcall(@cookie, rc).
 */
388 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
389 obd_enqueue_update_f upcall, void *cookie)
391 struct ptlrpc_request *req;
392 struct osc_setattr_args *sa;
393 struct obd_import *imp = class_exp2cliimp(exp);
394 struct ost_body *body;
399 req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
403 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405 ptlrpc_request_free(req);
/* punch is an I/O operation: use the I/O portal and AT-based timeout */
409 osc_set_io_portal(req);
411 ptlrpc_at_set_req_timeout(req);
413 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
415 lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
417 ptlrpc_request_set_replen(req);
419 req->rq_interpret_reply = osc_setattr_interpret;
420 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
421 sa = ptlrpc_req_async_args(req);
423 sa->sa_upcall = upcall;
424 sa->sa_cookie = cookie;
/* hand off to ptlrpcd; caller does not wait for the reply */
426 ptlrpcd_add_req(req);
430 EXPORT_SYMBOL(osc_punch_send);
/*
 * Reply handler for OST_SYNC: copy the reply obdo back to the caller,
 * refresh the osc object's cached block count under the cl attr lock,
 * then run the caller's completion upcall.
 */
432 static int osc_sync_interpret(const struct lu_env *env,
433 struct ptlrpc_request *req, void *args, int rc)
435 struct osc_fsync_args *fa = args;
436 struct ost_body *body;
437 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
438 unsigned long valid = 0;
439 struct cl_object *obj;
445 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
447 CERROR("can't unpack ost_body\n");
448 GOTO(out, rc = -EPROTO);
451 *fa->fa_oa = body->oa;
452 obj = osc2cl(fa->fa_obj);
454 /* Update osc object's blocks attribute */
455 cl_object_attr_lock(obj);
456 if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
457 attr->cat_blocks = body->oa.o_blocks;
462 cl_object_attr_update(env, obj, attr, valid);
463 cl_object_attr_unlock(obj);
466 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Issue an asynchronous OST_SYNC for @obj.  The sync range is carried in
 * the oa size/blocks fields (see comment below).  Dispatch follows the
 * usual three-way pattern: ptlrpcd set (PTLRPCD_SET) or the caller's
 * @rqset; completion runs osc_sync_interpret() -> @upcall.
 */
470 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
471 obd_enqueue_update_f upcall, void *cookie,
472 struct ptlrpc_request_set *rqset)
474 struct obd_export *exp = osc_export(obj);
475 struct ptlrpc_request *req;
476 struct ost_body *body;
477 struct osc_fsync_args *fa;
481 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
485 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
487 ptlrpc_request_free(req);
491 /* overload the size and blocks fields in the oa with start/end */
492 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
494 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
496 ptlrpc_request_set_replen(req);
497 req->rq_interpret_reply = osc_sync_interpret;
499 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
500 fa = ptlrpc_req_async_args(req);
503 fa->fa_upcall = upcall;
504 fa->fa_cookie = cookie;
506 if (rqset == PTLRPCD_SET)
507 ptlrpcd_add_req(req);
509 ptlrpc_set_add_req(rqset, req);
514 /* Find and cancel locally locks matched by @mode in the resource found by
515 * @objid. Found locks are added into @cancel list. Returns the amount of
516 * locks added to @cancels list. */
517 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
518 struct list_head *cancels,
519 enum ldlm_mode mode, __u64 lock_flags)
521 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
522 struct ldlm_res_id res_id;
523 struct ldlm_resource *res;
527 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
528 * export) but disabled through procfs (flag in NS).
530 * This distinguishes from a case when ELC is not supported originally,
531 * when we still want to cancel locks in advance and just cancel them
532 * locally, without sending any RPC. */
533 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* map the object id to its LDLM resource name and look it up */
536 ostid_build_res_name(&oa->o_oi, &res_id);
537 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
541 LDLM_RESOURCE_ADDREF(res);
542 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
543 lock_flags, 0, NULL);
544 LDLM_RESOURCE_DELREF(res);
545 ldlm_resource_putref(res);
/*
 * Reply handler for OST_DESTROY: drop the in-flight destroy counter and
 * wake any thread throttled in osc_destroy() waiting for a slot.
 */
549 static int osc_destroy_interpret(const struct lu_env *env,
550 struct ptlrpc_request *req, void *args, int rc)
552 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
554 atomic_dec(&cli->cl_destroy_in_flight);
555 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a destroy-RPC slot: optimistically bump
 * cl_destroy_in_flight and keep the slot if still within
 * cl_max_rpcs_in_flight; otherwise undo the increment.  The inc/dec
 * pair is not atomic as a whole, so a concurrent decrement may be
 * observed in between — the wake_up below compensates for that race.
 */
560 static int osc_can_send_destroy(struct client_obd *cli)
562 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
563 cli->cl_max_rpcs_in_flight) {
564 /* The destroy request can be sent */
567 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
568 cli->cl_max_rpcs_in_flight) {
570 * The counter has been modified between the two atomic
573 wake_up(&cli->cl_destroy_waitq);
/*
 * OST_DESTROY with early lock cancellation: first cancel (locally) any
 * PW locks on the object, discarding cached data, and piggyback the
 * cancels on the destroy RPC via ldlm_prep_elc_req().  Destroy RPCs are
 * throttled to cl_max_rpcs_in_flight; the sender blocks (interruptibly)
 * until a slot is free, then hands the request to ptlrpcd without
 * waiting for the reply.
 */
578 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
581 struct client_obd *cli = &exp->exp_obd->u.cli;
582 struct ptlrpc_request *req;
583 struct ost_body *body;
584 struct list_head cancels = LIST_HEAD_INIT(cancels);
589 CDEBUG(D_INFO, "oa NULL\n");
/* collect local PW locks so cached pages are discarded, not flushed */
593 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
594 LDLM_FL_DISCARD_DATA);
596 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* allocation failed: release the collected cancel list */
598 ldlm_lock_list_put(&cancels, l_bl_ast, count);
602 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
605 ptlrpc_request_free(req);
609 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
610 ptlrpc_at_set_req_timeout(req);
612 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
616 ptlrpc_request_set_replen(req);
618 req->rq_interpret_reply = osc_destroy_interpret;
619 if (!osc_can_send_destroy(cli)) {
620 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
623 * Wait until the number of on-going destroy RPCs drops
624 * under max_rpc_in_flight
626 rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
627 osc_can_send_destroy(cli), &lwi);
/* interrupted while waiting: drop the request */
629 ptlrpc_req_finished(req);
634 /* Do not wait for response */
635 ptlrpcd_add_req(req);
/*
 * Fill the grant-related fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) so every BRW RPC reports the client's cache state to the
 * server.  All figures are taken under cl_loi_list_lock.  The sanity
 * checks only CERROR on inconsistent dirty accounting; they do not fail
 * the operation.
 */
639 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
642 u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
/* caller must not have pre-set the fields this function owns */
644 LASSERT(!(oa->o_valid & bits));
647 spin_lock(&cli->cl_loi_list_lock);
648 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
649 oa->o_dirty = cli->cl_dirty_grant;
651 oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
652 if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
653 cli->cl_dirty_max_pages)) {
654 CERROR("dirty %lu - %lu > dirty_max %lu\n",
655 cli->cl_dirty_pages, cli->cl_dirty_transit,
656 cli->cl_dirty_max_pages);
658 } else if (unlikely(atomic_long_read(&obd_dirty_pages) -
659 atomic_long_read(&obd_dirty_transit_pages) >
660 (long)(obd_max_dirty_pages + 1))) {
661 /* The atomic_read() allowing the atomic_inc() are
662 * not covered by a lock thus they may safely race and trip
663 * this CERROR() unless we add in a small fudge factor (+1). */
664 CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
665 cli_name(cli), atomic_long_read(&obd_dirty_pages),
666 atomic_long_read(&obd_dirty_transit_pages),
667 obd_max_dirty_pages);
669 } else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
671 CERROR("dirty %lu - dirty_max %lu too big???\n",
672 cli->cl_dirty_pages, cli->cl_dirty_max_pages);
675 unsigned long nrpages;
676 unsigned long undirty;
/* ask for enough grant to keep max_rpcs_in_flight+1 full RPCs busy */
678 nrpages = cli->cl_max_pages_per_rpc;
679 nrpages *= cli->cl_max_rpcs_in_flight + 1;
680 nrpages = max(nrpages, cli->cl_dirty_max_pages);
681 undirty = nrpages << PAGE_SHIFT;
682 if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
686 /* take extent tax into account when asking for more
688 nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
689 cli->cl_max_extent_pages;
690 undirty += nrextents * cli->cl_grant_extent_tax;
692 /* Do not ask for more than OBD_MAX_GRANT - a margin for server
693 * to add extent tax, etc.
695 oa->o_undirty = min(undirty, OBD_MAX_GRANT &
696 ~(PTLRPC_MAX_BRW_SIZE * 4UL));
698 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* lost grant is reported once, then reset */
699 oa->o_dropped = cli->cl_lost_grant;
700 cli->cl_lost_grant = 0;
701 spin_unlock(&cli->cl_loi_list_lock);
702 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
703 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Schedule the next grant-shrink check: one shrink interval from now.
 */
706 void osc_update_next_shrink(struct client_obd *cli)
708 cli->cl_next_shrink_grant = ktime_get_seconds() +
709 cli->cl_grant_shrink_interval;
711 CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
712 cli->cl_next_shrink_grant);
/* Add @grant bytes back to the client's available grant, under the
 * grant/LOI spinlock. */
715 static void __osc_update_grant(struct client_obd *cli, u64 grant)
717 spin_lock(&cli->cl_loi_list_lock);
718 cli->cl_avail_grant += grant;
719 spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any grant the server returned in an RPC reply body. */
722 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
724 if (body->oa.o_valid & OBD_MD_FLGRANT) {
725 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
726 __osc_update_grant(cli, body->oa.o_grant);
731 * grant thread data for shrinking space.
733 struct grant_thread_data {
/* clients registered for periodic grant shrinking; guarded by gtd_mutex */
734 struct list_head gtd_clients;
735 struct mutex gtd_mutex;
/* set to 1 to stop rescheduling the grant work item */
736 unsigned long gtd_stopped:1;
/* single module-wide instance driving the delayed work below */
738 static struct grant_thread_data client_gtd;
/*
 * Reply handler for a grant-shrink set_info RPC: on success fold the
 * server's reply grant back in via osc_update_grant(); the error path
 * (elided here) restores the grant we tried to give away.  Frees the
 * obdo slab object attached to the async args.
 */
740 static int osc_shrink_grant_interpret(const struct lu_env *env,
741 struct ptlrpc_request *req,
744 struct osc_grant_args *aa = args;
745 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
746 struct ost_body *body;
/* failure: put the shrunk grant back into cl_avail_grant */
749 __osc_update_grant(cli, aa->aa_oa->o_grant);
753 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
755 osc_update_grant(cli, body);
757 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
/*
 * Give back a quarter of the available grant without an RPC: move it
 * from cl_avail_grant into oa->o_grant and tag the obdo with
 * OBD_FL_SHRINK_GRANT so the next RPC carries it to the server.
 */
762 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
764 spin_lock(&cli->cl_loi_list_lock);
765 oa->o_grant = cli->cl_avail_grant / 4;
766 cli->cl_avail_grant -= oa->o_grant;
767 spin_unlock(&cli->cl_loi_list_lock);
768 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
769 oa->o_valid |= OBD_MD_FLFLAGS;
772 oa->o_flags |= OBD_FL_SHRINK_GRANT;
773 osc_update_next_shrink(cli);
776 /* Shrink the current grant, either from some large amount to enough for a
777 * full set of in-flight RPCs, or if we have already shrunk to that limit
778 * then to enough for a single RPC. This avoids keeping more grant than
779 * needed, and avoids shrinking the grant piecemeal. */
780 static int osc_shrink_grant(struct client_obd *cli)
782 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
783 (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
785 spin_lock(&cli->cl_loi_list_lock);
/* already at/below the multi-RPC target: drop down to one RPC's worth */
786 if (cli->cl_avail_grant <= target_bytes)
787 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
788 spin_unlock(&cli->cl_loi_list_lock);
790 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Shrink cl_avail_grant down to @target_bytes by sending the excess back
 * to the server through a KEY_GRANT_SHRINK set_info RPC.  The target is
 * clamped to at least one full RPC's worth; nothing is sent if available
 * grant is already at or below the target (checked again after the body
 * is prepared, since the grant may change in between).  On send failure
 * the grant is restored locally.
 */
793 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
796 struct ost_body *body;
799 spin_lock(&cli->cl_loi_list_lock);
800 /* Don't shrink if we are already above or below the desired limit
801 * We don't want to shrink below a single RPC, as that will negatively
802 * impact block allocation and long-term performance. */
803 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
804 target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
806 if (target_bytes >= cli->cl_avail_grant) {
807 spin_unlock(&cli->cl_loi_list_lock);
810 spin_unlock(&cli->cl_loi_list_lock);
/* report current cache state alongside the shrink */
816 osc_announce_cached(cli, &body->oa, 0);
818 spin_lock(&cli->cl_loi_list_lock);
819 if (target_bytes >= cli->cl_avail_grant) {
820 /* available grant has changed since target calculation */
821 spin_unlock(&cli->cl_loi_list_lock);
822 GOTO(out_free, rc = 0);
824 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
825 cli->cl_avail_grant = target_bytes;
826 spin_unlock(&cli->cl_loi_list_lock);
827 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
828 body->oa.o_valid |= OBD_MD_FLFLAGS;
829 body->oa.o_flags = 0;
831 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
832 osc_update_next_shrink(cli);
834 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
835 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
836 sizeof(*body), body, NULL);
/* send failed: return the grant to the local pool */
838 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether @client is due for a grant shrink: requires a live
 * import with OBD_CONNECT_GRANT_SHRINK negotiated, the shrink deadline
 * (minus a 5 s margin) reached, a FULL import state, and more available
 * grant than a single RPC would use.  Otherwise the deadline is pushed
 * forward.
 */
844 static int osc_should_shrink_grant(struct client_obd *client)
846 time64_t next_shrink = client->cl_next_shrink_grant;
848 if (client->cl_import == NULL)
851 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
852 OBD_CONNECT_GRANT_SHRINK) == 0)
855 if (ktime_get_seconds() >= next_shrink - 5) {
856 /* Get the current RPC size directly, instead of going via:
857 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
858 * Keep comment here so that it can be found by searching. */
859 int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
861 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
862 client->cl_avail_grant > brw_size)
/* not worth shrinking now; re-arm the timer */
865 osc_update_next_shrink(client);
/* Cap on shrink RPCs issued per work-handler pass. */
870 #define GRANT_SHRINK_RPC_BATCH 100
/* Delayed work item driving periodic grant shrinking for all clients. */
872 static struct delayed_work work;
/*
 * Periodic worker: walk the registered clients (under gtd_mutex), shrink
 * grant for up to GRANT_SHRINK_RPC_BATCH of them, and re-arm itself for
 * the earliest future cl_next_shrink_grant deadline.  Exits without
 * rescheduling once gtd_stopped is set.
 */
874 static void osc_grant_work_handler(struct work_struct *data)
876 struct client_obd *cli;
878 bool init_next_shrink = true;
879 time64_t next_shrink = ktime_get_seconds() + GRANT_SHRINK_INTERVAL;
882 mutex_lock(&client_gtd.gtd_mutex);
883 list_for_each_entry(cli, &client_gtd.gtd_clients,
885 if (rpc_sent < GRANT_SHRINK_RPC_BATCH &&
886 osc_should_shrink_grant(cli)) {
887 osc_shrink_grant(cli);
/* track the soonest still-future deadline among all clients */
891 if (!init_next_shrink) {
892 if (cli->cl_next_shrink_grant < next_shrink &&
893 cli->cl_next_shrink_grant > ktime_get_seconds())
894 next_shrink = cli->cl_next_shrink_grant;
896 init_next_shrink = false;
897 next_shrink = cli->cl_next_shrink_grant;
900 mutex_unlock(&client_gtd.gtd_mutex);
902 if (client_gtd.gtd_stopped == 1)
/* deadline in the future: sleep until then, otherwise run again now */
905 if (next_shrink > ktime_get_seconds())
906 schedule_delayed_work(&work, msecs_to_jiffies(
907 (next_shrink - ktime_get_seconds()) *
910 schedule_work(&work.work);
/* Cancel any pending delayed run and kick the grant worker immediately. */
913 void osc_schedule_grant_work(void)
915 cancel_delayed_work_sync(&work);
916 schedule_work(&work.work);
920 * Start grant thread for returing grant to server for idle clients.
922 static int osc_start_grant_work(void)
924 client_gtd.gtd_stopped = 0;
925 mutex_init(&client_gtd.gtd_mutex);
926 INIT_LIST_HEAD(&client_gtd.gtd_clients);
/* arm the worker; it reschedules itself from then on */
928 INIT_DELAYED_WORK(&work, osc_grant_work_handler);
929 schedule_work(&work.work);
/* Tell the worker to stop rescheduling, then wait out any pending run. */
934 static void osc_stop_grant_work(void)
936 client_gtd.gtd_stopped = 1;
937 cancel_delayed_work_sync(&work);
/* Register @client with the grant-shrink worker's client list. */
940 static void osc_add_grant_list(struct client_obd *client)
942 mutex_lock(&client_gtd.gtd_mutex);
943 list_add(&client->cl_grant_chain, &client_gtd.gtd_clients);
944 mutex_unlock(&client_gtd.gtd_mutex);
/* Unregister @client from the grant-shrink list; no-op if never added. */
947 static void osc_del_grant_list(struct client_obd *client)
949 if (list_empty(&client->cl_grant_chain))
952 mutex_lock(&client_gtd.gtd_mutex);
953 list_del_init(&client->cl_grant_chain);
954 mutex_unlock(&client_gtd.gtd_mutex);
/*
 * Initialize client grant state from the server's connect reply @ocd:
 * seed cl_avail_grant (adjusting for reserved/dirty grant unless the
 * import was evicted), derive chunk size, extent tax and maximum extent
 * size when GRANT_PARAM was negotiated, and register the client for
 * periodic grant shrinking when GRANT_SHRINK was negotiated.
 */
957 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
960 * ocd_grant is the total grant amount we're expect to hold: if we've
961 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
962 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
965 * race is tolerable here: if we're evicted, but imp_state already
966 * left EVICTED state, then cl_dirty_pages must be 0 already.
968 spin_lock(&cli->cl_loi_list_lock);
969 cli->cl_avail_grant = ocd->ocd_grant;
970 if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
971 cli->cl_avail_grant -= cli->cl_reserved_grant;
972 if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
973 cli->cl_avail_grant -= cli->cl_dirty_grant;
975 cli->cl_avail_grant -=
976 cli->cl_dirty_pages << PAGE_SHIFT;
979 if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
983 /* overhead for each extent insertion */
984 cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
985 /* determine the appropriate chunk size used by osc_extent. */
986 cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
987 ocd->ocd_grant_blkbits);
988 /* max_pages_per_rpc must be chunk aligned */
989 chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
990 cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
991 ~chunk_mask) & chunk_mask;
992 /* determine maximum extent size, in #pages */
993 size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
994 cli->cl_max_extent_pages = size >> PAGE_SHIFT;
995 if (cli->cl_max_extent_pages == 0)
996 cli->cl_max_extent_pages = 1;
/* no GRANT_PARAM: fall back to page-sized chunks and default extent */
998 cli->cl_grant_extent_tax = 0;
999 cli->cl_chunkbits = PAGE_SHIFT;
1000 cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
1002 spin_unlock(&cli->cl_loi_list_lock);
1004 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1005 "chunk bits: %d cl_max_extent_pages: %d\n",
1007 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
1008 cli->cl_max_extent_pages);
1010 if (OCD_HAS_FLAG(ocd, GRANT_SHRINK) && list_empty(&cli->cl_grant_chain))
1011 osc_add_grant_list(cli);
1013 EXPORT_SYMBOL(osc_init_grant);
1015 /* We assume that the reason this OSC got a short read is because it read
1016 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1017 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1018 * this stripe never got written at or beyond this stripe offset yet. */
1019 static void handle_short_read(int nob_read, size_t page_count,
1020 struct brw_page **pga)
1025 /* skip bytes read OK */
1026 while (nob_read > 0) {
1027 LASSERT (page_count > 0);
1029 if (pga[i]->count > nob_read) {
1030 /* EOF inside this page */
/* zero from the EOF point to the end of this page's region */
1031 ptr = kmap(pga[i]->pg) +
1032 (pga[i]->off & ~PAGE_MASK);
1033 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1040 nob_read -= pga[i]->count;
1045 /* zero remaining pages */
1046 while (page_count-- > 0) {
1047 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
1048 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf RC vector in a BRW_WRITE reply: the vector must
 * be present and correctly sized, each entry must be zero (negative
 * entries are propagated as the return value, nonzero positive ones are
 * a protocol error), and, if bulk was used, the transferred byte count
 * must match what was requested.
 */
1054 static int check_write_rcs(struct ptlrpc_request *req,
1055 int requested_nob, int niocount,
1056 size_t page_count, struct brw_page **pga)
1061 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1062 sizeof(*remote_rcs) *
1064 if (remote_rcs == NULL) {
1065 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1069 /* return error if any niobuf was in error */
1070 for (i = 0; i < niocount; i++) {
1071 if ((int)remote_rcs[i] < 0)
1072 return(remote_rcs[i]);
1074 if (remote_rcs[i] != 0) {
1075 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1076 i, remote_rcs[i], req);
1080 if (req->rq_bulk != NULL &&
1081 req->rq_bulk->bd_nob_transferred != requested_nob) {
1082 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1083 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can share one niobuf iff they are byte-contiguous
 * (p1 ends where p2 starts) and their flags agree on everything except
 * the masked-out cache/sync hints, which are known-safe to combine.
 */
1090 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1092 if (p1->flag != p2->flag) {
1093 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1094 OBD_BRW_SYNC | OBD_BRW_ASYNC |
1095 OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1097 /* warn if we try to combine flags that we don't know to be
1098 * safe to combine */
1099 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1100 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1101 "report this at https://jira.whamcloud.com/\n",
1102 p1->flag, p2->flag);
1107 return (p1->off + p1->count == p2->off);
1110 #if IS_ENABLED(CONFIG_CRC_T10DIF)
/*
 * T10-PI bulk checksum: generate per-sector DIF guard tags for each page
 * with @fn, accumulate the tags into a scratch page, and hash that page
 * with the T10 "top" algorithm to produce the final wire checksum.
 * Supports fault injection for both the receive path (corrupt data before
 * checksumming) and the send path (flip the checksum, leaving data
 * intact so a resend is still correct).
 */
1111 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1112 size_t pg_count, struct brw_page **pga,
1113 int opc, obd_dif_csum_fn *fn,
1117 struct ahash_request *req;
1118 /* Used Adler as the default checksum type on top of DIF tags */
1119 unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1120 struct page *__page;
1121 unsigned char *buffer;
1123 unsigned int bufsize;
1125 int used_number = 0;
1131 LASSERT(pg_count > 0);
/* scratch page accumulates guard tags between hash updates */
1133 __page = alloc_page(GFP_KERNEL);
1137 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1140 CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1141 obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1145 buffer = kmap(__page);
1146 guard_start = (__u16 *)buffer;
1147 guard_number = PAGE_SIZE / sizeof(*guard_start);
1148 while (nob > 0 && pg_count > 0) {
1149 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1151 /* corrupt the data before we compute the checksum, to
1152 * simulate an OST->client data error */
1153 if (unlikely(i == 0 && opc == OST_READ &&
1154 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1155 unsigned char *ptr = kmap(pga[i]->pg);
1156 int off = pga[i]->off & ~PAGE_MASK;
1158 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1163 * The left guard number should be able to hold checksums of a
1166 rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg,
1167 pga[i]->off & ~PAGE_MASK,
1169 guard_start + used_number,
1170 guard_number - used_number,
1176 used_number += used;
/* scratch page full of tags: fold it into the hash and reuse it */
1177 if (used_number == guard_number) {
1178 cfs_crypto_hash_update_page(req, __page, 0,
1179 used_number * sizeof(*guard_start));
1183 nob -= pga[i]->count;
/* flush any remaining tags before finalizing */
1191 if (used_number != 0)
1192 cfs_crypto_hash_update_page(req, __page, 0,
1193 used_number * sizeof(*guard_start));
1195 bufsize = sizeof(cksum);
1196 cfs_crypto_hash_final(req, (unsigned char *)&cksum, &bufsize);
1198 /* For sending we only compute the wrong checksum instead
1199 * of corrupting the data so it is still correct on a redo */
1200 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1205 __free_page(__page);
1208 #else /* !CONFIG_CRC_T10DIF */
1209 #define obd_dif_ip_fn NULL
1210 #define obd_dif_crc_fn NULL
1211 #define osc_checksum_bulk_t10pi(name, nob, pgc, pga, opc, fn, ssize, csum) \
1213 #endif /* CONFIG_CRC_T10DIF */
/*
 * Classic (non-T10) bulk checksum: hash each brw_page region directly
 * with the algorithm selected by @cksum_type, writing the digest to
 * *cksum.  Same fault-injection hooks as the T10 variant: corrupt
 * incoming data for OST_READ, or flip the outgoing checksum for
 * OST_WRITE so the data itself stays correct on a resend.
 */
1215 static int osc_checksum_bulk(int nob, size_t pg_count,
1216 struct brw_page **pga, int opc,
1217 enum cksum_types cksum_type,
1221 struct ahash_request *req;
1222 unsigned int bufsize;
1223 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1225 LASSERT(pg_count > 0);
1227 req = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1229 CERROR("Unable to initialize checksum hash %s\n",
1230 cfs_crypto_hash_name(cfs_alg));
1231 return PTR_ERR(req);
1234 while (nob > 0 && pg_count > 0) {
1235 unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1237 /* corrupt the data before we compute the checksum, to
1238 * simulate an OST->client data error */
1239 if (i == 0 && opc == OST_READ &&
1240 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1241 unsigned char *ptr = kmap(pga[i]->pg);
1242 int off = pga[i]->off & ~PAGE_MASK;
1244 memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1247 cfs_crypto_hash_update_page(req, pga[i]->pg,
1248 pga[i]->off & ~PAGE_MASK,
1250 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1251 (int)(pga[i]->off & ~PAGE_MASK));
1253 nob -= pga[i]->count;
1258 bufsize = sizeof(*cksum);
1259 cfs_crypto_hash_final(req, (unsigned char *)cksum, &bufsize);
1261 /* For sending we only compute the wrong checksum instead
1262 * of corrupting the data so it is still correct on a redo */
1263 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Dispatch to the right bulk-checksum implementation: T10-PI when
 * @cksum_type maps to a DIF function, otherwise the classic hash.
 * (The T10 path is compiled out to a stub when CONFIG_CRC_T10DIF is
 * disabled.)
 */
1269 static int osc_checksum_bulk_rw(const char *obd_name,
1270 enum cksum_types cksum_type,
1271 int nob, size_t pg_count,
1272 struct brw_page **pga, int opc,
1275 obd_dif_csum_fn *fn = NULL;
1276 int sector_size = 0;
1280 obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1283 rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1284 opc, fn, sector_size, check_sum);
1286 rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
/*
 * Allocate and pack an OST_READ/OST_WRITE BRW request for @page_count pages
 * described by @pga.  Counts mergeable page runs into niobuf entries, decides
 * between a bulk transfer and inline "short I/O" data, packs the obdo, ioobj
 * and niobuf descriptors, computes a client-side checksum when enabled, and
 * returns the prepared request through @reqp.  @resend != 0 marks a resent
 * request (OBD_FL_RECOV_RESEND).
 *
 * NOTE(review): this excerpt is elided (original line numbers below are not
 * contiguous), so some statements and branches are missing from view; the
 * surviving code is preserved verbatim.
 */
1293 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1294 u32 page_count, struct brw_page **pga,
1295 struct ptlrpc_request **reqp, int resend)
1297 struct ptlrpc_request *req;
1298 struct ptlrpc_bulk_desc *desc;
1299 struct ost_body *body;
1300 struct obd_ioobj *ioobj;
1301 struct niobuf_remote *niobuf;
1302 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1303 struct osc_brw_async_args *aa;
1304 struct req_capsule *pill;
1305 struct brw_page *pg_prev;
1307 const char *obd_name = cli->cl_import->imp_obd->obd_name;
/* Fault-injection hooks used by tests to exercise both the recoverable and
 * the fatal request-preparation failure paths. */
1310 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1311 RETURN(-ENOMEM); /* Recoverable */
1312 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1313 RETURN(-EINVAL); /* Fatal */
/* Writes allocate from the preallocated OSC request pool; reads allocate
 * a request directly. */
1315 if ((cmd & OBD_BRW_WRITE) != 0) {
1317 req = ptlrpc_request_alloc_pool(cli->cl_import,
1319 &RQF_OST_BRW_WRITE);
1322 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Each run of mergeable pages becomes one remote niobuf entry. */
1327 for (niocount = i = 1; i < page_count; i++) {
1328 if (!can_merge_pages(pga[i - 1], pga[i]))
1332 pill = &req->rq_pill;
1333 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1335 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1336 niocount * sizeof(*niobuf));
1338 for (i = 0; i < page_count; i++)
1339 short_io_size += pga[i]->count;
1341 /* Check if read/write is small enough to be a short io. */
1342 if (short_io_size > cli->cl_max_short_io_bytes || niocount > 1 ||
1343 !imp_connect_shortio(cli->cl_import))
1346 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1347 opc == OST_READ ? 0 : short_io_size);
1348 if (opc == OST_READ)
1349 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1352 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1354 ptlrpc_request_free(req);
1357 osc_set_io_portal(req);
1359 ptlrpc_at_set_req_timeout(req);
1360 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1362 req->rq_no_retry_einprogress = 1;
/* Short I/O carries the data inline in the request buffer, so no bulk
 * descriptor is prepared in that case. */
1364 if (short_io_size != 0) {
1366 short_io_buf = NULL;
1370 desc = ptlrpc_prep_bulk_imp(req, page_count,
1371 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1372 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1373 PTLRPC_BULK_PUT_SINK) |
1374 PTLRPC_BULK_BUF_KIOV,
1376 &ptlrpc_bulk_kiov_pin_ops);
1379 GOTO(out, rc = -ENOMEM);
1380 /* NB request now owns desc and will free it when it gets freed */
1382 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1383 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1384 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1385 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1387 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1389 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1390 * and from_kgid(), because they are asynchronous. Fortunately, variable
1391 * oa contains valid o_uid and o_gid in these two operations.
1392 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1393 * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1394 * other process logic */
1395 body->oa.o_uid = oa->o_uid;
1396 body->oa.o_gid = oa->o_gid;
1398 obdo_to_ioobj(oa, ioobj);
1399 ioobj->ioo_bufcnt = niocount;
1400 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1401 * that might be send for this request. The actual number is decided
1402 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1403 * "max - 1" for old client compatibility sending "0", and also so the
1404 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1406 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1408 ioobj_max_brw_set(ioobj, 0);
/* Flag the request as short I/O so the server reads the inline buffer
 * rather than initiating a bulk transfer. */
1410 if (short_io_size != 0) {
1411 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1412 body->oa.o_valid |= OBD_MD_FLFLAGS;
1413 body->oa.o_flags = 0;
1415 body->oa.o_flags |= OBD_FL_SHORT_IO;
1416 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1418 if (opc == OST_WRITE) {
1419 short_io_buf = req_capsule_client_get(pill,
1421 LASSERT(short_io_buf != NULL);
1425 LASSERT(page_count > 0);
/* Walk the (sorted) page array: copy write data into the short-io buffer
 * or attach kiov fragments to the bulk descriptor, and build niobuf
 * entries, merging physically contiguous pages into one entry. */
1427 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1428 struct brw_page *pg = pga[i];
1429 int poff = pg->off & ~PAGE_MASK;
1431 LASSERT(pg->count > 0);
1432 /* make sure there is no gap in the middle of page array */
1433 LASSERTF(page_count == 1 ||
1434 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1435 ergo(i > 0 && i < page_count - 1,
1436 poff == 0 && pg->count == PAGE_SIZE) &&
1437 ergo(i == page_count - 1, poff == 0)),
1438 "i: %d/%d pg: %p off: %llu, count: %u\n",
1439 i, page_count, pg, pg->off, pg->count);
1440 LASSERTF(i == 0 || pg->off > pg_prev->off,
1441 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1442 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1444 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1445 pg_prev->pg, page_private(pg_prev->pg),
1446 pg_prev->pg->index, pg_prev->off);
1447 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1448 (pg->flag & OBD_BRW_SRVLOCK));
1449 if (short_io_size != 0 && opc == OST_WRITE) {
1450 unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1452 LASSERT(short_io_size >= requested_nob + pg->count);
1453 memcpy(short_io_buf + requested_nob,
1456 ll_kunmap_atomic(ptr, KM_USER0);
1457 } else if (short_io_size == 0) {
1458 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1461 requested_nob += pg->count;
1463 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1465 niobuf->rnb_len += pg->count;
1467 niobuf->rnb_offset = pg->off;
1468 niobuf->rnb_len = pg->count;
1469 niobuf->rnb_flags = pg->flag;
/* Sanity check: exactly niocount niobuf entries were filled in. */
1474 LASSERTF((void *)(niobuf - niocount) ==
1475 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1476 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1477 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1479 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1481 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1482 body->oa.o_valid |= OBD_MD_FLFLAGS;
1483 body->oa.o_flags = 0;
1485 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1488 if (osc_should_shrink_grant(cli))
1489 osc_shrink_grant_local(cli, &body->oa);
1491 /* size[REQ_REC_OFF] still sizeof (*body) */
/* WRITE: compute the client checksum (unless the sptlrpc flavor already
 * protects the bulk) and reserve one per-niobuf RC in the reply. */
1492 if (opc == OST_WRITE) {
1493 if (cli->cl_checksum &&
1494 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1495 /* store cl_cksum_type in a local variable since
1496 * it can be changed via lprocfs */
1497 enum cksum_types cksum_type = cli->cl_cksum_type;
1499 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1500 body->oa.o_flags = 0;
1502 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1504 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1506 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1507 requested_nob, page_count,
1511 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1515 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1518 /* save this in 'oa', too, for later checking */
1519 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1520 oa->o_flags |= obd_cksum_type_pack(obd_name,
1523 /* clear out the checksum flag, in case this is a
1524 * resend but cl_checksum is no longer set. b=11238 */
1525 oa->o_valid &= ~OBD_MD_FLCKSUM;
1527 oa->o_cksum = body->oa.o_cksum;
1528 /* 1 RC per niobuf */
1529 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1530 sizeof(__u32) * niocount);
/* READ: request a server checksum so the reply can be verified later in
 * osc_brw_fini_request(). */
1532 if (cli->cl_checksum &&
1533 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1534 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1535 body->oa.o_flags = 0;
1536 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1537 cli->cl_cksum_type);
1538 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1541 /* Client cksum has been already copied to wire obdo in previous
1542 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1543 * resent due to cksum error, this will allow Server to
1544 * check+dump pages on its side */
1546 ptlrpc_request_set_replen(req);
1548 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1549 aa = ptlrpc_req_async_args(req);
/* Stash the transfer parameters for the reply interpreter. */
1551 aa->aa_requested_nob = requested_nob;
1552 aa->aa_nio_count = niocount;
1553 aa->aa_page_count = page_count;
1557 INIT_LIST_HEAD(&aa->aa_oaps);
1560 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1561 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1562 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1563 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
/* Error path: drop the request reference allocated above. */
1567 ptlrpc_req_finished(req);
1571 char dbgcksum_file_name[PATH_MAX];
/*
 * Dump the pages of a bulk transfer with a checksum mismatch to a debug
 * file (named from the FID/extent/checksums) so the bad data can be
 * inspected offline.  The O_EXCL open makes this a one-shot dump per
 * file/range: resends and retries of the same extent will not overwrite
 * or re-create an existing dump.
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
1573 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1574 struct brw_page **pga, __u32 server_cksum,
1582 /* will only keep dump of pages on first error for the same range in
1583 * file/fid, not during the resends/retries. */
1584 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1585 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1586 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1587 libcfs_debug_file_path_arr :
1588 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1589 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1590 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1591 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1593 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1594 client_cksum, server_cksum);
1595 filp = filp_open(dbgcksum_file_name,
1596 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
/* -EEXIST (dump already taken) is expected and only D_INFO-logged;
 * any other open failure is a real error. */
1600 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1601 "checksum error: rc = %d\n", dbgcksum_file_name,
1604 CERROR("%s: can't open to dump pages with checksum "
1605 "error: rc = %d\n", dbgcksum_file_name, rc);
/* Write each bulk page to the dump file in order. */
1609 for (i = 0; i < page_count; i++) {
1610 len = pga[i]->count;
1611 buf = kmap(pga[i]->pg);
1613 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1615 CERROR("%s: wanted to write %u but got %d "
1616 "error\n", dbgcksum_file_name, len, rc);
1621 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1622 dbgcksum_file_name, rc);
/* Force the dump to stable storage before closing. */
1627 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1629 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1630 filp_close(filp, NULL);
/*
 * Verify a write reply's server checksum against the client checksum that
 * was sent with the request.  On mismatch, optionally dump the pages,
 * recompute the checksum over the still-cached pages, and classify where
 * the corruption most likely happened (client after checksumming, in
 * transit, or both) before logging a BAD WRITE CHECKSUM console error.
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
1635 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1636 __u32 client_cksum, __u32 server_cksum,
1637 struct osc_brw_async_args *aa)
1639 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1640 enum cksum_types cksum_type;
1641 obd_dif_csum_fn *fn = NULL;
1642 int sector_size = 0;
/* Fast path: checksums agree, nothing more to do. */
1647 if (server_cksum == client_cksum) {
1648 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1652 if (aa->aa_cli->cl_checksum_dump)
1653 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1654 server_cksum, client_cksum);
/* Use the checksum type the server replied with (if it sent flags). */
1656 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
/* Map T10-PI checksum variants to their guard function/sector size. */
1659 switch (cksum_type) {
1660 case OBD_CKSUM_T10IP512:
1664 case OBD_CKSUM_T10IP4K:
1668 case OBD_CKSUM_T10CRC512:
1669 fn = obd_dif_crc_fn;
1672 case OBD_CKSUM_T10CRC4K:
1673 fn = obd_dif_crc_fn;
/* Recompute the checksum over the cached pages to triangulate where the
 * data changed. */
1681 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1682 aa->aa_page_count, aa->aa_ppga,
1683 OST_WRITE, fn, sector_size,
1686 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1687 aa->aa_ppga, OST_WRITE, cksum_type,
1691 msg = "failed to calculate the client write checksum";
1692 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1693 msg = "the server did not use the checksum type specified in "
1694 "the original request - likely a protocol problem";
1695 else if (new_cksum == server_cksum)
1696 msg = "changed on the client after we checksummed it - "
1697 "likely false positive due to mmap IO (bug 11742)";
1698 else if (new_cksum == client_cksum)
1699 msg = "changed in transit before arrival at OST";
1701 msg = "changed in transit AND doesn't match the original - "
1702 "likely false positive due to mmap IO (bug 11742)";
1704 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1705 DFID " object "DOSTID" extent [%llu-%llu], original "
1706 "client csum %x (type %x), server csum %x (type %x),"
1707 " client csum now %x\n",
1708 obd_name, msg, libcfs_nid2str(peer->nid),
1709 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1710 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1711 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1712 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1713 aa->aa_ppga[aa->aa_page_count - 1]->off +
1714 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1716 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1717 server_cksum, cksum_type, new_cksum);
1721 /* Note rc enters this function as number of bytes transferred */
/*
 * Interpret the reply of a BRW RPC.  Handles per-id quota flags, grant
 * accounting, write-side RC and checksum verification, short-io read
 * copy-out, short reads, and read-side checksum verification, then copies
 * the wire obdo back into aa->aa_oa.
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
1722 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1724 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1725 struct client_obd *cli = aa->aa_cli;
1726 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1727 const struct lnet_process_id *peer =
1728 &req->rq_import->imp_connection->c_peer;
1729 struct ost_body *body;
1730 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply body; every other error bails. */
1733 if (rc < 0 && rc != -EDQUOT) {
1734 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1738 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1739 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1741 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1745 /* set/clear over quota flag for a uid/gid/projid */
1746 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1747 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1748 unsigned qid[LL_MAXQUOTAS] = {
1749 body->oa.o_uid, body->oa.o_gid,
1750 body->oa.o_projid };
1751 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1752 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1753 body->oa.o_valid, body->oa.o_flags);
1754 osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
1758 osc_update_grant(cli, body);
/* Remember the checksum we sent so the write path can compare it to the
 * server's value below. */
1763 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1764 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* ---- WRITE reply handling ---- */
1766 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1768 CERROR("Unexpected +ve rc %d\n", rc);
1772 if (req->rq_bulk != NULL &&
1773 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1776 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1777 check_write_checksum(&body->oa, peer, client_cksum,
1778 body->oa.o_cksum, aa))
1781 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1782 aa->aa_page_count, aa->aa_ppga);
1786 /* The rest of this function executes only for OST_READs */
/* Short-io reads carry the data inline in the reply instead of bulk. */
1788 if (req->rq_bulk == NULL) {
1789 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1791 LASSERT(rc == req->rq_status);
1793 /* if unwrap_bulk failed, return -EAGAIN to retry */
1794 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1797 GOTO(out, rc = -EAGAIN);
1799 if (rc > aa->aa_requested_nob) {
1800 CERROR("Unexpected rc %d (%d requested)\n", rc,
1801 aa->aa_requested_nob);
1805 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1806 CERROR ("Unexpected rc %d (%d transferred)\n",
1807 rc, req->rq_bulk->bd_nob_transferred);
/* Copy inline short-io reply data back into the destination pages. */
1811 if (req->rq_bulk == NULL) {
1813 int nob, pg_count, i = 0;
1816 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1817 pg_count = aa->aa_page_count;
1818 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1821 while (nob > 0 && pg_count > 0) {
1823 int count = aa->aa_ppga[i]->count > nob ?
1824 nob : aa->aa_ppga[i]->count;
1826 CDEBUG(D_CACHE, "page %p count %d\n",
1827 aa->aa_ppga[i]->pg, count);
1828 ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1829 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1831 ll_kunmap_atomic((void *) ptr, KM_USER0);
/* Zero-fill the tail of pages past EOF on a short read. */
1840 if (rc < aa->aa_requested_nob)
1841 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Verify the server-supplied read checksum against a locally computed
 * one; mismatch logs BAD READ CHECKSUM and retries via -EAGAIN. */
1843 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1844 static int cksum_counter;
1845 u32 server_cksum = body->oa.o_cksum;
1848 enum cksum_types cksum_type;
1849 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1850 body->oa.o_flags : 0;
1852 cksum_type = obd_cksum_type_unpack(o_flags);
1853 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1854 aa->aa_page_count, aa->aa_ppga,
1855 OST_READ, &client_cksum);
/* Note when the bulk arrived via an LNet router, for the error report. */
1859 if (req->rq_bulk != NULL &&
1860 peer->nid != req->rq_bulk->bd_sender) {
1862 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1865 if (server_cksum != client_cksum) {
1866 struct ost_body *clbody;
1867 u32 page_count = aa->aa_page_count;
1869 clbody = req_capsule_client_get(&req->rq_pill,
1871 if (cli->cl_checksum_dump)
1872 dump_all_bulk_pages(&clbody->oa, page_count,
1873 aa->aa_ppga, server_cksum,
1876 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1877 "%s%s%s inode "DFID" object "DOSTID
1878 " extent [%llu-%llu], client %x, "
1879 "server %x, cksum_type %x\n",
1881 libcfs_nid2str(peer->nid),
1883 clbody->oa.o_valid & OBD_MD_FLFID ?
1884 clbody->oa.o_parent_seq : 0ULL,
1885 clbody->oa.o_valid & OBD_MD_FLFID ?
1886 clbody->oa.o_parent_oid : 0,
1887 clbody->oa.o_valid & OBD_MD_FLFID ?
1888 clbody->oa.o_parent_ver : 0,
1889 POSTID(&body->oa.o_oi),
1890 aa->aa_ppga[0]->off,
1891 aa->aa_ppga[page_count-1]->off +
1892 aa->aa_ppga[page_count-1]->count - 1,
1893 client_cksum, server_cksum,
1896 aa->aa_oa->o_cksum = client_cksum;
1900 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Client asked for a checksum but the server didn't send one; rate-limit
 * the complaint to powers of two of the miss counter. */
1903 } else if (unlikely(client_cksum)) {
1904 static int cksum_missed;
1907 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1908 CERROR("Checksum %u requested from %s but not sent\n",
1909 cksum_missed, libcfs_nid2str(peer->nid));
/* Propagate the server's obdo (size, blocks, times, ...) back to the
 * caller's obdo. */
1915 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1916 aa->aa_oa, &body->oa);
/*
 * Rebuild and resubmit a BRW RPC after a recoverable error (e.g.
 * -EINPROGRESS).  A brand-new request is prepared with resend=1, inherits
 * the old request's async args, oaps/extent lists, interpret and commit
 * callbacks, and is handed back to ptlrpcd.
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
1921 static int osc_brw_redo_request(struct ptlrpc_request *request,
1922 struct osc_brw_async_args *aa, int rc)
1924 struct ptlrpc_request *new_req;
1925 struct osc_brw_async_args *new_aa;
1926 struct osc_async_page *oap;
1929 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1930 "redo for recoverable error %d", rc);
/* Re-pack the same pages into a fresh request, marked as a resend. */
1932 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1933 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1934 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1935 aa->aa_ppga, &new_req, 1);
/* Abort the redo if any page's owner was interrupted meanwhile. */
1939 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1940 if (oap->oap_request != NULL) {
1941 LASSERTF(request == oap->oap_request,
1942 "request %p != oap_request %p\n",
1943 request, oap->oap_request);
1944 if (oap->oap_interrupted) {
1945 ptlrpc_req_finished(new_req);
1951 * New request takes over pga and oaps from old request.
1952 * Note that copying a list_head doesn't work, need to move it...
1955 new_req->rq_interpret_reply = request->rq_interpret_reply;
1956 new_req->rq_async_args = request->rq_async_args;
1957 new_req->rq_commit_cb = request->rq_commit_cb;
1958 /* cap resend delay to the current request timeout, this is similar to
1959 * what ptlrpc does (see after_reply()) */
1960 if (aa->aa_resends > new_req->rq_timeout)
1961 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1963 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
1964 new_req->rq_generation_set = 1;
1965 new_req->rq_import_generation = request->rq_import_generation;
1967 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new async args. */
1969 INIT_LIST_HEAD(&new_aa->aa_oaps);
1970 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1971 INIT_LIST_HEAD(&new_aa->aa_exts);
1972 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1973 new_aa->aa_resends = aa->aa_resends;
/* Re-point each page at the new request, dropping the old reference. */
1975 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1976 if (oap->oap_request) {
1977 ptlrpc_req_finished(oap->oap_request);
1978 oap->oap_request = ptlrpc_request_addref(new_req);
1982 /* XXX: This code will run into problem if we're going to support
1983 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1984 * and wait for all of them to be finished. We should inherit request
1985 * set from old request. */
1986 ptlrpcd_add_req(new_req);
1988 DEBUG_REQ(D_INFO, new_req, "new request");
1993 * ugh, we want disk allocation on the target to happen in offset order. we'll
1994 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1995 * fine for our small page arrays and doesn't require allocation. its an
1996 * insertion sort that swaps elements that are strides apart, shrinking the
1997 * stride down until its '1' and the array is sorted.
/* Sort @array of @num brw_pages in ascending ->off order (Shell sort with
 * the 3h+1 stride sequence).  NOTE(review): excerpt elided; code preserved
 * verbatim. */
1999 static void sort_brw_pages(struct brw_page **array, int num)
2002 struct brw_page *tmp;
/* Grow the stride to the largest 3h+1 value below num, then shrink. */
2006 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
2011 for (i = stride ; i < num ; i++) {
2014 while (j >= stride && array[j - stride]->off > tmp->off) {
2015 array[j] = array[j - stride];
2020 } while (stride > 1);
/* Free a brw_page pointer array of @count entries previously allocated by
 * the BRW build path (the pages themselves are owned elsewhere). */
2023 static void osc_release_ppga(struct brw_page **ppga, size_t count)
2025 LASSERT(ppga != NULL);
2026 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * ptlrpc interpret callback for BRW RPCs.  Finalizes the reply via
 * osc_brw_fini_request(), redoes the RPC on recoverable errors, updates
 * the cl_object attributes (blocks/times, and size/KMS for writes),
 * finishes all extents, releases the page array, decrements the in-flight
 * counters and re-plugs the IO queue.
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
2029 static int brw_interpret(const struct lu_env *env,
2030 struct ptlrpc_request *req, void *args, int rc)
2032 struct osc_brw_async_args *aa = args;
2033 struct osc_extent *ext;
2034 struct osc_extent *tmp;
2035 struct client_obd *cli = aa->aa_cli;
2036 unsigned long transferred = 0;
2040 rc = osc_brw_fini_request(req, rc);
2041 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2043 * When server returns -EINPROGRESS, client should always retry
2044 * regardless of the number of times the bulk was resent already.
2046 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
2047 if (req->rq_import_generation !=
2048 req->rq_import->imp_generation) {
2049 CDEBUG(D_HA, "%s: resend cross eviction for object: "
2050 ""DOSTID", rc = %d.\n",
2051 req->rq_import->imp_obd->obd_name,
2052 POSTID(&aa->aa_oa->o_oi), rc);
2053 } else if (rc == -EINPROGRESS ||
2054 client_should_resend(aa->aa_resends, aa->aa_cli)) {
2055 rc = osc_brw_redo_request(req, aa, rc);
2057 CERROR("%s: too many resent retries for object: "
2058 "%llu:%llu, rc = %d.\n",
2059 req->rq_import->imp_obd->obd_name,
2060 POSTID(&aa->aa_oa->o_oi), rc);
2065 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: propagate reply attributes into the cl_object. */
2070 struct obdo *oa = aa->aa_oa;
2071 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
2072 unsigned long valid = 0;
2073 struct cl_object *obj;
2074 struct osc_async_page *last;
2076 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2077 obj = osc2cl(last->oap_obj);
2079 cl_object_attr_lock(obj);
2080 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2081 attr->cat_blocks = oa->o_blocks;
2082 valid |= CAT_BLOCKS;
2084 if (oa->o_valid & OBD_MD_FLMTIME) {
2085 attr->cat_mtime = oa->o_mtime;
2088 if (oa->o_valid & OBD_MD_FLATIME) {
2089 attr->cat_atime = oa->o_atime;
2092 if (oa->o_valid & OBD_MD_FLCTIME) {
2093 attr->cat_ctime = oa->o_ctime;
2097 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2098 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
2099 loff_t last_off = last->oap_count + last->oap_obj_off +
2102 /* Change file size if this is an out of quota or
2103 * direct IO write and it extends the file size */
2104 if (loi->loi_lvb.lvb_size < last_off) {
2105 attr->cat_size = last_off;
2108 /* Extend KMS if it's not a lockless write */
2109 if (loi->loi_kms < last_off &&
2110 oap2osc_page(last)->ops_srvlock == 0) {
2111 attr->cat_kms = last_off;
2117 cl_object_attr_update(env, obj, attr, valid);
2118 cl_object_attr_unlock(obj);
2120 OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
2122 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2123 osc_inc_unstable_pages(req);
/* Finish every extent covered by this RPC; no-delay failures are mapped
 * to -EWOULDBLOCK for the extent completion. */
2125 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2126 list_del_init(&ext->oe_link);
2127 osc_extent_finish(env, ext, 1,
2128 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2130 LASSERT(list_empty(&aa->aa_exts));
2131 LASSERT(list_empty(&aa->aa_oaps));
2133 transferred = (req->rq_bulk == NULL ? /* short io */
2134 aa->aa_requested_nob :
2135 req->rq_bulk->bd_nob_transferred);
2137 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2138 ptlrpc_lprocfs_brw(req, transferred);
2140 spin_lock(&cli->cl_loi_list_lock);
2141 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2142 * is called so we know whether to go to sync BRWs or wait for more
2143 * RPCs to complete */
2144 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2145 cli->cl_w_in_flight--;
2147 cli->cl_r_in_flight--;
2148 osc_wake_cache_waiters(cli);
2149 spin_unlock(&cli->cl_loi_list_lock);
2151 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW requests: once the server has committed the write,
 * either decrement the unstable-page accounting (normal case) or, if the
 * unstable flag was not yet set, just mark the request committed so the
 * racing osc_inc_unstable_pages() caller does the decrement instead.
 */
2155 static void brw_commit(struct ptlrpc_request *req)
2157 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2158 * this called via the rq_commit_cb, I need to ensure
2159 * osc_dec_unstable_pages is still called. Otherwise unstable
2160 * pages may be leaked. */
2161 spin_lock(&req->rq_lock)
;
2162 if (likely(req->rq_unstable)) {
2163 req->rq_unstable = 0;
2164 spin_unlock(&req->rq_lock);
2166 osc_dec_unstable_pages(req);
2168 req->rq_committed = 1;
2169 spin_unlock(&req->rq_lock);
2174 * Build an RPC by the list of extent @ext_list. The caller must ensure
2175 * that the total pages in this list are NOT over max pages per RPC.
2176 * Extents in the list must be in OES_RPC state.
/*
 * Gathers the pages of every extent into a brw_page array, allocates and
 * fills the obdo, prepares the BRW request via osc_brw_prep_request(),
 * wires up the interpret/commit callbacks and in-flight accounting, and
 * hands the request to ptlrpcd.  On failure all extents are finished with
 * the error.
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
2178 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2179 struct list_head *ext_list, int cmd)
2181 struct ptlrpc_request *req = NULL;
2182 struct osc_extent *ext;
2183 struct brw_page **pga = NULL;
2184 struct osc_brw_async_args *aa = NULL;
2185 struct obdo *oa = NULL;
2186 struct osc_async_page *oap;
2187 struct osc_object *obj = NULL;
2188 struct cl_req_attr *crattr = NULL;
2189 loff_t starting_offset = OBD_OBJECT_EOF;
2190 loff_t ending_offset = 0;
2194 bool soft_sync = false;
2195 bool interrupted = false;
2196 bool ndelay = false;
2200 __u32 layout_version = 0;
2201 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2202 struct ost_body *body;
2204 LASSERT(!list_empty(ext_list));
2206 /* add pages into rpc_list to build BRW rpc */
2207 list_for_each_entry(ext, ext_list, oe_link) {
2208 LASSERT(ext->oe_state == OES_RPC);
2209 mem_tight |= ext->oe_memalloc;
2210 grant += ext->oe_grants;
2211 page_count += ext->oe_nr_pages;
2212 layout_version = MAX(layout_version, ext->oe_layout_version);
2217 soft_sync = osc_over_unstable_soft_limit(cli);
2219 mpflag = cfs_memory_pressure_get_and_set();
2221 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2223 GOTO(out, rc = -ENOMEM);
2225 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
2227 GOTO(out, rc = -ENOMEM);
/* Flatten every extent's pages into pga[] / rpc_list, tracking the
 * overall [starting_offset, ending_offset) byte range of the RPC. */
2230 list_for_each_entry(ext, ext_list, oe_link) {
2231 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2233 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2235 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2236 pga[i] = &oap->oap_brw_page;
2237 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2240 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2241 if (starting_offset == OBD_OBJECT_EOF ||
2242 starting_offset > oap->oap_obj_off)
2243 starting_offset = oap->oap_obj_off;
2245 LASSERT(oap->oap_page_off == 0);
2246 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2247 ending_offset = oap->oap_obj_off +
2250 LASSERT(oap->oap_page_off + oap->oap_count ==
2252 if (oap->oap_interrupted)
2259 /* first page in the list */
2260 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Fill the obdo with request attributes derived from the first page. */
2262 crattr = &osc_env_info(env)->oti_req_attr;
2263 memset(crattr, 0, sizeof(*crattr));
2264 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2265 crattr->cra_flags = ~0ULL;
2266 crattr->cra_page = oap2cl_page(oap);
2267 crattr->cra_oa = oa;
2268 cl_req_attr_set(env, osc2cl(obj), crattr);
2270 if (cmd == OBD_BRW_WRITE) {
2271 oa->o_grant_used = grant;
2272 if (layout_version > 0) {
2273 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2274 PFID(&oa->o_oi.oi_fid), layout_version);
2276 oa->o_layout_version = layout_version;
2277 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
/* Pages must be in offset order before packing niobufs. */
2281 sort_brw_pages(pga, page_count);
2282 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2284 CERROR("prep_req failed: %d\n", rc);
2288 req->rq_commit_cb = brw_commit;
2289 req->rq_interpret_reply = brw_interpret;
2290 req->rq_memalloc = mem_tight != 0;
2291 oap->oap_request = ptlrpc_request_addref(req);
2292 if (interrupted && !req->rq_intr)
2293 ptlrpc_mark_interrupted(req);
2295 req->rq_no_resend = req->rq_no_delay = 1;
2296 /* probably set a shorter timeout value.
2297 * to handle ETIMEDOUT in brw_interpret() correctly. */
2298 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2301 /* Need to update the timestamps after the request is built in case
2302 * we race with setattr (locally or in queue at OST). If OST gets
2303 * later setattr before earlier BRW (as determined by the request xid),
2304 * the OST will not use BRW timestamps. Sadly, there is no obvious
2305 * way to do this in a single call. bug 10150 */
2306 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2307 crattr->cra_oa = &body->oa;
2308 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2309 cl_req_attr_set(env, osc2cl(obj), crattr);
2310 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2312 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2313 aa = ptlrpc_req_async_args(req);
/* Transfer ownership of the oap and extent lists to the async args. */
2314 INIT_LIST_HEAD(&aa->aa_oaps);
2315 list_splice_init(&rpc_list, &aa->aa_oaps);
2316 INIT_LIST_HEAD(&aa->aa_exts);
2317 list_splice_init(ext_list, &aa->aa_exts);
/* In-flight RPC accounting and lprocfs histograms. */
2319 spin_lock(&cli->cl_loi_list_lock);
2320 starting_offset >>= PAGE_SHIFT;
2321 if (cmd == OBD_BRW_READ) {
2322 cli->cl_r_in_flight++;
2323 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2324 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2325 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2326 starting_offset + 1);
2328 cli->cl_w_in_flight++;
2329 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2330 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2331 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2332 starting_offset + 1);
2334 spin_unlock(&cli->cl_loi_list_lock);
2336 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2337 page_count, aa, cli->cl_r_in_flight,
2338 cli->cl_w_in_flight);
2339 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2341 ptlrpcd_add_req(req);
/* Error path: undo allocations and fail all extents. */
2347 cfs_memory_pressure_restore(mpflag);
2350 LASSERT(req == NULL);
2353 OBD_SLAB_FREE_PTR(oa, osc_obdo_kmem);
2355 OBD_FREE(pga, sizeof(*pga) * page_count);
2356 /* this should happen rarely and is pretty bad, it makes the
2357 * pending list not follow the dirty order */
2358 while (!list_empty(ext_list)) {
2359 ext = list_entry(ext_list->next, struct osc_extent,
2361 list_del_init(&ext->oe_link);
2362 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data to @lock->l_ast_data under the lock's resource lock.
 * Only succeeds if l_ast_data was NULL or already equals @data; the
 * visible code returns success when l_ast_data == data after the attempt.
 */
2368 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2372 LASSERT(lock != NULL);
2374 lock_res_and_lock(lock);
2376 if (lock->l_ast_data == NULL)
2377 lock->l_ast_data = data;
2378 if (lock->l_ast_data == data)
2381 unlock_res_and_lock(lock);
/*
 * Common completion for an OSC lock enqueue.  For an aborted intent
 * enqueue, extracts the real error from the DLM reply; sets
 * LDLM_FL_LVB_READY on success; invokes the caller's upcall with the
 * final errcode; and drops the enqueue reference(s) taken by
 * ldlm_cli_enqueue().
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
2386 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2387 void *cookie, struct lustre_handle *lockh,
2388 enum ldlm_mode mode, __u64 *flags, bool speculative,
2391 bool intent = *flags & LDLM_FL_HAS_INTENT;
2395 /* The request was created before ldlm_cli_enqueue call. */
2396 if (intent && errcode == ELDLM_LOCK_ABORTED) {
2397 struct ldlm_reply *rep;
2399 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2400 LASSERT(rep != NULL);
/* lock_policy_res1 carries the server's real status for an aborted
 * intent; convert from wire byte order/encoding. */
2402 rep->lock_policy_res1 =
2403 ptlrpc_status_ntoh(rep->lock_policy_res1);
2404 if (rep->lock_policy_res1)
2405 errcode = rep->lock_policy_res1;
2407 *flags |= LDLM_FL_LVB_READY;
2408 } else if (errcode == ELDLM_OK) {
2409 *flags |= LDLM_FL_LVB_READY;
2412 /* Call the update callback. */
2413 rc = (*upcall)(cookie, lockh, errcode);
2415 /* release the reference taken in ldlm_cli_enqueue() */
2416 if (errcode == ELDLM_LOCK_MATCHED)
2418 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2419 ldlm_lock_decref(lockh, mode);
/*
 * ptlrpc interpret callback for an asynchronous OSC lock enqueue.
 * Completes the LDLM enqueue (ldlm_cli_enqueue_fini), then runs the OSC
 * upcall via osc_enqueue_fini().  Takes an extra lock reference across
 * the upcall so a blocking AST posted for a failed lock cannot arrive
 * before the upcall has run.
 *
 * NOTE(review): excerpt is elided (non-contiguous original line numbers);
 * code preserved verbatim.
 */
2424 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2427 struct osc_enqueue_args *aa = args;
2428 struct ldlm_lock *lock;
2429 struct lustre_handle *lockh = &aa->oa_lockh;
2430 enum ldlm_mode mode = aa->oa_mode;
2431 struct ost_lvb *lvb = aa->oa_lvb;
2432 __u32 lvb_len = sizeof(*lvb);
2437 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2439 lock = ldlm_handle2lock(lockh);
2440 LASSERTF(lock != NULL,
2441 "lockh %#llx, req %p, aa %p - client evicted?\n",
2442 lockh->cookie, req, aa);
2444 /* Take an additional reference so that a blocking AST that
2445 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2446 * to arrive after an upcall has been executed by
2447 * osc_enqueue_fini(). */
2448 ldlm_lock_addref(lockh, mode);
2450 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2451 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2453 /* Let CP AST to grant the lock first. */
2454 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* Speculative (e.g. lockahead) enqueues carry no LVB/flags from the
 * caller; supply a local flags word for the fini calls. */
2456 if (aa->oa_speculative) {
2457 LASSERT(aa->oa_lvb == NULL);
2458 LASSERT(aa->oa_flags == NULL);
2459 aa->oa_flags = &flags;
2462 /* Complete obtaining the lock procedure. */
2463 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2464 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2466 /* Complete osc stuff. */
2467 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2468 aa->oa_flags, aa->oa_speculative, rc);
2470 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the extra reference taken above. */
2472 ldlm_lock_decref(lockh, mode);
2473 LDLM_LOCK_PUT(lock);
/* Sentinel request-set pointer: callers pass PTLRPCD_SET to have the
 * request handed to ptlrpcd (ptlrpcd_add_req) instead of being added to a
 * caller-owned set; it is compared by value and never dereferenced. */
2477 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2479 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2480 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2481 * other synchronous requests, however keeping some locks and trying to obtain
2482 * others may take a considerable amount of time in a case of ost failure; and
2483 * when other sync requests do not get released lock from a client, the client
2484 * is evicted from the cluster -- such scenarious make the life difficult, so
2485 * release locks just after they are obtained. */
/*
 * Enqueue (or match) an OST extent lock.
 *
 * First tries ldlm_lock_match() against cached locks; on a match the upcall
 * is invoked with ELDLM_LOCK_MATCHED (or the lock is cancelled again for
 * speculative requests that find an equal extent).  Otherwise an LDLM
 * enqueue RPC is built; with \a async it is interpreted later by
 * osc_enqueue_interpret(), otherwise completion runs inline via
 * osc_enqueue_fini().
 * NOTE(review): the parameter list tail (speculative flag) and several
 * braces/returns are outside this view.
 */
2486 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2487 __u64 *flags, union ldlm_policy_data *policy,
2488 struct ost_lvb *lvb, int kms_valid,
2489 osc_enqueue_upcall_f upcall, void *cookie,
2490 struct ldlm_enqueue_info *einfo,
2491 struct ptlrpc_request_set *rqset, int async,
2494 struct obd_device *obd = exp->exp_obd;
2495 struct lustre_handle lockh = { 0 };
2496 struct ptlrpc_request *req = NULL;
2497 int intent = *flags & LDLM_FL_HAS_INTENT;
2498 __u64 match_flags = *flags;
2499 enum ldlm_mode mode;
2503 /* Filesystem lock extents are extended to page boundaries so that
2504 * dealing with the page cache is a little smoother. */
2505 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2506 policy->l_extent.end |= ~PAGE_MASK;
2509 * kms is not valid when either object is completely fresh (so that no
2510 * locks are cached), or object was evicted. In the latter case cached
2511 * lock cannot be used, because it would prime inode state with
2512 * potentially stale LVB.
2517 /* Next, search for already existing extent locks that will cover us */
2518 /* If we're trying to read, we also search for an existing PW lock. The
2519 * VFS and page cache already protect us locally, so lots of readers/
2520 * writers can share a single PW lock.
2522 * There are problems with conversion deadlocks, so instead of
2523 * converting a read lock to a write lock, we'll just enqueue a new
2526 * At some point we should cancel the read lock instead of making them
2527 * send us a blocking callback, but there are problems with canceling
2528 * locks out from other users right now, too. */
2529 mode = einfo->ei_mode;
2530 if (einfo->ei_mode == LCK_PR)
2532 /* Normal lock requests must wait for the LVB to be ready before
2533 * matching a lock; speculative lock requests do not need to,
2534 * because they will not actually use the lock. */
2536 match_flags |= LDLM_FL_LVB_READY;
2538 match_flags |= LDLM_FL_BLOCK_GRANTED;
2539 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2540 einfo->ei_type, policy, mode, &lockh, 0);
/* Matched an already-cached lock: either hand it to the caller
 * (ELDLM_LOCK_MATCHED upcall) or, for speculative requests, drop it. */
2542 struct ldlm_lock *matched;
2544 if (*flags & LDLM_FL_TEST_LOCK)
2547 matched = ldlm_handle2lock(&lockh);
2549 /* This DLM lock request is speculative, and does not
2550 * have an associated IO request. Therefore if there
2551 * is already a DLM lock, it wll just inform the
2552 * caller to cancel the request for this stripe.*/
2553 lock_res_and_lock(matched);
2554 if (ldlm_extent_equal(&policy->l_extent,
2555 &matched->l_policy_data.l_extent))
2559 unlock_res_and_lock(matched);
2561 ldlm_lock_decref(&lockh, mode);
2562 LDLM_LOCK_PUT(matched);
2564 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2565 *flags |= LDLM_FL_LVB_READY;
2567 /* We already have a lock, and it's referenced. */
2568 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2570 ldlm_lock_decref(&lockh, mode);
2571 LDLM_LOCK_PUT(matched);
2574 ldlm_lock_decref(&lockh, mode);
2575 LDLM_LOCK_PUT(matched);
2580 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
/* No usable cached lock: build a real enqueue RPC with an LVB buffer. */
2584 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2585 &RQF_LDLM_ENQUEUE_LVB);
2589 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2591 ptlrpc_request_free(req);
2595 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2597 ptlrpc_request_set_replen(req);
2600 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2601 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2603 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2604 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash the completion context in rq_async_args and let
 * osc_enqueue_interpret() finish the job from ptlrpcd/the given set. */
2607 struct osc_enqueue_args *aa;
2608 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2609 aa = ptlrpc_req_async_args(req);
2611 aa->oa_mode = einfo->ei_mode;
2612 aa->oa_type = einfo->ei_type;
2613 lustre_handle_copy(&aa->oa_lockh, &lockh);
2614 aa->oa_upcall = upcall;
2615 aa->oa_cookie = cookie;
2616 aa->oa_speculative = speculative;
2618 aa->oa_flags = flags;
2621 /* speculative locks are essentially to enqueue
2622 * a DLM lock in advance, so we don't care
2623 * about the result of the enqueue. */
2625 aa->oa_flags = NULL;
2628 req->rq_interpret_reply = osc_enqueue_interpret;
2629 if (rqset == PTLRPCD_SET)
2630 ptlrpcd_add_req(req);
2632 ptlrpc_set_add_req(rqset, req);
2633 } else if (intent) {
2634 ptlrpc_req_finished(req);
/* Sync path: complete inline with whatever rc ldlm_cli_enqueue gave. */
2639 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2640 flags, speculative, rc);
2642 ptlrpc_req_finished(req);
/*
 * Match an existing cached OST extent lock without enqueuing a new one.
 * Extents are widened to page boundaries first; on a non-TEST match the
 * lock's l_ast_data is bound to \a data via osc_set_lock_data(), and the
 * matched mode is returned through the handle.  Returns 0 when nothing
 * matched.  NOTE(review): the PR|PW widening of the requested mode and the
 * final return are in lines not visible here.
 */
2647 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2648 enum ldlm_type type, union ldlm_policy_data *policy,
2649 enum ldlm_mode mode, __u64 *flags, void *data,
2650 struct lustre_handle *lockh, int unref)
2652 struct obd_device *obd = exp->exp_obd;
2653 __u64 lflags = *flags;
2657 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2660 /* Filesystem lock extents are extended to page boundaries so that
2661 * dealing with the page cache is a little smoother */
2662 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2663 policy->l_extent.end |= ~PAGE_MASK;
2665 /* Next, search for already existing extent locks that will cover us */
2666 /* If we're trying to read, we also search for an existing PW lock. The
2667 * VFS and page cache already protect us locally, so lots of readers/
2668 * writers can share a single PW lock. */
2672 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2673 res_id, type, policy, rc, lockh, unref);
2674 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
/* Real match: attach caller's data; on conflict drop our reference. */
2678 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2680 LASSERT(lock != NULL);
2681 if (!osc_set_lock_data(lock, data)) {
2682 ldlm_lock_decref(lockh, rc);
2685 LDLM_LOCK_PUT(lock);
/*
 * Interpret callback for the async OST_STATFS RPC built by
 * osc_statfs_async(): copies the server's obd_statfs into the caller's
 * buffer and invokes the oi_cb_up completion callback.
 */
2690 static int osc_statfs_interpret(const struct lu_env *env,
2691 struct ptlrpc_request *req, void *args, int rc)
2693 struct osc_async_args *aa = args;
2694 struct obd_statfs *msfs;
2699 * The request has in fact never been sent due to issues at
2700 * a higher level (LOV). Exit immediately since the caller
2701 * is aware of the problem and takes care of the clean up.
/* NODELAY statfs tolerates a disconnected/busy import; those errors are
 * passed straight to the callback rather than treated as protocol errors. */
2705 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2706 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2712 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2714 GOTO(out, rc = -EPROTO);
2716 *aa->aa_oi->oi_osfs = *msfs;
2718 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Fire an OST_STATFS RPC asynchronously on \a rqset; the reply is handled
 * by osc_statfs_interpret(), which fills oinfo->oi_osfs and calls
 * oinfo->oi_cb_up.  \a max_age is currently unused by the wire protocol
 * (see the comment below).
 */
2723 static int osc_statfs_async(struct obd_export *exp,
2724 struct obd_info *oinfo, time64_t max_age,
2725 struct ptlrpc_request_set *rqset)
2727 struct obd_device *obd = class_exp2obd(exp);
2728 struct ptlrpc_request *req;
2729 struct osc_async_args *aa;
2733 /* We could possibly pass max_age in the request (as an absolute
2734 * timestamp or a "seconds.usec ago") so the target can avoid doing
2735 * extra calls into the filesystem if that isn't necessary (e.g.
2736 * during mount that would help a bit). Having relative timestamps
2737 * is not so great if request processing is slow, while absolute
2738 * timestamps are not ideal because they need time synchronization. */
2739 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2743 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2745 ptlrpc_request_free(req);
2748 ptlrpc_request_set_replen(req);
2749 req->rq_request_portal = OST_CREATE_PORTAL;
2750 ptlrpc_at_set_req_timeout(req);
2752 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2753 /* procfs requests not want stat in wait for avoid deadlock */
2754 req->rq_no_resend = 1;
2755 req->rq_no_delay = 1;
2758 req->rq_interpret_reply = osc_statfs_interpret;
2759 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2760 aa = ptlrpc_req_async_args(req);
2763 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: sends the RPC with ptlrpc_queue_wait() and copies
 * the reply into \a osfs.  Takes a reference on the import under cl_sem
 * because the call may race with client_disconnect_export (Bug15684).
 */
2767 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2768 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2770 struct obd_device *obd = class_exp2obd(exp);
2771 struct obd_statfs *msfs;
2772 struct ptlrpc_request *req;
2773 struct obd_import *imp = NULL;
2778 /*Since the request might also come from lprocfs, so we need
2779 *sync this with client_disconnect_export Bug15684*/
2780 down_read(&obd->u.cli.cl_sem);
2781 if (obd->u.cli.cl_import)
2782 imp = class_import_get(obd->u.cli.cl_import);
2783 up_read(&obd->u.cli.cl_sem);
2787 /* We could possibly pass max_age in the request (as an absolute
2788 * timestamp or a "seconds.usec ago") so the target can avoid doing
2789 * extra calls into the filesystem if that isn't necessary (e.g.
2790 * during mount that would help a bit). Having relative timestamps
2791 * is not so great if request processing is slow, while absolute
2792 * timestamps are not ideal because they need time synchronization. */
2793 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference is only needed to allocate the request safely. */
2795 class_import_put(imp);
2800 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2802 ptlrpc_request_free(req);
2805 ptlrpc_request_set_replen(req);
2806 req->rq_request_portal = OST_CREATE_PORTAL;
2807 ptlrpc_at_set_req_timeout(req);
2809 if (flags & OBD_STATFS_NODELAY) {
2810 /* procfs requests not want stat in wait for avoid deadlock */
2811 req->rq_no_resend = 1;
2812 req->rq_no_delay = 1;
2815 rc = ptlrpc_queue_wait(req);
2819 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2821 GOTO(out, rc = -EPROTO);
2827 ptlrpc_req_finished(req);
/*
 * ioctl entry point for the OSC obd device.  Pins the module for the
 * duration of the call, then dispatches on \a cmd: import recovery,
 * activate/deactivate, or -ENOTTY-style rejection of unknown commands
 * (exact default rc set on a line not visible here).
 */
2831 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2832 void *karg, void __user *uarg)
2834 struct obd_device *obd = exp->exp_obd;
2835 struct obd_ioctl_data *data = karg;
2839 if (!try_module_get(THIS_MODULE)) {
2840 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2841 module_name(THIS_MODULE));
2845 case OBD_IOC_CLIENT_RECOVER:
2846 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
2847 data->ioc_inlbuf1, 0);
2851 case IOC_OSC_SET_ACTIVE:
2852 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
2857 CDEBUG(D_INODE, "%s: unrecognised ioctl %#x by %s: rc = %d\n",
2858 obd->obd_name, cmd, current_comm(), rc);
2862 module_put(THIS_MODULE);
/*
 * Handle set_info requests for the OSC.  A handful of keys are serviced
 * locally (checksum toggle, sptlrpc config/context, LRU shrink); everything
 * else is forwarded to the OST as an OST_SET_INFO RPC.  KEY_GRANT_SHRINK
 * requests are special-cased: they go through ptlrpcd with
 * osc_shrink_grant_interpret and carry an obdo allocated here.
 */
2866 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2867 u32 keylen, void *key, u32 vallen, void *val,
2868 struct ptlrpc_request_set *set)
2870 struct ptlrpc_request *req;
2871 struct obd_device *obd = exp->exp_obd;
2872 struct obd_import *imp = class_exp2cliimp(exp);
2877 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2879 if (KEY_IS(KEY_CHECKSUM)) {
2880 if (vallen != sizeof(int))
2882 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2886 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2887 sptlrpc_conf_client_adapt(obd);
2891 if (KEY_IS(KEY_FLUSH_CTX)) {
2892 sptlrpc_import_flush_my_ctx(imp);
2896 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2897 struct client_obd *cli = &obd->u.cli;
/* Shrink at most half of the cached LRU pages, capped by the target. */
2898 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2899 long target = *(long *)val;
2901 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2906 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2909 /* We pass all other commands directly to OST. Since nobody calls osc
2910 methods directly and everybody is supposed to go through LOV, we
2911 assume lov checked invalid values for us.
2912 The only recognised values so far are evict_by_nid and mds_conn.
2913 Even if something bad goes through, we'd get a -EINVAL from OST
2916 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2917 &RQF_OST_SET_GRANT_INFO :
2922 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2923 RCL_CLIENT, keylen);
2924 if (!KEY_IS(KEY_GRANT_SHRINK))
2925 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2926 RCL_CLIENT, vallen);
2927 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2929 ptlrpc_request_free(req);
2933 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2934 memcpy(tmp, key, keylen);
2935 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2938 memcpy(tmp, val, vallen);
2940 if (KEY_IS(KEY_GRANT_SHRINK)) {
2941 struct osc_grant_args *aa;
2944 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2945 aa = ptlrpc_req_async_args(req);
2946 OBD_SLAB_ALLOC_PTR_GFP(oa, osc_obdo_kmem, GFP_NOFS);
/* Allocation failure path: free the prepared request before bailing. */
2948 ptlrpc_req_finished(req);
2951 *oa = ((struct ost_body *)val)->oa;
2953 req->rq_interpret_reply = osc_shrink_grant_interpret;
2956 ptlrpc_request_set_replen(req);
2957 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2958 LASSERT(set != NULL);
2959 ptlrpc_set_add_req(set, req);
2960 ptlrpc_check_set(NULL, set);
2962 ptlrpcd_add_req(req);
2967 EXPORT_SYMBOL(osc_set_info_async);
/*
 * Reconnect hook: recompute the grant we advertise to the server from the
 * client's current grant bookkeeping (available + reserved + dirty), and
 * reset cl_lost_grant under cl_loi_list_lock.
 */
2969 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2970 struct obd_device *obd, struct obd_uuid *cluuid,
2971 struct obd_connect_data *data, void *localdata)
2973 struct client_obd *cli = &obd->u.cli;
2975 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2979 spin_lock(&cli->cl_loi_list_lock);
2980 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2981 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM) {
2982 /* restore ocd_grant_blkbits as client page bits */
2983 data->ocd_grant_blkbits = PAGE_SHIFT;
2984 grant += cli->cl_dirty_grant;
2986 grant += cli->cl_dirty_pages << PAGE_SHIFT;
/* Never request zero grant; fall back to two full BRW sizes. */
2988 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2989 lost_grant = cli->cl_lost_grant;
2990 cli->cl_lost_grant = 0;
2991 spin_unlock(&cli->cl_loi_list_lock);
2993 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2994 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2995 data->ocd_version, data->ocd_grant, lost_grant);
3000 EXPORT_SYMBOL(osc_reconnect);
/*
 * Disconnect hook: tear down the export first, then remove this client
 * from the grant-shrink list.  The ordering is deliberate — see the race
 * described below (BUG18662).
 */
3002 int osc_disconnect(struct obd_export *exp)
3004 struct obd_device *obd = class_exp2obd(exp);
3007 rc = client_disconnect_export(exp);
3009 * Initially we put del_shrink_grant before disconnect_export, but it
3010 * causes the following problem if setup (connect) and cleanup
3011 * (disconnect) are tangled together.
3012 * connect p1 disconnect p2
3013 * ptlrpc_connect_import
3014 * ............... class_manual_cleanup
3017 * ptlrpc_connect_interrupt
3019 * add this client to shrink list
3021 * Bang! grant shrink thread trigger the shrink. BUG18662
3023 osc_del_grant_list(&obd->u.cli);
3026 EXPORT_SYMBOL(osc_disconnect);
/*
 * cfs_hash iterator callback used from osc_import_event(IMP_EVENT_INVALIDATE):
 * for each LDLM resource, find the osc_object attached to any granted lock,
 * clear LDLM_FL_CLEANED on the locks, and invalidate the object so a second
 * namespace-cleanup pass can cancel them.
 * NOTE(review): the res_lock/unlock around the list walk is in lines not
 * visible in this view.
 */
3028 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
3029 struct hlist_node *hnode, void *arg)
3031 struct lu_env *env = arg;
3032 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
3033 struct ldlm_lock *lock;
3034 struct osc_object *osc = NULL;
3038 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
3039 if (lock->l_ast_data != NULL && osc == NULL) {
3040 osc = lock->l_ast_data;
3041 cl_object_get(osc2cl(osc));
3044 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
3045 * by the 2nd round of ldlm_namespace_clean() call in
3046 * osc_import_event(). */
3047 ldlm_clear_cleaned(lock);
3052 osc_object_invalidate(env, osc);
3053 cl_object_put(env, osc2cl(osc));
3058 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
/*
 * Import state-machine notifications for the OSC: reset grant on disconnect,
 * flush caches and cancel local locks on invalidate, (re)initialize grant
 * and the request portal on OCD, and relay active/inactive transitions to
 * the observer (LOV).
 */
3060 static int osc_import_event(struct obd_device *obd,
3061 struct obd_import *imp,
3062 enum obd_import_event event)
3064 struct client_obd *cli;
3068 LASSERT(imp->imp_obd == obd);
3071 case IMP_EVENT_DISCON: {
3073 spin_lock(&cli->cl_loi_list_lock);
3074 cli->cl_avail_grant = 0;
3075 cli->cl_lost_grant = 0;
3076 spin_unlock(&cli->cl_loi_list_lock);
3079 case IMP_EVENT_INACTIVE: {
3080 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3083 case IMP_EVENT_INVALIDATE: {
3084 struct ldlm_namespace *ns = obd->obd_namespace;
/* Two cleanup rounds: the first cancels what it can, the per-resource
 * invalidate clears LDLM_FL_CLEANED, and the second round finishes. */
3088 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3090 env = cl_env_get(&refcheck);
3092 osc_io_unplug(env, &obd->u.cli, NULL);
3094 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3095 osc_ldlm_resource_invalidate,
3097 cl_env_put(env, &refcheck);
3099 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3104 case IMP_EVENT_ACTIVE: {
3105 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3108 case IMP_EVENT_OCD: {
3109 struct obd_connect_data *ocd = &imp->imp_connect_data;
3111 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3112 osc_init_grant(&obd->u.cli, ocd);
3115 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3116 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3118 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3121 case IMP_EVENT_DEACTIVATE: {
3122 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3125 case IMP_EVENT_ACTIVATE: {
3126 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3130 CERROR("Unknown import event %d\n", event);
3137 * Determine whether the lock can be canceled before replaying the lock
3138 * during recovery, see bug16774 for detailed information.
3140 * \retval zero the lock can't be canceled
3141 * \retval other ok to cancel
/* Registered via ns_register_cancel() in osc_setup(). */
3143 static int osc_cancel_weight(struct ldlm_lock *lock)
3146 * Cancel all unused and granted extent lock.
/* Weigh 0 means no pages are covered, i.e. the lock is unused. */
3148 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3149 ldlm_is_granted(lock) &&
3150 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback (cl_writeback_work): flush pending BRW pages for
 * this client by unplugging its IO queue.
 */
3156 static int brw_queue_work(const struct lu_env *env, void *data)
3158 struct client_obd *cli = data;
3160 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3162 osc_io_unplug(env, cli, NULL);
/*
 * Common OSC/OSP setup: take a ptlrpcd reference, initialize the client
 * obd, allocate writeback and LRU ptlrpcd work items, set up quota and
 * grant shrinking.  On failure, unwinds the work items and the client obd
 * (error labels are in lines not visible in this view).
 */
3166 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3168 struct client_obd *cli = &obd->u.cli;
3174 rc = ptlrpcd_addref();
3178 rc = client_obd_setup(obd, lcfg);
3180 GOTO(out_ptlrpcd, rc);
3183 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3184 if (IS_ERR(handler))
3185 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3186 cli->cl_writeback_work = handler;
3188 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3189 if (IS_ERR(handler))
3190 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3191 cli->cl_lru_work = handler;
3193 rc = osc_quota_setup(obd);
3195 GOTO(out_ptlrpcd_work, rc);
3197 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3198 osc_update_next_shrink(cli);
/* Error unwind: destroy whichever work items were allocated. */
3203 if (cli->cl_writeback_work != NULL) {
3204 ptlrpcd_destroy_work(cli->cl_writeback_work);
3205 cli->cl_writeback_work = NULL;
3207 if (cli->cl_lru_work != NULL) {
3208 ptlrpcd_destroy_work(cli->cl_lru_work);
3209 cli->cl_lru_work = NULL;
3211 client_obd_cleanup(obd);
3216 EXPORT_SYMBOL(osc_setup_common);
/*
 * OSC device setup: common setup plus tunables, topping up the shared
 * request pool toward osc_reqpool_maxreqcount, registering the cancel
 * weight callback, and joining the global shrink list.
 */
3218 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3220 struct client_obd *cli = &obd->u.cli;
3228 rc = osc_setup_common(obd, lcfg);
3232 rc = osc_tunables_init(obd);
3237 * We try to control the total number of requests with a upper limit
3238 * osc_reqpool_maxreqcount. There might be some race which will cause
3239 * over-limit allocation, but it is fine.
3241 req_count = atomic_read(&osc_pool_req_count);
3242 if (req_count < osc_reqpool_maxreqcount) {
3243 adding = cli->cl_max_rpcs_in_flight + 2;
3244 if (req_count + adding > osc_reqpool_maxreqcount)
3245 adding = osc_reqpool_maxreqcount - req_count;
3247 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3248 atomic_add(added, &osc_pool_req_count);
3251 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3253 spin_lock(&osc_shrink_lock);
3254 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3255 spin_unlock(&osc_shrink_lock);
3256 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
3257 cli->cl_import->imp_idle_debug = D_HA;
/*
 * Pre-cleanup shared by OSC/OSP: wait out zombie exports (echo client
 * case), destroy the writeback and LRU work items, and clean up the
 * client import.
 */
3262 int osc_precleanup_common(struct obd_device *obd)
3264 struct client_obd *cli = &obd->u.cli;
3268 * for echo client, export may be on zombie list, wait for
3269 * zombie thread to cull it, because cli.cl_import will be
3270 * cleared in client_disconnect_export():
3271 * class_export_destroy() -> obd_cleanup() ->
3272 * echo_device_free() -> echo_client_cleanup() ->
3273 * obd_disconnect() -> osc_disconnect() ->
3274 * client_disconnect_export()
3276 obd_zombie_barrier();
3277 if (cli->cl_writeback_work) {
3278 ptlrpcd_destroy_work(cli->cl_writeback_work);
3279 cli->cl_writeback_work = NULL;
3282 if (cli->cl_lru_work) {
3283 ptlrpcd_destroy_work(cli->cl_lru_work);
3284 cli->cl_lru_work = NULL;
3287 obd_cleanup_client_import(obd);
3290 EXPORT_SYMBOL(osc_precleanup_common);
/* OSC o_precleanup: common pre-cleanup, then drop the lprocfs entries. */
3292 static int osc_precleanup(struct obd_device *obd)
3296 osc_precleanup_common(obd);
3298 ptlrpc_lprocfs_unregister_obd(obd);
/*
 * Final cleanup shared by OSC/OSP: leave the global shrink list, detach
 * from the shared LRU cache (dropping its reference), free the quota
 * cache, and clean up the client obd.
 */
3302 int osc_cleanup_common(struct obd_device *obd)
3304 struct client_obd *cli = &obd->u.cli;
3309 spin_lock(&osc_shrink_lock);
3310 list_del(&cli->cl_shrink_list);
3311 spin_unlock(&osc_shrink_lock);
3314 if (cli->cl_cache != NULL) {
3315 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3316 spin_lock(&cli->cl_cache->ccc_lru_lock);
3317 list_del_init(&cli->cl_lru_osc);
3318 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3319 cli->cl_lru_left = NULL;
3320 cl_cache_decref(cli->cl_cache);
3321 cli->cl_cache = NULL;
3324 /* free memory of osc quota cache */
3325 osc_quota_cleanup(obd);
3327 rc = client_obd_cleanup(obd);
3332 EXPORT_SYMBOL(osc_cleanup_common);
/* Method table registered for the OSC obd type in osc_init(). */
3334 static struct obd_ops osc_obd_ops = {
3335 .o_owner = THIS_MODULE,
3336 .o_setup = osc_setup,
3337 .o_precleanup = osc_precleanup,
3338 .o_cleanup = osc_cleanup_common,
3339 .o_add_conn = client_import_add_conn,
3340 .o_del_conn = client_import_del_conn,
3341 .o_connect = client_connect_import,
3342 .o_reconnect = osc_reconnect,
3343 .o_disconnect = osc_disconnect,
3344 .o_statfs = osc_statfs,
3345 .o_statfs_async = osc_statfs_async,
3346 .o_create = osc_create,
3347 .o_destroy = osc_destroy,
3348 .o_getattr = osc_getattr,
3349 .o_setattr = osc_setattr,
3350 .o_iocontrol = osc_iocontrol,
3351 .o_set_info_async = osc_set_info_async,
3352 .o_import_event = osc_import_event,
3353 .o_quotactl = osc_quotactl,
/* Registered memory shrinker for the OSC page cache (set in osc_init()). */
3356 static struct shrinker *osc_cache_shrinker;
/* All live client_obd's, linked via cl_shrink_list; guarded by
 * osc_shrink_lock. */
3357 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3358 DEFINE_SPINLOCK(osc_shrink_lock);
3360 #ifndef HAVE_SHRINKER_COUNT
/*
 * Compatibility wrapper for old-style single-callback shrinkers: adapt the
 * legacy shrink() interface to the split scan/count callbacks by running a
 * scan pass and then returning the count.
 */
3361 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3363 struct shrink_control scv = {
3364 .nr_to_scan = shrink_param(sc, nr_to_scan),
3365 .gfp_mask = shrink_param(sc, gfp_mask)
3367 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3368 struct shrinker *shrinker = NULL;
3371 (void)osc_cache_shrink_scan(shrinker, &scv);
3373 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: set up slab caches, register the OSC obd type (procfs
 * disabled when OSP owns the proc symlink), install the cache shrinker,
 * and size/allocate the shared request pool from osc_reqpool_mem_max,
 * then start the grant-shrink worker.  Error labels unwind in reverse.
 */
3377 static int __init osc_init(void)
3379 bool enable_proc = true;
3380 struct obd_type *type;
3381 unsigned int reqpool_size;
3382 unsigned int reqsize;
3384 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3385 osc_cache_shrink_count, osc_cache_shrink_scan);
3388 /* print an address of _any_ initialized kernel symbol from this
3389 * module, to allow debugging with gdb that doesn't support data
3390 * symbols from modules.*/
3391 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3393 rc = lu_kmem_init(osc_caches);
3397 type = class_search_type(LUSTRE_OSP_NAME);
3398 if (type != NULL && type->typ_procsym != NULL)
3399 enable_proc = false;
3401 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3402 LUSTRE_OSC_NAME, &osc_device_type);
3406 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3408 /* This is obviously too much memory, only prevent overflow here */
3409 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3410 GOTO(out_type, rc = -EINVAL);
3412 reqpool_size = osc_reqpool_mem_max << 20;
/* Round the per-request size up to the next power of two that can hold
 * OST_IO_MAXREQSIZE. */
3415 while (reqsize < OST_IO_MAXREQSIZE)
3416 reqsize = reqsize << 1;
3419 * We don't enlarge the request count in OSC pool according to
3420 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3421 * tried after normal allocation failed. So a small OSC pool won't
3422 * cause much performance degression in most of cases.
3424 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3426 atomic_set(&osc_pool_req_count, 0);
3427 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3428 ptlrpc_add_rqs_to_pool);
3430 if (osc_rq_pool == NULL)
3431 GOTO(out_type, rc = -ENOMEM);
3433 rc = osc_start_grant_work();
3435 GOTO(out_req_pool, rc);
3440 ptlrpc_free_rq_pool(osc_rq_pool);
3442 class_unregister_type(LUSTRE_OSC_NAME);
3444 lu_kmem_fini(osc_caches);
/* Module exit: undo osc_init() in reverse order. */
3449 static void __exit osc_exit(void)
3451 osc_stop_grant_work();
3452 remove_shrinker(osc_cache_shrinker);
3453 class_unregister_type(LUSTRE_OSC_NAME);
3454 lu_kmem_fini(osc_caches);
3455 ptlrpc_free_rq_pool(osc_rq_pool);
3458 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3459 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3460 MODULE_VERSION(LUSTRE_VERSION_STRING);
3461 MODULE_LICENSE("GPL");
3463 module_init(osc_init);
3464 module_exit(osc_exit);