4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include <lprocfs_status.h>
36 #include <lustre_debug.h>
37 #include <lustre_dlm.h>
38 #include <lustre_fid.h>
39 #include <lustre_ha.h>
40 #include <uapi/linux/lustre/lustre_ioctl.h>
41 #include <lustre_net.h>
42 #include <lustre_obdo.h>
43 #include <uapi/linux/lustre/lustre_param.h>
45 #include <obd_cksum.h>
46 #include <obd_class.h>
47 #include <lustre_osc.h>
49 #include "osc_internal.h"
/*
 * Shared state for the OSC request pool (pre-allocated ptlrpc requests
 * used for BRW writes so writeout can proceed under memory pressure).
 * NOTE(review): this dump carries original line numbers as a literal
 * text prefix and appears to be missing intermediate lines; code is
 * kept byte-identical throughout.
 */
51 atomic_t osc_pool_req_count;
52 unsigned int osc_reqpool_maxreqcount;
53 struct ptlrpc_request_pool *osc_rq_pool;
/* Pool memory cap in MB; read-only module parameter (mode 0444). */
55 /* max memory used for request pool, unit is MB */
56 static unsigned int osc_reqpool_mem_max = 5;
57 module_param(osc_reqpool_mem_max, uint, 0444);
/*
 * Idle timeout (seconds) before an unused OSC import may be
 * disconnected; tunable at runtime (mode 0644).
 * Fix: the variable was declared "static int" but registered with
 * module_param(..., uint, ...); the kernel's moduleparam type check
 * requires the declaration type to match the param type token, so
 * declare it unsigned int to agree with "uint".
 */
59 static unsigned int osc_idle_timeout = 20;
60 module_param(osc_idle_timeout, uint, 0644);
62 #define osc_grant_args osc_brw_async_args
/*
 * Per-request async-argument structs stored in rq_async_args; each
 * carries the caller's completion upcall (and, per the visible fields,
 * a cookie/object pointer).  NOTE(review): struct bodies are truncated
 * in this dump — remaining members are not visible here.
 */
64 struct osc_setattr_args {
66 	obd_enqueue_update_f sa_upcall;
70 struct osc_fsync_args {
71 	struct osc_object *fa_obj;
73 	obd_enqueue_update_f fa_upcall;
77 struct osc_ladvise_args {
79 	obd_enqueue_update_f la_upcall;
/* Forward declarations for helpers defined later in this file. */
83 static void osc_release_ppga(struct brw_page **ppga, size_t count);
84 static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req,
/*
 * Copy @oa into the request's OST body, converting to wire format
 * according to the import's connect data.
 */
87 void osc_pack_req_body(struct ptlrpc_request *req, struct obdo *oa)
89 	struct ost_body *body;
91 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
94 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
/*
 * Synchronous OST_GETATTR: pack @oa, queue-wait the RPC, and unpack
 * the server's obdo back into @oa.  Also fills o_blksize from the
 * client's preferred BRW size.  Returns 0 or negative errno
 * (-EPROTO on a short/absent reply body).
 */
97 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
100 	struct ptlrpc_request *req;
101 	struct ost_body *body;
105 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
109 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
111 		ptlrpc_request_free(req);
115 	osc_pack_req_body(req, oa);
117 	ptlrpc_request_set_replen(req);
119 	rc = ptlrpc_queue_wait(req);
123 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
125 		GOTO(out, rc = -EPROTO);
127 	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
128 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
/* Advertise the client BRW size as the object block size. */
130 	oa->o_blksize = cli_brw_size(exp->exp_obd);
131 	oa->o_valid |= OBD_MD_FLBLKSZ;
135 	ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: pack @oa, wait for the reply, and copy the
 * server's view of the attributes back into @oa.  The caller must have
 * set OBD_MD_FLGROUP (asserted).  Returns 0 or negative errno.
 */
140 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
143 	struct ptlrpc_request *req;
144 	struct ost_body *body;
148 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
150 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
154 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
156 		ptlrpc_request_free(req);
160 	osc_pack_req_body(req, oa);
162 	ptlrpc_request_set_replen(req);
164 	rc = ptlrpc_queue_wait(req);
168 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
170 		GOTO(out, rc = -EPROTO);
172 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
176 	ptlrpc_req_finished(req);
/*
 * Reply interpreter for async setattr/punch: unpack the server obdo
 * into sa->sa_oa, then invoke the caller's upcall with the final rc.
 */
181 static int osc_setattr_interpret(const struct lu_env *env,
182 				 struct ptlrpc_request *req,
183 				 struct osc_setattr_args *sa, int rc)
185 	struct ost_body *body;
191 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
193 		GOTO(out, rc = -EPROTO);
195 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
198 	rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Asynchronous OST_SETATTR.  Packs @oa and either fires the request
 * via ptlrpcd (no reply processing / PTLRPCD_SET path) or registers
 * osc_setattr_interpret plus the caller's upcall+cookie and adds the
 * request to @rqset.  NOTE(review): dump is missing the branch guards
 * here (e.g. the rqset == NULL test), so exact control flow cannot be
 * confirmed from this view.
 */
202 int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
203 		      obd_enqueue_update_f upcall, void *cookie,
204 		      struct ptlrpc_request_set *rqset)
206 	struct ptlrpc_request *req;
207 	struct osc_setattr_args *sa;
212 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
216 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
218 		ptlrpc_request_free(req);
222 	osc_pack_req_body(req, oa);
224 	ptlrpc_request_set_replen(req);
226 	/* do mds to ost setattr asynchronously */
228 		/* Do not wait for response. */
229 		ptlrpcd_add_req(req);
231 		req->rq_interpret_reply =
232 			(ptlrpc_interpterer_t)osc_setattr_interpret;
/* sa must fit in the request's embedded async-args storage. */
234 		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
235 		sa = ptlrpc_req_async_args(req);
237 		sa->sa_upcall = upcall;
238 		sa->sa_cookie = cookie;
240 		if (rqset == PTLRPCD_SET)
241 			ptlrpcd_add_req(req);
243 			ptlrpc_set_add_req(rqset, req);
/*
 * Reply interpreter for OST_LADVISE: copy the reply obdo back to the
 * caller's oa and invoke the registered upcall with the final rc.
 */
249 static int osc_ladvise_interpret(const struct lu_env *env,
250 				 struct ptlrpc_request *req,
253 	struct osc_ladvise_args *la = arg;
254 	struct ost_body *body;
260 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
262 		GOTO(out, rc = -EPROTO);
264 	*la->la_oa = body->oa;
266 	rc = la->la_upcall(la->la_cookie, rc);
/*
 * Send an OST_LADVISE request carrying @num_advise lu_ladvise entries
 * from @ladvise_hdr.  Targets the OST I/O portal.  If rqset is NULL
 * the request is fired via ptlrpcd without reply handling; otherwise
 * osc_ladvise_interpret plus upcall/cookie are registered and the
 * request joins @rqset (or ptlrpcd for PTLRPCD_SET).
 */
271  * If rqset is NULL, do not wait for response. Upcall and cookie could also
272  * be NULL in this case
274 int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
275 		     struct ladvise_hdr *ladvise_hdr,
276 		     obd_enqueue_update_f upcall, void *cookie,
277 		     struct ptlrpc_request_set *rqset)
279 	struct ptlrpc_request *req;
280 	struct ost_body *body;
281 	struct osc_ladvise_args *la;
283 	struct lu_ladvise *req_ladvise;
284 	struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
285 	int num_advise = ladvise_hdr->lah_count;
286 	struct ladvise_hdr *req_ladvise_hdr;
289 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
/* Size the variable-length advice array before packing. */
293 	req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
294 			     num_advise * sizeof(*ladvise));
295 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
297 		ptlrpc_request_free(req);
300 	req->rq_request_portal = OST_IO_PORTAL;
301 	ptlrpc_at_set_req_timeout(req);
303 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
305 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
308 	req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
309 						 &RMF_OST_LADVISE_HDR);
310 	memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
312 	req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
313 	memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
314 	ptlrpc_request_set_replen(req);
317 		/* Do not wait for response. */
318 		ptlrpcd_add_req(req);
322 	req->rq_interpret_reply = osc_ladvise_interpret;
323 	CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
324 	la = ptlrpc_req_async_args(req);
326 	la->la_upcall = upcall;
327 	la->la_cookie = cookie;
329 	if (rqset == PTLRPCD_SET)
330 		ptlrpcd_add_req(req);
332 		ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_CREATE for echo-sequence objects (asserted via
 * fid_seq_is_echo).  Packs @oa, waits for the reply, copies the
 * server's obdo back, and fills o_blksize from the client BRW size.
 * Returns 0 or negative errno.
 */
337 static int osc_create(const struct lu_env *env, struct obd_export *exp,
340 	struct ptlrpc_request *req;
341 	struct ost_body *body;
346 	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
347 	LASSERT(fid_seq_is_echo(ostid_seq(&oa->o_oi)));
349 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
351 		GOTO(out, rc = -ENOMEM);
353 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
355 		ptlrpc_request_free(req);
359 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
362 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
364 	ptlrpc_request_set_replen(req);
366 	rc = ptlrpc_queue_wait(req);
370 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
372 		GOTO(out_req, rc = -EPROTO);
374 	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
375 	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
377 	oa->o_blksize = cli_brw_size(exp->exp_obd);
378 	oa->o_valid |= OBD_MD_FLBLKSZ;
380 	CDEBUG(D_HA, "transno: %lld\n",
381 	       lustre_msg_get_transno(req->rq_repmsg));
383 	ptlrpc_req_finished(req);
/*
 * Fire an asynchronous OST_PUNCH (truncate/fallocate-style) request
 * via ptlrpcd.  The extent to punch is carried inside @oa; completion
 * is reported through osc_setattr_interpret -> @upcall(@cookie, rc).
 * Targets the OST I/O portal.
 */
388 int osc_punch_send(struct obd_export *exp, struct obdo *oa,
389 		   obd_enqueue_update_f upcall, void *cookie)
391 	struct ptlrpc_request *req;
392 	struct osc_setattr_args *sa;
393 	struct obd_import *imp = class_exp2cliimp(exp);
394 	struct ost_body *body;
399 	req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
403 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
405 		ptlrpc_request_free(req);
409 	osc_set_io_portal(req);
411 	ptlrpc_at_set_req_timeout(req);
413 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
415 	lustre_set_wire_obdo(&imp->imp_connect_data, &body->oa, oa);
417 	ptlrpc_request_set_replen(req);
/* Reuse the setattr interpreter/args for punch completion. */
419 	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
420 	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
421 	sa = ptlrpc_req_async_args(req);
423 	sa->sa_upcall = upcall;
424 	sa->sa_cookie = cookie;
426 	ptlrpcd_add_req(req);
430 EXPORT_SYMBOL(osc_punch_send);
/*
 * Reply interpreter for OST_SYNC: copy the reply obdo to the caller's
 * oa, refresh the osc object's cached blocks attribute under the
 * cl-attr lock when the server reported it, then run the upcall.
 */
432 static int osc_sync_interpret(const struct lu_env *env,
433 			      struct ptlrpc_request *req,
436 	struct osc_fsync_args *fa = arg;
437 	struct ost_body *body;
438 	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
439 	unsigned long valid = 0;
440 	struct cl_object *obj;
446 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
448 		CERROR("can't unpack ost_body\n");
449 		GOTO(out, rc = -EPROTO);
452 	*fa->fa_oa = body->oa;
453 	obj = osc2cl(fa->fa_obj);
455 	/* Update osc object's blocks attribute */
456 	cl_object_attr_lock(obj);
457 	if (body->oa.o_valid & OBD_MD_FLBLOCKS) {
458 		attr->cat_blocks = body->oa.o_blocks;
463 	cl_object_attr_update(env, obj, attr, valid);
464 	cl_object_attr_unlock(obj);
467 	rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Send an asynchronous OST_SYNC for @obj.  The oa size/blocks fields
 * are overloaded to carry the start/end of the range to sync (see
 * in-body comment).  Registers osc_sync_interpret plus upcall/cookie,
 * then adds the request to @rqset (or ptlrpcd for PTLRPCD_SET).
 */
471 int osc_sync_base(struct osc_object *obj, struct obdo *oa,
472 		  obd_enqueue_update_f upcall, void *cookie,
473 		  struct ptlrpc_request_set *rqset)
475 	struct obd_export *exp = osc_export(obj);
476 	struct ptlrpc_request *req;
477 	struct ost_body *body;
478 	struct osc_fsync_args *fa;
482 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
486 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
488 		ptlrpc_request_free(req);
492 	/* overload the size and blocks fields in the oa with start/end */
493 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
495 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
497 	ptlrpc_request_set_replen(req);
498 	req->rq_interpret_reply = osc_sync_interpret;
500 	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
501 	fa = ptlrpc_req_async_args(req);
504 	fa->fa_upcall = upcall;
505 	fa->fa_cookie = cookie;
507 	if (rqset == PTLRPCD_SET)
508 		ptlrpcd_add_req(req);
510 		ptlrpc_set_add_req(rqset, req);
515 /* Find and cancel locally locks matched by @mode in the resource found by
516  * @objid. Found locks are added into @cancel list. Returns the amount of
517  * locks added to @cancels list. */
518 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
519 				   struct list_head *cancels,
520 				   enum ldlm_mode mode, __u64 lock_flags)
522 	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
523 	struct ldlm_res_id res_id;
524 	struct ldlm_resource *res;
528 	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
529 	 * export) but disabled through procfs (flag in NS).
531 	 * This distinguishes from a case when ELC is not supported originally,
532 	 * when we still want to cancel locks in advance and just cancel them
533 	 * locally, without sending any RPC. */
534 	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* Build the LDLM resource name from the object id and look it up. */
537 	ostid_build_res_name(&oa->o_oi, &res_id);
538 	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
/* Hold a debug ref across the local-cancel scan, then release. */
542 	LDLM_RESOURCE_ADDREF(res);
543 	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
544 					   lock_flags, 0, NULL);
545 	LDLM_RESOURCE_DELREF(res);
546 	ldlm_resource_putref(res);
/*
 * Reply interpreter for OST_DESTROY: release one slot of the
 * destroy-in-flight throttle and wake any waiter in osc_destroy().
 */
550 static int osc_destroy_interpret(const struct lu_env *env,
551 				 struct ptlrpc_request *req, void *data,
554 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
556 	atomic_dec(&cli->cl_destroy_in_flight);
557 	wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a destroy-RPC slot: increment cl_destroy_in_flight
 * and succeed if it stays within cl_max_rpcs_in_flight; otherwise
 * back the increment out.  The inc/dec pair is not atomic as a unit,
 * so a racing decrement is re-checked and the waitqueue woken to
 * avoid a lost wakeup.
 */
561 static int osc_can_send_destroy(struct client_obd *cli)
563 	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
564 	    cli->cl_max_rpcs_in_flight) {
565 		/* The destroy request can be sent */
568 	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
569 	    cli->cl_max_rpcs_in_flight) {
571 		 * The counter has been modified between the two atomic
574 		wake_up(&cli->cl_destroy_waitq);
/*
 * OST_DESTROY: cancel matching PW locks locally (early lock cancel,
 * discarding dirty data), piggyback the cancels on the destroy RPC,
 * throttle concurrent destroys to cl_max_rpcs_in_flight (interruptible
 * wait), then fire the request via ptlrpcd without waiting for the
 * reply.  Targets the OST I/O portal (bug 7198).
 */
579 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
582 	struct client_obd *cli = &exp->exp_obd->u.cli;
583 	struct ptlrpc_request *req;
584 	struct ost_body *body;
585 	struct list_head cancels = LIST_HEAD_INIT(cancels);
590 		CDEBUG(D_INFO, "oa NULL\n");
594 	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
595 					LDLM_FL_DISCARD_DATA);
597 	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* On alloc failure the collected cancels must be released. */
599 		ldlm_lock_list_put(&cancels, l_bl_ast, count);
603 	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
606 		ptlrpc_request_free(req);
610 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
611 	ptlrpc_at_set_req_timeout(req);
613 	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
615 	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
617 	ptlrpc_request_set_replen(req);
619 	req->rq_interpret_reply = osc_destroy_interpret;
620 	if (!osc_can_send_destroy(cli)) {
621 		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
624 		 * Wait until the number of on-going destroy RPCs drops
625 		 * under max_rpc_in_flight
627 		rc = l_wait_event_exclusive(cli->cl_destroy_waitq,
628 					    osc_can_send_destroy(cli), &lwi);
630 			ptlrpc_req_finished(req);
635 	/* Do not wait for response */
636 	ptlrpcd_add_req(req);
/*
 * Fill the grant-related fields of @oa (o_dirty, o_undirty, o_grant,
 * o_dropped) from the client_obd's cached-state counters, under
 * cl_loi_list_lock, so the server can see how much dirty data and
 * grant the client is holding.  Sanity-checks the dirty accounting
 * and clamps the requested grant below OBD_MAX_GRANT minus a margin.
 */
640 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
643 	u64 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;
645 	LASSERT(!(oa->o_valid & bits));
648 	spin_lock(&cli->cl_loi_list_lock);
/* With GRANT_PARAM the server accounts dirty in grant bytes. */
649 	if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
650 		oa->o_dirty = cli->cl_dirty_grant;
652 		oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
653 	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
654 		     cli->cl_dirty_max_pages)) {
655 		CERROR("dirty %lu - %lu > dirty_max %lu\n",
656 		       cli->cl_dirty_pages, cli->cl_dirty_transit,
657 		       cli->cl_dirty_max_pages);
659 	} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
660 			    atomic_long_read(&obd_dirty_transit_pages) >
661 			    (long)(obd_max_dirty_pages + 1))) {
662 		/* The atomic_read() allowing the atomic_inc() are
663 		 * not covered by a lock thus they may safely race and trip
664 		 * this CERROR() unless we add in a small fudge factor (+1). */
665 		CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
666 		       cli_name(cli), atomic_long_read(&obd_dirty_pages),
667 		       atomic_long_read(&obd_dirty_transit_pages),
668 		       obd_max_dirty_pages);
670 	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
672 		CERROR("dirty %lu - dirty_max %lu too big???\n",
673 		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
676 		unsigned long nrpages;
677 		unsigned long undirty;
/* Ask for enough grant to keep max_rpcs_in_flight+1 RPCs busy. */
679 		nrpages = cli->cl_max_pages_per_rpc;
680 		nrpages *= cli->cl_max_rpcs_in_flight + 1;
681 		nrpages = max(nrpages, cli->cl_dirty_max_pages);
682 		undirty = nrpages << PAGE_SHIFT;
683 		if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
687 			/* take extent tax into account when asking for more
689 			nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
690 				     cli->cl_max_extent_pages;
691 			undirty += nrextents * cli->cl_grant_extent_tax;
693 		/* Do not ask for more than OBD_MAX_GRANT - a margin for server
694 		 * to add extent tax, etc.
696 		oa->o_undirty = min(undirty, OBD_MAX_GRANT -
697 				    (PTLRPC_MAX_BRW_PAGES << PAGE_SHIFT)*4UL);
699 	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
700 	oa->o_dropped = cli->cl_lost_grant;
701 	cli->cl_lost_grant = 0;
702 	spin_unlock(&cli->cl_loi_list_lock);
703 	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
704 	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Schedule the next grant-shrink check: now + the configured
 * shrink interval (seconds).
 */
707 void osc_update_next_shrink(struct client_obd *cli)
709 	cli->cl_next_shrink_grant = ktime_get_seconds() +
710 				    cli->cl_grant_shrink_interval;
712 	CDEBUG(D_CACHE, "next time %lld to shrink grant\n",
713 	       cli->cl_next_shrink_grant);
/* Add @grant bytes to cl_avail_grant under cl_loi_list_lock. */
716 static void __osc_update_grant(struct client_obd *cli, u64 grant)
718 	spin_lock(&cli->cl_loi_list_lock);
719 	cli->cl_avail_grant += grant;
720 	spin_unlock(&cli->cl_loi_list_lock);
/* Absorb any extra grant the server returned in an RPC reply body. */
723 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
725 	if (body->oa.o_valid & OBD_MD_FLGRANT) {
726 		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
727 		__osc_update_grant(cli, body->oa.o_grant);
/*
 * Reply interpreter for the grant-shrink set_info RPC: on failure the
 * locally-deducted grant (oa->o_grant) is returned to cl_avail_grant;
 * on success any server-reported grant from the reply is absorbed.
 */
731 static int osc_shrink_grant_interpret(const struct lu_env *env,
732 				      struct ptlrpc_request *req,
735 	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
736 	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
737 	struct ost_body *body;
740 		__osc_update_grant(cli, oa->o_grant);
744 	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
746 	osc_update_grant(cli, body);
/*
 * Shrink grant without an RPC: give back a quarter of cl_avail_grant,
 * recording the amount in oa->o_grant and flagging OBD_FL_SHRINK_GRANT
 * so the next RPC carries it to the server.
 */
752 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
754 	spin_lock(&cli->cl_loi_list_lock);
755 	oa->o_grant = cli->cl_avail_grant / 4;
756 	cli->cl_avail_grant -= oa->o_grant;
757 	spin_unlock(&cli->cl_loi_list_lock);
758 	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
759 		oa->o_valid |= OBD_MD_FLFLAGS;
762 	oa->o_flags |= OBD_FL_SHRINK_GRANT;
763 	osc_update_next_shrink(cli);
766 /* Shrink the current grant, either from some large amount to enough for a
767  * full set of in-flight RPCs, or if we have already shrunk to that limit
768  * then to enough for a single RPC. This avoids keeping more grant than
769  * needed, and avoids shrinking the grant piecemeal. */
770 static int osc_shrink_grant(struct client_obd *cli)
772 	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
773 			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);
775 	spin_lock(&cli->cl_loi_list_lock);
/* Already at/below the multi-RPC target: drop to a single-RPC target. */
776 	if (cli->cl_avail_grant <= target_bytes)
777 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
778 	spin_unlock(&cli->cl_loi_list_lock);
780 	return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Reduce cl_avail_grant to @target_bytes (never below one RPC's worth)
 * and report the released amount to the server via a KEY_GRANT_SHRINK
 * set_info RPC.  On send failure the grant is restored locally.
 * Returns 0 if no shrink was needed or the rc of the set_info call.
 */
783 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
786 	struct ost_body *body;
789 	spin_lock(&cli->cl_loi_list_lock);
790 	/* Don't shrink if we are already above or below the desired limit
791 	 * We don't want to shrink below a single RPC, as that will negatively
792 	 * impact block allocation and long-term performance. */
793 	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
794 		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
796 	if (target_bytes >= cli->cl_avail_grant) {
797 		spin_unlock(&cli->cl_loi_list_lock);
800 	spin_unlock(&cli->cl_loi_list_lock);
806 	osc_announce_cached(cli, &body->oa, 0);
808 	spin_lock(&cli->cl_loi_list_lock);
809 	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
810 	cli->cl_avail_grant = target_bytes;
811 	spin_unlock(&cli->cl_loi_list_lock);
812 	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
813 		body->oa.o_valid |= OBD_MD_FLFLAGS;
814 		body->oa.o_flags = 0;
816 	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
817 	osc_update_next_shrink(cli);
819 	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
820 				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
821 				sizeof(*body), body, NULL);
/* If the RPC could not be sent, put the grant back locally. */
823 		__osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant: requires
 * the server to support OBD_CONNECT_GRANT_SHRINK, the shrink deadline
 * (minus a 5s slack) to have passed, a FULL import, and more available
 * grant than one RPC's worth; otherwise the deadline is pushed out.
 */
828 static int osc_should_shrink_grant(struct client_obd *client)
830 	time64_t next_shrink = client->cl_next_shrink_grant;
832 	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
833 	     OBD_CONNECT_GRANT_SHRINK) == 0)
836 	if (ktime_get_seconds() >= next_shrink - 5) {
837 		/* Get the current RPC size directly, instead of going via:
838 		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
839 		 * Keep comment here so that it can be found by searching. */
840 		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;
842 		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
843 		    client->cl_avail_grant > brw_size)
846 			osc_update_next_shrink(client);
/*
 * Periodic timeout callback: walk the registered clients and shrink
 * grant on each one that osc_should_shrink_grant() approves.
 */
851 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
853 	struct client_obd *client;
855 	list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
856 		if (osc_should_shrink_grant(client))
857 			osc_shrink_grant(client);
/*
 * Register @client with the ptlrpc timeout machinery so
 * osc_grant_shrink_grant_cb runs every cl_grant_shrink_interval
 * seconds; schedules the first shrink deadline on success.
 */
862 static int osc_add_shrink_grant(struct client_obd *client)
866 	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
868 				       osc_grant_shrink_grant_cb, NULL,
869 				       &client->cl_grant_shrink_list);
871 		CERROR("add grant client %s error %d\n", cli_name(client), rc);
874 	CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
875 	osc_update_next_shrink(client);
/* Unregister @client from the periodic grant-shrink callback. */
879 static int osc_del_shrink_grant(struct client_obd *client)
881 	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialize the client's grant accounting from the server's connect
 * data: set cl_avail_grant from ocd_grant (adjusting for reserved and
 * already-dirty amounts unless freshly evicted), derive chunk size,
 * extent tax and max extent pages when GRANT_PARAM is negotiated, and
 * enable periodic grant shrinking if the server supports it.
 */
885 void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
888 	 * ocd_grant is the total grant amount we're expect to hold: if we've
889 	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
890 	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
893 	 * race is tolerable here: if we're evicted, but imp_state already
894 	 * left EVICTED state, then cl_dirty_pages must be 0 already.
896 	spin_lock(&cli->cl_loi_list_lock);
897 	cli->cl_avail_grant = ocd->ocd_grant;
898 	if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
899 		cli->cl_avail_grant -= cli->cl_reserved_grant;
900 		if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
901 			cli->cl_avail_grant -= cli->cl_dirty_grant;
903 			cli->cl_avail_grant -=
904 					cli->cl_dirty_pages << PAGE_SHIFT;
907 	if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
911 		/* overhead for each extent insertion */
912 		cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
913 		/* determine the appropriate chunk size used by osc_extent. */
914 		cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
915 					  ocd->ocd_grant_blkbits);
916 		/* max_pages_per_rpc must be chunk aligned */
917 		chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
918 		cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
919 					     ~chunk_mask) & chunk_mask;
920 		/* determine maximum extent size, in #pages */
921 		size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
922 		cli->cl_max_extent_pages = size >> PAGE_SHIFT;
923 		if (cli->cl_max_extent_pages == 0)
924 			cli->cl_max_extent_pages = 1;
/* No GRANT_PARAM: fall back to page-sized chunks and defaults. */
926 		cli->cl_grant_extent_tax = 0;
927 		cli->cl_chunkbits = PAGE_SHIFT;
928 		cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
930 	spin_unlock(&cli->cl_loi_list_lock);
932 	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
933 	       "chunk bits: %d cl_max_extent_pages: %d\n",
935 	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
936 	       cli->cl_max_extent_pages);
938 	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
939 	    list_empty(&cli->cl_grant_shrink_list))
940 		osc_add_shrink_grant(cli);
942 EXPORT_SYMBOL(osc_init_grant);
944 /* We assume that the reason this OSC got a short read is because it read
945  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
946  * via the LOV, and it _knows_ it's reading inside the file, it's just that
947  * this stripe never got written at or beyond this stripe offset yet. */
948 static void handle_short_read(int nob_read, size_t page_count,
949 			      struct brw_page **pga)
/* Walk past fully-read pages, zero the tail of a partially-read page,
 * then zero every remaining page so the caller sees defined data. */
954 	/* skip bytes read OK */
955 	while (nob_read > 0) {
956 		LASSERT (page_count > 0);
958 		if (pga[i]->count > nob_read) {
959 			/* EOF inside this page */
960 			ptr = kmap(pga[i]->pg) +
961 				(pga[i]->off & ~PAGE_MASK);
962 			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
969 		nob_read -= pga[i]->count;
974 	/* zero remaining pages */
975 	while (page_count-- > 0) {
976 		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
977 		memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf return codes from a BRW_WRITE reply:
 * reject a missing/short RC vector, propagate the first negative rc,
 * flag any nonzero rc as invalid, and cross-check the bulk transfer
 * byte count against what was requested.  Returns 0 on success.
 */
983 static int check_write_rcs(struct ptlrpc_request *req,
984 			   int requested_nob, int niocount,
985 			   size_t page_count, struct brw_page **pga)
990 	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
991 						  sizeof(*remote_rcs) *
993 	if (remote_rcs == NULL) {
994 		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
998 	/* return error if any niobuf was in error */
999 	for (i = 0; i < niocount; i++) {
1000 		if ((int)remote_rcs[i] < 0)
1001 			return(remote_rcs[i]);
1003 		if (remote_rcs[i] != 0) {
1004 			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1005 				i, remote_rcs[i], req);
1009 	if (req->rq_bulk != NULL &&
1010 	    req->rq_bulk->bd_nob_transferred != requested_nob) {
1011 		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1012 		       req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Two brw_pages can be merged into one niobuf when they are
 * byte-contiguous and their flags differ only in bits known to be
 * safe to combine; unknown flag differences are warned about.
 */
1019 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1021 	if (p1->flag != p2->flag) {
1022 		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1023 				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
1024 				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);
1026 		/* warn if we try to combine flags that we don't know to be
1027 		 * safe to combine */
1028 		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1029 			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1030 			      "report this at https://jira.hpdd.intel.com/\n",
1031 			      p1->flag, p2->flag);
1036 	return (p1->off + p1->count == p2->off);
/*
 * Compute a T10-PI style bulk checksum: generate per-sector DIF guard
 * tags for each page with @fn, accumulate batches of tags in a
 * scratch page, and hash the tag stream with the OBD_CKSUM_T10_TOP
 * algorithm.  Honors the CHECKSUM_RECEIVE/SEND fault-injection hooks
 * used by sanity tests.  Returns 0 or negative errno.
 */
1039 static int osc_checksum_bulk_t10pi(const char *obd_name, int nob,
1040 				   size_t pg_count, struct brw_page **pga,
1041 				   int opc, obd_dif_csum_fn *fn,
1045 	struct cfs_crypto_hash_desc *hdesc;
1046 	/* Used Adler as the default checksum type on top of DIF tags */
1047 	unsigned char cfs_alg = cksum_obd2cfs(OBD_CKSUM_T10_TOP);
1048 	struct page *__page;
1049 	unsigned char *buffer;
1051 	unsigned int bufsize;
1053 	int used_number = 0;
1059 	LASSERT(pg_count > 0);
/* Scratch page collects guard tags before they are hashed. */
1061 	__page = alloc_page(GFP_KERNEL);
1065 	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1066 	if (IS_ERR(hdesc)) {
1067 		rc = PTR_ERR(hdesc);
1068 		CERROR("%s: unable to initialize checksum hash %s: rc = %d\n",
1069 		       obd_name, cfs_crypto_hash_name(cfs_alg), rc);
1073 	buffer = kmap(__page);
1074 	guard_start = (__u16 *)buffer;
1075 	guard_number = PAGE_SIZE / sizeof(*guard_start);
1076 	while (nob > 0 && pg_count > 0) {
1077 		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1079 		/* corrupt the data before we compute the checksum, to
1080 		 * simulate an OST->client data error */
1081 		if (unlikely(i == 0 && opc == OST_READ &&
1082 			     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))) {
1083 			unsigned char *ptr = kmap(pga[i]->pg);
1084 			int off = pga[i]->off & ~PAGE_MASK;
1086 			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1091 		 * The left guard number should be able to hold checksums of a
1094 		rc = obd_page_dif_generate_buffer(obd_name, pga[i]->pg, 0,
1096 						  guard_start + used_number,
1097 						  guard_number - used_number,
1103 		used_number += used;
/* Scratch page full of tags: fold it into the hash and reuse it. */
1104 		if (used_number == guard_number) {
1105 			cfs_crypto_hash_update_page(hdesc, __page, 0,
1106 				used_number * sizeof(*guard_start));
1110 		nob -= pga[i]->count;
/* Hash any tags left over from the final partial batch. */
1118 	if (used_number != 0)
1119 		cfs_crypto_hash_update_page(hdesc, __page, 0,
1120 			used_number * sizeof(*guard_start));
1122 	bufsize = sizeof(cksum);
1123 	cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1125 	/* For sending we only compute the wrong checksum instead
1126 	 * of corrupting the data so it is still correct on a redo */
1127 	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1132 	__free_page(__page);
/*
 * Compute the classic (non-T10) bulk checksum over up to @nob bytes
 * of @pga pages using the hash algorithm mapped from @cksum_type.
 * Includes the same CHECKSUM_RECEIVE/SEND fault-injection hooks as
 * the T10-PI variant.  Returns 0 or negative errno.
 */
1136 static int osc_checksum_bulk(int nob, size_t pg_count,
1137 			     struct brw_page **pga, int opc,
1138 			     enum cksum_types cksum_type,
1142 	struct cfs_crypto_hash_desc *hdesc;
1143 	unsigned int bufsize;
1144 	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1146 	LASSERT(pg_count > 0);
1148 	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1149 	if (IS_ERR(hdesc)) {
1150 		CERROR("Unable to initialize checksum hash %s\n",
1151 		       cfs_crypto_hash_name(cfs_alg));
1152 		return PTR_ERR(hdesc);
1155 	while (nob > 0 && pg_count > 0) {
1156 		unsigned int count = pga[i]->count > nob ? nob : pga[i]->count;
1158 		/* corrupt the data before we compute the checksum, to
1159 		 * simulate an OST->client data error */
1160 		if (i == 0 && opc == OST_READ &&
1161 		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1162 			unsigned char *ptr = kmap(pga[i]->pg);
1163 			int off = pga[i]->off & ~PAGE_MASK;
1165 			memcpy(ptr + off, "bad1", min_t(typeof(nob), 4, nob));
1168 		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1169 					    pga[i]->off & ~PAGE_MASK,
1171 		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
1172 			       (int)(pga[i]->off & ~PAGE_MASK));
1174 		nob -= pga[i]->count;
1179 	bufsize = sizeof(*cksum);
1180 	cfs_crypto_hash_final(hdesc, (unsigned char *)cksum, &bufsize);
1182 	/* For sending we only compute the wrong checksum instead
1183 	 * of corrupting the data so it is still correct on a redo */
1184 	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Dispatch bulk checksum computation: map @cksum_type to a DIF
 * function/sector size and use the T10-PI path when one is found,
 * otherwise fall back to the plain osc_checksum_bulk().
 */
1190 static int osc_checksum_bulk_rw(const char *obd_name,
1191 				enum cksum_types cksum_type,
1192 				int nob, size_t pg_count,
1193 				struct brw_page **pga, int opc,
1196 	obd_dif_csum_fn *fn = NULL;
1197 	int sector_size = 0;
1201 	obd_t10_cksum2dif(cksum_type, &fn, &sector_size);
1204 		rc = osc_checksum_bulk_t10pi(obd_name, nob, pg_count, pga,
1205 					     opc, fn, sector_size, check_sum);
1207 		rc = osc_checksum_bulk(nob, pg_count, pga, opc, cksum_type,
1214 osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1215 u32 page_count, struct brw_page **pga,
1216 struct ptlrpc_request **reqp, int resend)
1218 struct ptlrpc_request *req;
1219 struct ptlrpc_bulk_desc *desc;
1220 struct ost_body *body;
1221 struct obd_ioobj *ioobj;
1222 struct niobuf_remote *niobuf;
1223 int niocount, i, requested_nob, opc, rc, short_io_size = 0;
1224 struct osc_brw_async_args *aa;
1225 struct req_capsule *pill;
1226 struct brw_page *pg_prev;
1228 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1231 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1232 RETURN(-ENOMEM); /* Recoverable */
1233 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1234 RETURN(-EINVAL); /* Fatal */
1236 if ((cmd & OBD_BRW_WRITE) != 0) {
1238 req = ptlrpc_request_alloc_pool(cli->cl_import,
1240 &RQF_OST_BRW_WRITE);
1243 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1248 for (niocount = i = 1; i < page_count; i++) {
1249 if (!can_merge_pages(pga[i - 1], pga[i]))
1253 pill = &req->rq_pill;
1254 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1256 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1257 niocount * sizeof(*niobuf));
1259 for (i = 0; i < page_count; i++)
1260 short_io_size += pga[i]->count;
1262 /* Check if we can do a short io. */
1263 if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
1264 imp_connect_shortio(cli->cl_import)))
1267 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
1268 opc == OST_READ ? 0 : short_io_size);
1269 if (opc == OST_READ)
1270 req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
1273 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1275 ptlrpc_request_free(req);
1278 osc_set_io_portal(req);
1280 ptlrpc_at_set_req_timeout(req);
1281 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1283 req->rq_no_retry_einprogress = 1;
1285 if (short_io_size != 0) {
1287 short_io_buf = NULL;
1291 desc = ptlrpc_prep_bulk_imp(req, page_count,
1292 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1293 (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
1294 PTLRPC_BULK_PUT_SINK) |
1295 PTLRPC_BULK_BUF_KIOV,
1297 &ptlrpc_bulk_kiov_pin_ops);
1300 GOTO(out, rc = -ENOMEM);
1301 /* NB request now owns desc and will free it when it gets freed */
1303 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1304 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1305 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1306 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1308 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1310 /* For READ and WRITE, we can't fill o_uid and o_gid using from_kuid()
1311 * and from_kgid(), because they are asynchronous. Fortunately, variable
1312 * oa contains valid o_uid and o_gid in these two operations.
1313 * Besides, filling o_uid and o_gid is enough for nrs-tbf, see LU-9658.
1314 * OBD_MD_FLUID and OBD_MD_FLUID is not set in order to avoid breaking
1315 * other process logic */
1316 body->oa.o_uid = oa->o_uid;
1317 body->oa.o_gid = oa->o_gid;
1319 obdo_to_ioobj(oa, ioobj);
1320 ioobj->ioo_bufcnt = niocount;
1321 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1322 * that might be send for this request. The actual number is decided
1323 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1324 * "max - 1" for old client compatibility sending "0", and also so the
1325 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1327 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1329 ioobj_max_brw_set(ioobj, 0);
1331 if (short_io_size != 0) {
1332 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1333 body->oa.o_valid |= OBD_MD_FLFLAGS;
1334 body->oa.o_flags = 0;
1336 body->oa.o_flags |= OBD_FL_SHORT_IO;
1337 CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
1339 if (opc == OST_WRITE) {
1340 short_io_buf = req_capsule_client_get(pill,
1342 LASSERT(short_io_buf != NULL);
1346 LASSERT(page_count > 0);
1348 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1349 struct brw_page *pg = pga[i];
1350 int poff = pg->off & ~PAGE_MASK;
1352 LASSERT(pg->count > 0);
1353 /* make sure there is no gap in the middle of page array */
1354 LASSERTF(page_count == 1 ||
1355 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
1356 ergo(i > 0 && i < page_count - 1,
1357 poff == 0 && pg->count == PAGE_SIZE) &&
1358 ergo(i == page_count - 1, poff == 0)),
1359 "i: %d/%d pg: %p off: %llu, count: %u\n",
1360 i, page_count, pg, pg->off, pg->count);
1361 LASSERTF(i == 0 || pg->off > pg_prev->off,
1362 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1363 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1365 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1366 pg_prev->pg, page_private(pg_prev->pg),
1367 pg_prev->pg->index, pg_prev->off);
1368 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1369 (pg->flag & OBD_BRW_SRVLOCK));
1370 if (short_io_size != 0 && opc == OST_WRITE) {
1371 unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
1373 LASSERT(short_io_size >= requested_nob + pg->count);
1374 memcpy(short_io_buf + requested_nob,
1377 ll_kunmap_atomic(ptr, KM_USER0);
1378 } else if (short_io_size == 0) {
1379 desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
1382 requested_nob += pg->count;
1384 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1386 niobuf->rnb_len += pg->count;
1388 niobuf->rnb_offset = pg->off;
1389 niobuf->rnb_len = pg->count;
1390 niobuf->rnb_flags = pg->flag;
1395 LASSERTF((void *)(niobuf - niocount) ==
1396 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1397 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1398 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1400 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1402 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1403 body->oa.o_valid |= OBD_MD_FLFLAGS;
1404 body->oa.o_flags = 0;
1406 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1409 if (osc_should_shrink_grant(cli))
1410 osc_shrink_grant_local(cli, &body->oa);
1412 /* size[REQ_REC_OFF] still sizeof (*body) */
1413 if (opc == OST_WRITE) {
1414 if (cli->cl_checksum &&
1415 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1416 /* store cl_cksum_type in a local variable since
1417 * it can be changed via lprocfs */
1418 enum cksum_types cksum_type = cli->cl_cksum_type;
1420 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1421 body->oa.o_flags = 0;
1423 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1425 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1427 rc = osc_checksum_bulk_rw(obd_name, cksum_type,
1428 requested_nob, page_count,
1432 CDEBUG(D_PAGE, "failed to checksum, rc = %d\n",
1436 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1439 /* save this in 'oa', too, for later checking */
1440 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1441 oa->o_flags |= obd_cksum_type_pack(obd_name,
1444 /* clear out the checksum flag, in case this is a
1445 * resend but cl_checksum is no longer set. b=11238 */
1446 oa->o_valid &= ~OBD_MD_FLCKSUM;
1448 oa->o_cksum = body->oa.o_cksum;
1449 /* 1 RC per niobuf */
1450 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1451 sizeof(__u32) * niocount);
1453 if (cli->cl_checksum &&
1454 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1455 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1456 body->oa.o_flags = 0;
1457 body->oa.o_flags |= obd_cksum_type_pack(obd_name,
1458 cli->cl_cksum_type);
1459 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1462 /* Client cksum has been already copied to wire obdo in previous
1463 * lustre_set_wire_obdo(), and in the case a bulk-read is being
1464 * resent due to cksum error, this will allow Server to
1465 * check+dump pages on its side */
1467 ptlrpc_request_set_replen(req);
1469 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1470 aa = ptlrpc_req_async_args(req);
1472 aa->aa_requested_nob = requested_nob;
1473 aa->aa_nio_count = niocount;
1474 aa->aa_page_count = page_count;
1478 INIT_LIST_HEAD(&aa->aa_oaps);
1481 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1482 CDEBUG(D_RPCTRACE, "brw rpc %p - object "DOSTID" offset %lld<>%lld\n",
1483 req, POSTID(&oa->o_oi), niobuf[0].rnb_offset,
1484 niobuf[niocount - 1].rnb_offset + niobuf[niocount - 1].rnb_len);
1488 ptlrpc_req_finished(req);
1492 char dbgcksum_file_name[PATH_MAX];
/*
 * Dump every page of a bulk transfer that failed its checksum check into a
 * file under the debug file path, so the on-the-wire data can be inspected
 * offline.
 *
 * NOTE(review): this region of the source is elided (local declarations,
 * braces and some statements are missing); comments describe only the
 * visible logic.
 */
1494 static void dump_all_bulk_pages(struct obdo *oa, __u32 page_count,
1495 struct brw_page **pga, __u32 server_cksum,
1503 /* will only keep dump of pages on first error for the same range in
1504 * file/fid, not during the resends/retries. */
/* Encode FID (if valid), byte range and both checksums into the file name
 * so repeated errors on the same range map to the same (already existing)
 * dump file. */
1505 snprintf(dbgcksum_file_name, sizeof(dbgcksum_file_name),
1506 "%s-checksum_dump-osc-"DFID":[%llu-%llu]-%x-%x",
1507 (strncmp(libcfs_debug_file_path_arr, "NONE", 4) != 0 ?
1508 libcfs_debug_file_path_arr :
1509 LIBCFS_DEBUG_FILE_PATH_DEFAULT),
1510 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL,
1511 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1512 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1514 pga[page_count-1]->off + pga[page_count-1]->count - 1,
1515 client_cksum, server_cksum);
/* O_EXCL: if the dump already exists (a resend of the same range), keep the
 * first dump and just log at D_INFO instead of treating it as an error. */
1516 filp = filp_open(dbgcksum_file_name,
1517 O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600);
1521 CDEBUG(D_INFO, "%s: can't open to dump pages with "
1522 "checksum error: rc = %d\n", dbgcksum_file_name,
1525 CERROR("%s: can't open to dump pages with checksum "
1526 "error: rc = %d\n", dbgcksum_file_name, rc);
/* Write each page's payload; kmap is needed since pages may be highmem. */
1530 for (i = 0; i < page_count; i++) {
1531 len = pga[i]->count;
1532 buf = kmap(pga[i]->pg);
1534 rc = cfs_kernel_write(filp, buf, len, &filp->f_pos);
1536 CERROR("%s: wanted to write %u but got %d "
1537 "error\n", dbgcksum_file_name, len, rc);
1542 CDEBUG(D_INFO, "%s: wrote %d bytes\n",
1543 dbgcksum_file_name, rc);
/* Force the dump to disk before closing -- the node may crash/be evicted
 * shortly after a checksum failure. */
1548 rc = ll_vfs_fsync_range(filp, 0, LLONG_MAX, 1);
1550 CERROR("%s: sync returns %d\n", dbgcksum_file_name, rc);
1551 filp_close(filp, NULL);
/*
 * Verify a write checksum mismatch reported by the server: optionally dump
 * the pages, re-compute the client-side checksum over the same data, and
 * log a verdict on where the corruption most likely happened (client cache,
 * network, or a false positive from concurrent mmap modification).
 *
 * NOTE(review): source is elided here (return type line, some locals and
 * control flow are missing); the visible code is documented as-is.
 */
1556 check_write_checksum(struct obdo *oa, const struct lnet_process_id *peer,
1557 __u32 client_cksum, __u32 server_cksum,
1558 struct osc_brw_async_args *aa)
1560 const char *obd_name = aa->aa_cli->cl_import->imp_obd->obd_name;
1561 enum cksum_types cksum_type;
1562 obd_dif_csum_fn *fn = NULL;
1563 int sector_size = 0;
/* Fast path: server agrees with what we sent -- nothing to investigate. */
1569 if (server_cksum == client_cksum) {
1570 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Keep the raw pages around for analysis if checksum dumping is enabled. */
1574 if (aa->aa_cli->cl_checksum_dump)
1575 dump_all_bulk_pages(oa, aa->aa_page_count, aa->aa_ppga,
1576 server_cksum, client_cksum);
1578 cksum_type = obd_cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
/* Select the T10-PI checksum function/sector size matching the type the
 * server used; non-T10 types fall through to the plain bulk checksum. */
1581 switch (cksum_type) {
1582 case OBD_CKSUM_T10IP512:
1587 case OBD_CKSUM_T10IP4K:
1592 case OBD_CKSUM_T10CRC512:
1594 fn = obd_dif_crc_fn;
1597 case OBD_CKSUM_T10CRC4K:
1599 fn = obd_dif_crc_fn;
1607 rc = osc_checksum_bulk_t10pi(obd_name, aa->aa_requested_nob,
1615 rc = osc_checksum_bulk(aa->aa_requested_nob, aa->aa_page_count,
1616 aa->aa_ppga, OST_WRITE, cksum_type,
/* Classify the mismatch by comparing the freshly computed checksum against
 * both the original client checksum and the server's value. */
1620 msg = "failed to calculate the client write checksum";
1621 else if (cksum_type != obd_cksum_type_unpack(aa->aa_oa->o_flags))
1622 msg = "the server did not use the checksum type specified in "
1623 "the original request - likely a protocol problem";
1624 else if (new_cksum == server_cksum)
1625 msg = "changed on the client after we checksummed it - "
1626 "likely false positive due to mmap IO (bug 11742)";
1627 else if (new_cksum == client_cksum)
1628 msg = "changed in transit before arrival at OST";
1630 msg = "changed in transit AND doesn't match the original - "
1631 "likely false positive due to mmap IO (bug 11742)";
1633 LCONSOLE_ERROR_MSG(0x132, "%s: BAD WRITE CHECKSUM: %s: from %s inode "
1634 DFID " object "DOSTID" extent [%llu-%llu], original "
1635 "client csum %x (type %x), server csum %x (type %x),"
1636 " client csum now %x\n",
1637 obd_name, msg, libcfs_nid2str(peer->nid),
1638 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1639 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1640 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1641 POSTID(&oa->o_oi), aa->aa_ppga[0]->off,
1642 aa->aa_ppga[aa->aa_page_count - 1]->off +
1643 aa->aa_ppga[aa->aa_page_count-1]->count - 1,
1645 obd_cksum_type_unpack(aa->aa_oa->o_flags),
1646 server_cksum, cksum_type, new_cksum);
1650 /* Note rc enters this function as number of bytes transferred */
/*
 * Complete a BRW (bulk read/write) RPC: unpack the reply body, update quota
 * and grant state, verify write checksums on OST_WRITE, and for OST_READ
 * copy short-io payloads into pages, validate transfer size, and verify the
 * read checksum before refreshing the cached obdo from the reply.
 *
 * NOTE(review): source is elided in this region (some braces, returns and
 * declarations are missing); comments cover the visible statements only.
 */
1651 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1653 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1654 struct client_obd *cli = aa->aa_cli;
1655 const char *obd_name = cli->cl_import->imp_obd->obd_name;
1656 const struct lnet_process_id *peer =
1657 &req->rq_import->imp_connection->c_peer;
1658 struct ost_body *body;
1659 u32 client_cksum = 0;
/* -EDQUOT still carries a usable reply (quota flags below); other errors
 * are terminal for this request. */
1662 if (rc < 0 && rc != -EDQUOT) {
1663 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1667 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1668 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1670 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1674 /* set/clear over quota flag for a uid/gid/projid */
1675 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1676 body->oa.o_valid & (OBD_MD_FLALLQUOTA)) {
1677 unsigned qid[LL_MAXQUOTAS] = {
1678 body->oa.o_uid, body->oa.o_gid,
1679 body->oa.o_projid };
1680 CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
1681 body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
1682 body->oa.o_valid, body->oa.o_flags);
1683 osc_quota_setdq(cli, qid, body->oa.o_valid,
/* Refresh the client's cached grant from the server's reply. */
1687 osc_update_grant(cli, body);
1692 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1693 client_cksum = aa->aa_oa->o_cksum; /* save for later */
/* --- OST_WRITE completion path --- */
1695 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1697 CERROR("Unexpected +ve rc %d\n", rc);
1701 if (req->rq_bulk != NULL &&
1702 sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
/* Compare our saved write checksum with the one the server computed. */
1705 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1706 check_write_checksum(&body->oa, peer, client_cksum,
1707 body->oa.o_cksum, aa))
1710 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1711 aa->aa_page_count, aa->aa_ppga);
1715 /* The rest of this function executes only for OST_READs */
/* rq_bulk == NULL means the data came back inline via short-io. */
1717 if (req->rq_bulk == NULL) {
1718 rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
1720 LASSERT(rc == req->rq_status);
1722 /* if unwrap_bulk failed, return -EAGAIN to retry */
1723 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1726 GOTO(out, rc = -EAGAIN);
/* Sanity: server can't legitimately return more than we asked for. */
1728 if (rc > aa->aa_requested_nob) {
1729 CERROR("Unexpected rc %d (%d requested)\n", rc,
1730 aa->aa_requested_nob);
1734 if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
1735 CERROR ("Unexpected rc %d (%d transferred)\n",
1736 rc, req->rq_bulk->bd_nob_transferred);
/* Short-io read: copy the inline reply buffer into the page array. */
1740 if (req->rq_bulk == NULL) {
1742 int nob, pg_count, i = 0;
1745 CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
1746 pg_count = aa->aa_page_count;
1747 buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
1750 while (nob > 0 && pg_count > 0) {
1752 int count = aa->aa_ppga[i]->count > nob ?
1753 nob : aa->aa_ppga[i]->count;
1755 CDEBUG(D_CACHE, "page %p count %d\n",
1756 aa->aa_ppga[i]->pg, count);
1757 ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
1758 memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
1760 ll_kunmap_atomic((void *) ptr, KM_USER0);
/* A short read past EOF is legal: zero-fill the remainder. */
1769 if (rc < aa->aa_requested_nob)
1770 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* Verify the read checksum the server sent, if any. */
1772 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1773 static int cksum_counter;
1774 u32 server_cksum = body->oa.o_cksum;
1777 enum cksum_types cksum_type;
1778 u32 o_flags = body->oa.o_valid & OBD_MD_FLFLAGS ?
1779 body->oa.o_flags : 0;
1781 cksum_type = obd_cksum_type_unpack(o_flags);
1782 rc = osc_checksum_bulk_rw(obd_name, cksum_type, rc,
1783 aa->aa_page_count, aa->aa_ppga,
1784 OST_READ, &client_cksum);
/* Note if the bulk arrived via an LNet router, for the error message. */
1788 if (req->rq_bulk != NULL &&
1789 peer->nid != req->rq_bulk->bd_sender) {
1791 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1794 if (server_cksum != client_cksum) {
1795 struct ost_body *clbody;
1796 u32 page_count = aa->aa_page_count;
1798 clbody = req_capsule_client_get(&req->rq_pill,
1800 if (cli->cl_checksum_dump)
1801 dump_all_bulk_pages(&clbody->oa, page_count,
1802 aa->aa_ppga, server_cksum,
1805 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1806 "%s%s%s inode "DFID" object "DOSTID
1807 " extent [%llu-%llu], client %x, "
1808 "server %x, cksum_type %x\n",
1810 libcfs_nid2str(peer->nid),
1812 clbody->oa.o_valid & OBD_MD_FLFID ?
1813 clbody->oa.o_parent_seq : 0ULL,
1814 clbody->oa.o_valid & OBD_MD_FLFID ?
1815 clbody->oa.o_parent_oid : 0,
1816 clbody->oa.o_valid & OBD_MD_FLFID ?
1817 clbody->oa.o_parent_ver : 0,
1818 POSTID(&body->oa.o_oi),
1819 aa->aa_ppga[0]->off,
1820 aa->aa_ppga[page_count-1]->off +
1821 aa->aa_ppga[page_count-1]->count - 1,
1822 client_cksum, server_cksum,
1825 aa->aa_oa->o_cksum = client_cksum;
1829 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
/* Checksums were requested but the server didn't send one: rate-limited
 * complaint (power-of-two backoff on cksum_missed). */
1832 } else if (unlikely(client_cksum)) {
1833 static int cksum_missed;
1836 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1837 CERROR("Checksum %u requested from %s but not sent\n",
1838 cksum_missed, libcfs_nid2str(peer->nid));
/* Pull the server's attributes from the reply into our cached obdo. */
1844 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1845 aa->aa_oa, &body->oa);
/*
 * Rebuild and resend a BRW request after a recoverable error (e.g.
 * -EINPROGRESS). A fresh request is prepared from the same page array;
 * the page list and extent list are moved from the old request's async
 * args to the new one, and per-page request references are swapped.
 *
 * NOTE(review): source is elided here; some declarations/returns missing.
 */
1850 static int osc_brw_redo_request(struct ptlrpc_request *request,
1851 struct osc_brw_async_args *aa, int rc)
1853 struct ptlrpc_request *new_req;
1854 struct osc_brw_async_args *new_aa;
1855 struct osc_async_page *oap;
1858 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1859 "redo for recoverable error %d", rc);
/* Re-run request preparation with the same cmd/pages (resend flag = 1). */
1861 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1862 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1863 aa->aa_cli, aa->aa_oa, aa->aa_page_count,
1864 aa->aa_ppga, &new_req, 1);
/* If any page's IO was interrupted, abandon the resend entirely. */
1868 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1869 if (oap->oap_request != NULL) {
1870 LASSERTF(request == oap->oap_request,
1871 "request %p != oap_request %p\n",
1872 request, oap->oap_request);
1873 if (oap->oap_interrupted) {
1874 ptlrpc_req_finished(new_req);
1879 /* New request takes over pga and oaps from old request.
1880 * Note that copying a list_head doesn't work, need to move it... */
1882 new_req->rq_interpret_reply = request->rq_interpret_reply;
1883 new_req->rq_async_args = request->rq_async_args;
1884 new_req->rq_commit_cb = request->rq_commit_cb;
1885 /* cap resend delay to the current request timeout, this is similar to
1886 * what ptlrpc does (see after_reply()) */
1887 if (aa->aa_resends > new_req->rq_timeout)
1888 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
1890 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
/* Pin the import generation so eviction/reconnect is detected on reply. */
1891 new_req->rq_generation_set = 1;
1892 new_req->rq_import_generation = request->rq_import_generation;
1894 new_aa = ptlrpc_req_async_args(new_req);
/* Move (not copy) the oap and extent lists onto the new async args. */
1896 INIT_LIST_HEAD(&new_aa->aa_oaps);
1897 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1898 INIT_LIST_HEAD(&new_aa->aa_exts);
1899 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1900 new_aa->aa_resends = aa->aa_resends;
/* Swap each page's request reference from the old request to the new. */
1902 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1903 if (oap->oap_request) {
1904 ptlrpc_req_finished(oap->oap_request);
1905 oap->oap_request = ptlrpc_request_addref(new_req);
1909 /* XXX: This code will run into problem if we're going to support
1910 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1911 * and wait for all of them to be finished. We should inherit request
1912 * set from old request. */
1913 ptlrpcd_add_req(new_req);
1915 DEBUG_REQ(D_INFO, new_req, "new request");
1920 * ugh, we want disk allocation on the target to happen in offset order. we'll
1921 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1922 * fine for our small page arrays and doesn't require allocation. its an
1923 * insertion sort that swaps elements that are strides apart, shrinking the
1924 * stride down until its '1' and the array is sorted.
/* Shellsort the brw_page array ascending by pg->off (in-place, no alloc).
 * NOTE(review): source is elided (do-loop body partially missing). */
1926 static void sort_brw_pages(struct brw_page **array, int num)
1929 struct brw_page *tmp;
/* Knuth gap sequence: 1, 4, 13, 40, ... */
1933 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1938 for (i = stride ; i < num ; i++) {
/* Gapped insertion: shift larger elements up by one stride. */
1941 while (j >= stride && array[j - stride]->off > tmp->off) {
1942 array[j] = array[j - stride];
1947 } while (stride > 1);
/* Free a brw_page pointer array of @count entries (the pages themselves
 * are owned elsewhere; only the array is released). */
1950 static void osc_release_ppga(struct brw_page **ppga, size_t count)
1952 LASSERT(ppga != NULL);
1953 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply-interpret callback for BRW RPCs. Finalizes the request, resends it
 * on recoverable errors, updates cached object attributes (blocks, times,
 * size/KMS for writes), finishes the extents, releases resources, and
 * decrements the in-flight RPC counters before re-plugging queued IO.
 *
 * NOTE(review): source is elided in this region; comments describe only
 * the visible statements.
 */
1956 static int brw_interpret(const struct lu_env *env,
1957 struct ptlrpc_request *req, void *data, int rc)
1959 struct osc_brw_async_args *aa = data;
1960 struct osc_extent *ext;
1961 struct osc_extent *tmp;
1962 struct client_obd *cli = aa->aa_cli;
1963 unsigned long transferred = 0;
1966 rc = osc_brw_fini_request(req, rc);
1967 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1968 /* When server return -EINPROGRESS, client should always retry
1969 * regardless of the number of times the bulk was resent already. */
1970 if (osc_recoverable_error(rc) && !req->rq_no_delay) {
/* Import generation changed => we were evicted; don't resend here. */
1971 if (req->rq_import_generation !=
1972 req->rq_import->imp_generation) {
1973 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1974 ""DOSTID", rc = %d.\n",
1975 req->rq_import->imp_obd->obd_name,
1976 POSTID(&aa->aa_oa->o_oi), rc);
1977 } else if (rc == -EINPROGRESS ||
1978 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1979 rc = osc_brw_redo_request(req, aa, rc);
1981 CERROR("%s: too many resent retries for object: "
1982 "%llu:%llu, rc = %d.\n",
1983 req->rq_import->imp_obd->obd_name,
1984 POSTID(&aa->aa_oa->o_oi), rc);
1989 else if (rc == -EAGAIN || rc == -EINPROGRESS)
/* Success path: propagate server-reported attributes to the cl_object. */
1994 struct obdo *oa = aa->aa_oa;
1995 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1996 unsigned long valid = 0;
1997 struct cl_object *obj;
1998 struct osc_async_page *last;
2000 last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
2001 obj = osc2cl(last->oap_obj);
2003 cl_object_attr_lock(obj);
2004 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2005 attr->cat_blocks = oa->o_blocks;
2006 valid |= CAT_BLOCKS;
2008 if (oa->o_valid & OBD_MD_FLMTIME) {
2009 attr->cat_mtime = oa->o_mtime;
2012 if (oa->o_valid & OBD_MD_FLATIME) {
2013 attr->cat_atime = oa->o_atime;
2016 if (oa->o_valid & OBD_MD_FLCTIME) {
2017 attr->cat_ctime = oa->o_ctime;
2021 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
2022 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
/* End offset of the last page written, relative to object start. */
2023 loff_t last_off = last->oap_count + last->oap_obj_off +
2026 /* Change file size if this is an out of quota or
2027 * direct IO write and it extends the file size */
2028 if (loi->loi_lvb.lvb_size < last_off) {
2029 attr->cat_size = last_off;
2032 /* Extend KMS if it's not a lockless write */
2033 if (loi->loi_kms < last_off &&
2034 oap2osc_page(last)->ops_srvlock == 0) {
2035 attr->cat_kms = last_off;
2041 cl_object_attr_update(env, obj, attr, valid);
2042 cl_object_attr_unlock(obj);
2044 OBDO_FREE(aa->aa_oa);
/* Successful writes leave pages "unstable" until the server commits. */
2046 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
2047 osc_inc_unstable_pages(req);
/* Finish all extents this RPC covered; map no-delay failures to
 * -EWOULDBLOCK for the upper layer. */
2049 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
2050 list_del_init(&ext->oe_link);
2051 osc_extent_finish(env, ext, 1,
2052 rc && req->rq_no_delay ? -EWOULDBLOCK : rc);
2054 LASSERT(list_empty(&aa->aa_exts));
2055 LASSERT(list_empty(&aa->aa_oaps));
2057 transferred = (req->rq_bulk == NULL ? /* short io */
2058 aa->aa_requested_nob :
2059 req->rq_bulk->bd_nob_transferred);
2061 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2062 ptlrpc_lprocfs_brw(req, transferred);
2064 spin_lock(&cli->cl_loi_list_lock);
2065 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2066 * is called so we know whether to go to sync BRWs or wait for more
2067 * RPCs to complete */
2068 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2069 cli->cl_w_in_flight--;
2071 cli->cl_r_in_flight--;
2072 osc_wake_cache_waiters(cli);
2073 spin_unlock(&cli->cl_loi_list_lock);
/* Kick the IO engine: an RPC slot just freed up. */
2075 osc_io_unplug(env, cli, NULL);
/*
 * rq_commit_cb for BRW requests: once the server has committed the write,
 * move the pages out of the "unstable" state. rq_lock arbitrates the race
 * with osc_inc_unstable_pages() (via osc_extent_finish): whichever side
 * runs second performs/permits the decrement.
 */
2079 static void brw_commit(struct ptlrpc_request *req)
2081 /* If osc_inc_unstable_pages (via osc_extent_finish) races with
2082 * this called via the rq_commit_cb, I need to ensure
2083 * osc_dec_unstable_pages is still called. Otherwise unstable
2084 * pages may be leaked. */
2085 spin_lock(&req->rq_lock);
2086 if (likely(req->rq_unstable)) {
/* Pages were already marked unstable: clear and decrement now. */
2087 req->rq_unstable = 0;
2088 spin_unlock(&req->rq_lock);
2090 osc_dec_unstable_pages(req);
/* Otherwise record the commit; the inc side will see rq_committed. */
2092 req->rq_committed = 1;
2093 spin_unlock(&req->rq_lock);
2098 * Build an RPC by the list of extent @ext_list. The caller must ensure
2099 * that the total pages in this list are NOT over max pages per RPC.
2100 * Extents in the list must be in OES_RPC state.
/*
 * Collect the pages of all extents into a sorted brw_page array, prepare
 * the BRW request, set timestamps/jobid attributes, account in-flight
 * RPC statistics, and hand the request to ptlrpcd. On failure every
 * extent is finished with the error so nothing is leaked.
 *
 * NOTE(review): source is elided in this region (some declarations,
 * braces and error-path lines are missing); comments cover visible code.
 */
2102 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2103 struct list_head *ext_list, int cmd)
2105 struct ptlrpc_request *req = NULL;
2106 struct osc_extent *ext;
2107 struct brw_page **pga = NULL;
2108 struct osc_brw_async_args *aa = NULL;
2109 struct obdo *oa = NULL;
2110 struct osc_async_page *oap;
2111 struct osc_object *obj = NULL;
2112 struct cl_req_attr *crattr = NULL;
2113 loff_t starting_offset = OBD_OBJECT_EOF;
2114 loff_t ending_offset = 0;
2118 bool soft_sync = false;
2119 bool interrupted = false;
2120 bool ndelay = false;
2124 __u32 layout_version = 0;
2125 struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
2126 struct ost_body *body;
2128 LASSERT(!list_empty(ext_list));
2130 /* add pages into rpc_list to build BRW rpc */
/* First pass: aggregate memalloc/grant/page-count/layout info. */
2131 list_for_each_entry(ext, ext_list, oe_link) {
2132 LASSERT(ext->oe_state == OES_RPC);
2133 mem_tight |= ext->oe_memalloc;
2134 grant += ext->oe_grants;
2135 page_count += ext->oe_nr_pages;
2136 layout_version = MAX(layout_version, ext->oe_layout_version);
2141 soft_sync = osc_over_unstable_soft_limit(cli);
2143 mpflag = cfs_memory_pressure_get_and_set();
2145 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2147 GOTO(out, rc = -ENOMEM);
2151 GOTO(out, rc = -ENOMEM);
/* Second pass: fill pga[] and rpc_list, tracking the byte range covered
 * and whether any page's IO was interrupted. */
2154 list_for_each_entry(ext, ext_list, oe_link) {
2155 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2157 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2159 oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
2160 pga[i] = &oap->oap_brw_page;
2161 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2164 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2165 if (starting_offset == OBD_OBJECT_EOF ||
2166 starting_offset > oap->oap_obj_off)
2167 starting_offset = oap->oap_obj_off;
2169 LASSERT(oap->oap_page_off == 0);
2170 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2171 ending_offset = oap->oap_obj_off +
2174 LASSERT(oap->oap_page_off + oap->oap_count ==
2176 if (oap->oap_interrupted)
2183 /* first page in the list */
2184 oap = list_entry(rpc_list.next, typeof(*oap), oap_rpc_item);
/* Seed the obdo from the cl_object via the request-attr callback. */
2186 crattr = &osc_env_info(env)->oti_req_attr;
2187 memset(crattr, 0, sizeof(*crattr));
2188 crattr->cra_type = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2189 crattr->cra_flags = ~0ULL;
2190 crattr->cra_page = oap2cl_page(oap);
2191 crattr->cra_oa = oa;
2192 cl_req_attr_set(env, osc2cl(obj), crattr);
2194 if (cmd == OBD_BRW_WRITE) {
2195 oa->o_grant_used = grant;
/* Writes carry the layout version so the OST can reject stale layouts. */
2196 if (layout_version > 0) {
2197 CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
2198 PFID(&oa->o_oi.oi_fid), layout_version);
2200 oa->o_layout_version = layout_version;
2201 oa->o_valid |= OBD_MD_LAYOUT_VERSION;
/* Sort by offset so the OST can allocate disk blocks sequentially. */
2205 sort_brw_pages(pga, page_count);
2206 rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
2208 CERROR("prep_req failed: %d\n", rc);
2212 req->rq_commit_cb = brw_commit;
2213 req->rq_interpret_reply = brw_interpret;
2214 req->rq_memalloc = mem_tight != 0;
2215 oap->oap_request = ptlrpc_request_addref(req);
2216 if (interrupted && !req->rq_intr)
2217 ptlrpc_mark_interrupted(req);
2219 req->rq_no_resend = req->rq_no_delay = 1;
2220 /* probably set a shorter timeout value.
2221 * to handle ETIMEDOUT in brw_interpret() correctly. */
2222 /* lustre_msg_set_timeout(req, req->rq_timeout / 2); */
2225 /* Need to update the timestamps after the request is built in case
2226 * we race with setattr (locally or in queue at OST). If OST gets
2227 * later setattr before earlier BRW (as determined by the request xid),
2228 * the OST will not use BRW timestamps. Sadly, there is no obvious
2229 * way to do this in a single call. bug 10150 */
2230 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2231 crattr->cra_oa = &body->oa;
2232 crattr->cra_flags = OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME;
2233 cl_req_attr_set(env, osc2cl(obj), crattr);
2234 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2236 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2237 aa = ptlrpc_req_async_args(req);
/* The request takes ownership of the page and extent lists. */
2238 INIT_LIST_HEAD(&aa->aa_oaps);
2239 list_splice_init(&rpc_list, &aa->aa_oaps);
2240 INIT_LIST_HEAD(&aa->aa_exts);
2241 list_splice_init(ext_list, &aa->aa_exts);
/* Account the in-flight RPC and record size/offset histograms. */
2243 spin_lock(&cli->cl_loi_list_lock);
2244 starting_offset >>= PAGE_SHIFT;
2245 if (cmd == OBD_BRW_READ) {
2246 cli->cl_r_in_flight++;
2247 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2248 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2249 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2250 starting_offset + 1);
2252 cli->cl_w_in_flight++;
2253 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2254 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2255 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2256 starting_offset + 1);
2258 spin_unlock(&cli->cl_loi_list_lock);
2260 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
2261 page_count, aa, cli->cl_r_in_flight,
2262 cli->cl_w_in_flight);
2263 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
2265 ptlrpcd_add_req(req);
/* Error path: undo memory-pressure flag, free pga and fail the extents. */
2271 cfs_memory_pressure_restore(mpflag);
2274 LASSERT(req == NULL);
2279 OBD_FREE(pga, sizeof(*pga) * page_count);
2280 /* this should happen rarely and is pretty bad, it makes the
2281 * pending list not follow the dirty order */
2282 while (!list_empty(ext_list)) {
2283 ext = list_entry(ext_list->next, struct osc_extent,
2285 list_del_init(&ext->oe_link);
2286 osc_extent_finish(env, ext, 0, rc);
/*
 * Attach @data to the DLM lock's l_ast_data if it is not already set.
 * NOTE(review): the return statements are elided in this view; visibly the
 * function succeeds when l_ast_data ends up equal to @data (either it was
 * unset and is now assigned, or it already matched) -- confirm in full
 * source.
 */
2292 static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
2296 LASSERT(lock != NULL);
/* Test-and-set under the lock's resource lock to avoid races with other
 * threads attaching their own data. */
2298 lock_res_and_lock(lock);
2300 if (lock->l_ast_data == NULL)
2301 lock->l_ast_data = data;
2302 if (lock->l_ast_data == data)
2305 unlock_res_and_lock(lock);
/*
 * Common completion for a lock enqueue: translate an intent-aborted reply
 * into its embedded status, mark the LVB ready where appropriate, invoke
 * the caller's upcall with the final error code, and drop the enqueue
 * reference on the lock handle.
 *
 * NOTE(review): source is elided here; some branches/returns are missing.
 */
2310 int osc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall,
2311 void *cookie, struct lustre_handle *lockh,
2312 enum ldlm_mode mode, __u64 *flags, bool speculative,
2315 bool intent = *flags & LDLM_FL_HAS_INTENT;
2319 /* The request was created before ldlm_cli_enqueue call. */
2320 if (intent && errcode == ELDLM_LOCK_ABORTED) {
/* Intent was handled without granting a lock: the real status is in the
 * DLM reply's policy result. */
2321 struct ldlm_reply *rep;
2323 rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
2324 LASSERT(rep != NULL);
2326 rep->lock_policy_res1 =
2327 ptlrpc_status_ntoh(rep->lock_policy_res1);
2328 if (rep->lock_policy_res1)
2329 errcode = rep->lock_policy_res1;
2331 *flags |= LDLM_FL_LVB_READY;
2332 } else if (errcode == ELDLM_OK) {
2333 *flags |= LDLM_FL_LVB_READY;
2336 /* Call the update callback. */
2337 rc = (*upcall)(cookie, lockh, errcode);
2339 /* release the reference taken in ldlm_cli_enqueue() */
2340 if (errcode == ELDLM_LOCK_MATCHED)
2342 if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
2343 ldlm_lock_decref(lockh, mode);
/*
 * Reply-interpret callback for an asynchronous lock enqueue: resolve the
 * lock from the saved handle, complete the DLM-level enqueue, then run the
 * OSC-level completion (upcall) via osc_enqueue_fini(), and finally drop
 * the extra reference taken to fence blocking ASTs.
 *
 * NOTE(review): source is elided here; comments cover visible statements.
 */
2348 int osc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req,
2349 struct osc_enqueue_args *aa, int rc)
2351 struct ldlm_lock *lock;
2352 struct lustre_handle *lockh = &aa->oa_lockh;
2353 enum ldlm_mode mode = aa->oa_mode;
2354 struct ost_lvb *lvb = aa->oa_lvb;
2355 __u32 lvb_len = sizeof(*lvb);
2360 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2362 lock = ldlm_handle2lock(lockh);
2363 LASSERTF(lock != NULL,
2364 "lockh %#llx, req %p, aa %p - client evicted?\n",
2365 lockh->cookie, req, aa);
2367 /* Take an additional reference so that a blocking AST that
2368 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2369 * to arrive after an upcall has been executed by
2370 * osc_enqueue_fini(). */
2371 ldlm_lock_addref(lockh, mode);
2373 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2374 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);
2376 /* Let CP AST to grant the lock first. */
2377 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
/* Speculative (e.g. read-ahead) enqueues carried no LVB/flags pointers;
 * supply a local flags word for the fini calls below. */
2379 if (aa->oa_speculative) {
2380 LASSERT(aa->oa_lvb == NULL);
2381 LASSERT(aa->oa_flags == NULL);
2382 aa->oa_flags = &flags;
2385 /* Complete obtaining the lock procedure. */
2386 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
2387 aa->oa_mode, aa->oa_flags, lvb, lvb_len,
2389 /* Complete osc stuff. */
2390 rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
2391 aa->oa_flags, aa->oa_speculative, rc);
2393 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
/* Drop the fencing reference taken above and the handle2lock reference. */
2395 ldlm_lock_decref(lockh, mode);
2396 LDLM_LOCK_PUT(lock);
2400 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2402 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2403 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2404 * other synchronous requests, however keeping some locks and trying to obtain
2405 * others may take a considerable amount of time in a case of ost failure; and
2406 * when other sync requests do not get released lock from a client, the client
2407 * is evicted from the cluster -- such scenarious make the life difficult, so
2408 * release locks just after they are obtained. */
/*
 * Obtain a DLM extent lock on @res_id: first try to match an already
 * granted lock via ldlm_lock_match(); only if that fails build and send
 * an LDLM enqueue RPC.  The result is reported through @upcall/@cookie
 * (directly for a match, via osc_enqueue_interpret() for async enqueue).
 * The extent in @policy is widened to page boundaries before matching.
 */
2409 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2410 __u64 *flags, union ldlm_policy_data *policy,
2411 struct ost_lvb *lvb, int kms_valid,
2412 osc_enqueue_upcall_f upcall, void *cookie,
2413 struct ldlm_enqueue_info *einfo,
2414 struct ptlrpc_request_set *rqset, int async,
2417 struct obd_device *obd = exp->exp_obd;
2418 struct lustre_handle lockh = { 0 };
2419 struct ptlrpc_request *req = NULL;
2420 int intent = *flags & LDLM_FL_HAS_INTENT;
2421 __u64 match_flags = *flags;
2422 enum ldlm_mode mode;
2426 /* Filesystem lock extents are extended to page boundaries so that
2427 * dealing with the page cache is a little smoother. */
2428 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2429 policy->l_extent.end |= ~PAGE_MASK;
2432 * kms is not valid when either object is completely fresh (so that no
2433 * locks are cached), or object was evicted. In the latter case cached
2434 * lock cannot be used, because it would prime inode state with
2435 * potentially stale LVB.
2440 /* Next, search for already existing extent locks that will cover us */
2441 /* If we're trying to read, we also search for an existing PW lock. The
2442 * VFS and page cache already protect us locally, so lots of readers/
2443 * writers can share a single PW lock.
2445 * There are problems with conversion deadlocks, so instead of
2446 * converting a read lock to a write lock, we'll just enqueue a new
2449 * At some point we should cancel the read lock instead of making them
2450 * send us a blocking callback, but there are problems with canceling
2451 * locks out from other users right now, too. */
2452 mode = einfo->ei_mode;
2453 if (einfo->ei_mode == LCK_PR)
2455 /* Normal lock requests must wait for the LVB to be ready before
2456 * matching a lock; speculative lock requests do not need to,
2457 * because they will not actually use the lock. */
2459 match_flags |= LDLM_FL_LVB_READY;
2461 match_flags |= LDLM_FL_BLOCK_GRANTED;
2462 mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
2463 einfo->ei_type, policy, mode, &lockh, 0);
2465 struct ldlm_lock *matched;
2467 if (*flags & LDLM_FL_TEST_LOCK)
2470 matched = ldlm_handle2lock(&lockh);
2472 /* This DLM lock request is speculative, and does not
2473 * have an associated IO request. Therefore if there
2474 * is already a DLM lock, it will just inform the
2475 * caller to cancel the request for this stripe.*/
2476 lock_res_and_lock(matched);
2477 if (ldlm_extent_equal(&policy->l_extent,
2478 &matched->l_policy_data.l_extent))
2482 unlock_res_and_lock(matched);
2484 ldlm_lock_decref(&lockh, mode);
2485 LDLM_LOCK_PUT(matched);
2487 } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
2488 *flags |= LDLM_FL_LVB_READY;
2490 /* We already have a lock, and it's referenced. */
2491 (*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);
2493 ldlm_lock_decref(&lockh, mode);
2494 LDLM_LOCK_PUT(matched);
2497 ldlm_lock_decref(&lockh, mode);
2498 LDLM_LOCK_PUT(matched);
/* No usable cached lock: build an enqueue request.  The non-intent
 * variant reserves server-side space for the returned LVB. */
2503 if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
2507 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2508 &RQF_LDLM_ENQUEUE_LVB);
2512 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
2514 ptlrpc_request_free(req);
2518 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2520 ptlrpc_request_set_replen(req);
2523 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2524 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2526 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2527 sizeof(*lvb), LVB_T_OST, &lockh, async);
/* Async path: stash the completion context in rq_async_args so
 * osc_enqueue_interpret() can finish the enqueue later. */
2530 struct osc_enqueue_args *aa;
2531 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2532 aa = ptlrpc_req_async_args(req);
2534 aa->oa_mode = einfo->ei_mode;
2535 aa->oa_type = einfo->ei_type;
2536 lustre_handle_copy(&aa->oa_lockh, &lockh);
2537 aa->oa_upcall = upcall;
2538 aa->oa_cookie = cookie;
2539 aa->oa_speculative = speculative;
2541 aa->oa_flags = flags;
2544 /* speculative locks are essentially to enqueue
2545 * a DLM lock in advance, so we don't care
2546 * about the result of the enqueue. */
2548 aa->oa_flags = NULL;
2551 req->rq_interpret_reply =
2552 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2553 if (rqset == PTLRPCD_SET)
2554 ptlrpcd_add_req(req);
2556 ptlrpc_set_add_req(rqset, req);
2557 } else if (intent) {
2558 ptlrpc_req_finished(req);
/* Synchronous path: complete the enqueue in-line. */
2563 rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
2564 flags, speculative, rc);
2566 ptlrpc_req_finished(req);
/*
 * Match an existing granted DLM lock on @res_id without enqueuing a new
 * one.  The extent is widened to page boundaries first, mirroring
 * osc_enqueue_base().  On a successful match (and unless
 * LDLM_FL_TEST_LOCK was set) the lock's ast_data is bound to @data via
 * osc_set_lock_data(); if that fails the reference is dropped again.
 * Returns the matched mode (non-zero) or 0 when nothing matched.
 */
2571 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2572 enum ldlm_type type, union ldlm_policy_data *policy,
2573 enum ldlm_mode mode, __u64 *flags, void *data,
2574 struct lustre_handle *lockh, int unref)
2576 struct obd_device *obd = exp->exp_obd;
2577 __u64 lflags = *flags;
2581 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2584 /* Filesystem lock extents are extended to page boundaries so that
2585 * dealing with the page cache is a little smoother */
2586 policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
2587 policy->l_extent.end |= ~PAGE_MASK;
2589 /* Next, search for already existing extent locks that will cover us */
2590 /* If we're trying to read, we also search for an existing PW lock. The
2591 * VFS and page cache already protect us locally, so lots of readers/
2592 * writers can share a single PW lock. */
2596 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2597 res_id, type, policy, rc, lockh, unref);
2598 if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
2602 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2604 LASSERT(lock != NULL);
2605 if (!osc_set_lock_data(lock, data)) {
2606 ldlm_lock_decref(lockh, rc);
2609 LDLM_LOCK_PUT(lock);
/*
 * Reply interpreter for async OST_STATFS: unpack the obd_statfs from the
 * reply, copy it into the caller's obd_info and invoke its oi_cb_up()
 * completion callback.  -ENOTCONN/-EAGAIN with OBD_STATFS_NODELAY are
 * treated specially (no-delay statfs must not wait for recovery).
 */
2614 static int osc_statfs_interpret(const struct lu_env *env,
2615 struct ptlrpc_request *req,
2616 struct osc_async_args *aa, int rc)
2618 struct obd_statfs *msfs;
2622 /* The request has in fact never been sent
2623 * due to issues at a higher level (LOV).
2624 * Exit immediately since the caller is
2625 * aware of the problem and takes care
2626 * of the clean up */
2629 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2630 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2636 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2638 GOTO(out, rc = -EPROTO);
2641 *aa->aa_oi->oi_osfs = *msfs;
2643 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Send an OST_STATFS request asynchronously on @rqset; the reply is
 * handled by osc_statfs_interpret(), which delivers the result through
 * @oinfo->oi_cb_up.  @max_age is currently unused on the wire (see the
 * comment below).
 */
2647 static int osc_statfs_async(struct obd_export *exp,
2648 struct obd_info *oinfo, time64_t max_age,
2649 struct ptlrpc_request_set *rqset)
2651 struct obd_device *obd = class_exp2obd(exp);
2652 struct ptlrpc_request *req;
2653 struct osc_async_args *aa;
2657 /* We could possibly pass max_age in the request (as an absolute
2658 * timestamp or a "seconds.usec ago") so the target can avoid doing
2659 * extra calls into the filesystem if that isn't necessary (e.g.
2660 * during mount that would help a bit). Having relative timestamps
2661 * is not so great if request processing is slow, while absolute
2662 * timestamps are not ideal because they need time synchronization. */
2663 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2667 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2669 ptlrpc_request_free(req);
2672 ptlrpc_request_set_replen(req);
2673 req->rq_request_portal = OST_CREATE_PORTAL;
2674 ptlrpc_at_set_req_timeout(req);
2676 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2677 /* statfs from procfs must not wait or resend, to avoid deadlock */
2678 req->rq_no_resend = 1;
2679 req->rq_no_delay = 1;
2682 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2683 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2684 aa = ptlrpc_req_async_args(req);
2687 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS: take a reference on the import under cl_sem
 * (it may disappear during disconnect), send the request with
 * ptlrpc_queue_wait(), and copy the returned obd_statfs into @osfs.
 */
2691 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2692 struct obd_statfs *osfs, time64_t max_age, __u32 flags)
2694 struct obd_device *obd = class_exp2obd(exp);
2695 struct obd_statfs *msfs;
2696 struct ptlrpc_request *req;
2697 struct obd_import *imp = NULL;
2702 /* Since the request might also come from lprocfs, we need to
2703 * sync this with client_disconnect_export() (Bug15684) */
2704 down_read(&obd->u.cli.cl_sem);
2705 if (obd->u.cli.cl_import)
2706 imp = class_import_get(obd->u.cli.cl_import);
2707 up_read(&obd->u.cli.cl_sem);
2711 /* We could possibly pass max_age in the request (as an absolute
2712 * timestamp or a "seconds.usec ago") so the target can avoid doing
2713 * extra calls into the filesystem if that isn't necessary (e.g.
2714 * during mount that would help a bit). Having relative timestamps
2715 * is not so great if request processing is slow, while absolute
2716 * timestamps are not ideal because they need time synchronization. */
2717 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2719 class_import_put(imp);
2724 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2726 ptlrpc_request_free(req);
2729 ptlrpc_request_set_replen(req);
2730 req->rq_request_portal = OST_CREATE_PORTAL;
2731 ptlrpc_at_set_req_timeout(req);
2733 if (flags & OBD_STATFS_NODELAY) {
2734 /* statfs from procfs must not wait or resend, to avoid deadlock */
2735 req->rq_no_resend = 1;
2736 req->rq_no_delay = 1;
2739 rc = ptlrpc_queue_wait(req);
2743 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2745 GOTO(out, rc = -EPROTO);
2751 ptlrpc_req_finished(req);
/*
 * OSC ioctl dispatcher.  Pins the module for the duration of the call,
 * then handles the small set of client-side ioctls (import recovery,
 * activate/deactivate, target ping); unknown commands return -ENOTTY.
 */
2755 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2756 void *karg, void __user *uarg)
2758 struct obd_device *obd = exp->exp_obd;
2759 struct obd_ioctl_data *data = karg;
2763 if (!try_module_get(THIS_MODULE)) {
2764 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
2765 module_name(THIS_MODULE));
2769 case OBD_IOC_CLIENT_RECOVER:
2770 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2771 data->ioc_inlbuf1, 0);
2775 case IOC_OSC_SET_ACTIVE:
2776 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2779 case OBD_IOC_PING_TARGET:
2780 err = ptlrpc_obd_ping(obd);
2783 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2784 cmd, current_comm());
2785 GOTO(out, err = -ENOTTY);
2788 module_put(THIS_MODULE);
/*
 * Handle obd_set_info_async() keys for the OSC.  Keys that only affect
 * local client state (KEY_CHECKSUM, KEY_SPTLRPC_CONF, KEY_FLUSH_CTX,
 * KEY_CACHE_SET, KEY_CACHE_LRU_SHRINK) are handled without an RPC;
 * everything else is packed into an OST_SET_INFO request.
 * KEY_GRANT_SHRINK uses the dedicated grant-info request format and is
 * always sent via ptlrpcd; other RPCs require a caller-supplied @set.
 */
2792 int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2793 u32 keylen, void *key, u32 vallen, void *val,
2794 struct ptlrpc_request_set *set)
2796 struct ptlrpc_request *req;
2797 struct obd_device *obd = exp->exp_obd;
2798 struct obd_import *imp = class_exp2cliimp(exp);
2803 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2805 if (KEY_IS(KEY_CHECKSUM)) {
2806 if (vallen != sizeof(int))
2808 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2812 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2813 sptlrpc_conf_client_adapt(obd);
2817 if (KEY_IS(KEY_FLUSH_CTX)) {
2818 sptlrpc_import_flush_my_ctx(imp);
2822 if (KEY_IS(KEY_CACHE_SET)) {
2823 struct client_obd *cli = &obd->u.cli;
2825 LASSERT(cli->cl_cache == NULL); /* only once */
2826 cli->cl_cache = (struct cl_client_cache *)val;
2827 cl_cache_incref(cli->cl_cache);
2828 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2830 /* add this osc into entity list */
2831 LASSERT(list_empty(&cli->cl_lru_osc));
2832 spin_lock(&cli->cl_cache->ccc_lru_lock);
2833 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2834 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2839 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2840 struct client_obd *cli = &obd->u.cli;
2841 long nr = atomic_long_read(&cli->cl_lru_in_list) >> 1;
2842 long target = *(long *)val;
2844 nr = osc_lru_shrink(env, cli, min(nr, target), true);
2849 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2852 /* We pass all other commands directly to OST. Since nobody calls osc
2853 methods directly and everybody is supposed to go through LOV, we
2854 assume lov checked invalid values for us.
2855 The only recognised values so far are evict_by_nid and mds_conn.
2856 Even if something bad goes through, we'd get a -EINVAL from OST
2859 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2860 &RQF_OST_SET_GRANT_INFO :
2865 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2866 RCL_CLIENT, keylen);
2867 if (!KEY_IS(KEY_GRANT_SHRINK))
2868 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2869 RCL_CLIENT, vallen);
2870 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2872 ptlrpc_request_free(req);
/* Copy key (and value) into the request capsule buffers. */
2876 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2877 memcpy(tmp, key, keylen);
2878 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2881 memcpy(tmp, val, vallen);
2883 if (KEY_IS(KEY_GRANT_SHRINK)) {
2884 struct osc_grant_args *aa;
2887 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2888 aa = ptlrpc_req_async_args(req);
2891 ptlrpc_req_finished(req);
2894 *oa = ((struct ost_body *)val)->oa;
2896 req->rq_interpret_reply = osc_shrink_grant_interpret;
2899 ptlrpc_request_set_replen(req);
2900 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2901 LASSERT(set != NULL);
2902 ptlrpc_set_add_req(set, req);
2903 ptlrpc_check_set(NULL, set);
/* grant shrink requests go through ptlrpcd, no caller set needed */
2905 ptlrpcd_add_req(req);
2910 EXPORT_SYMBOL(osc_set_info_async);
/*
 * Connect/reconnect hook: recompute the grant this client asks the OST
 * for (available + reserved + dirty, under cl_loi_list_lock) and reset
 * cl_lost_grant.  Falls back to 2 * cli_brw_size(obd) when the computed
 * grant is zero.
 */
2912 int osc_reconnect(const struct lu_env *env, struct obd_export *exp,
2913 struct obd_device *obd, struct obd_uuid *cluuid,
2914 struct obd_connect_data *data, void *localdata)
2916 struct client_obd *cli = &obd->u.cli;
2918 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
2922 spin_lock(&cli->cl_loi_list_lock);
2923 grant = cli->cl_avail_grant + cli->cl_reserved_grant;
2924 if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
2925 grant += cli->cl_dirty_grant;
2927 grant += cli->cl_dirty_pages << PAGE_SHIFT;
2928 data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
2929 lost_grant = cli->cl_lost_grant;
2930 cli->cl_lost_grant = 0;
2931 spin_unlock(&cli->cl_loi_list_lock);
2933 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
2934 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
2935 data->ocd_version, data->ocd_grant, lost_grant);
2940 EXPORT_SYMBOL(osc_reconnect);
/*
 * Disconnect the export, then (only once the import is gone) remove this
 * client from the grant-shrink list -- see the ordering rationale below.
 */
2942 int osc_disconnect(struct obd_export *exp)
2944 struct obd_device *obd = class_exp2obd(exp);
2947 rc = client_disconnect_export(exp);
2949 * Initially we put del_shrink_grant before disconnect_export, but it
2950 * causes the following problem if setup (connect) and cleanup
2951 * (disconnect) are tangled together.
2952 * connect p1 disconnect p2
2953 * ptlrpc_connect_import
2954 * ............... class_manual_cleanup
2957 * ptlrpc_connect_interrupt
2959 * add this client to shrink list
2961 * Bang! pinger trigger the shrink.
2962 * So the osc should be disconnected from the shrink list, after we
2963 * are sure the import has been destroyed. BUG18662
2965 if (obd->u.cli.cl_import == NULL)
2966 osc_del_shrink_grant(&obd->u.cli);
2969 EXPORT_SYMBOL(osc_disconnect);
/*
 * cfs_hash iterator callback: for one LDLM resource, find the osc_object
 * attached to its granted locks (l_ast_data), clear LDLM_FL_CLEANED on
 * each lock so a later namespace cleanup pass can cancel it, then
 * invalidate the object.  @arg is the lu_env supplied by the caller
 * (osc_import_event()).
 */
2971 int osc_ldlm_resource_invalidate(struct cfs_hash *hs, struct cfs_hash_bd *bd,
2972 struct hlist_node *hnode, void *arg)
2974 struct lu_env *env = arg;
2975 struct ldlm_resource *res = cfs_hash_object(hs, hnode);
2976 struct ldlm_lock *lock;
2977 struct osc_object *osc = NULL;
2981 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
2982 if (lock->l_ast_data != NULL && osc == NULL) {
2983 osc = lock->l_ast_data;
2984 cl_object_get(osc2cl(osc));
2987 /* clear LDLM_FL_CLEANED flag to make sure it will be canceled
2988 * by the 2nd round of ldlm_namespace_clean() call in
2989 * osc_import_event(). */
2990 ldlm_clear_cleaned(lock);
2995 osc_object_invalidate(env, osc);
2996 cl_object_put(env, osc2cl(osc));
3001 EXPORT_SYMBOL(osc_ldlm_resource_invalidate);
/*
 * Import state-change handler.  Resets grant on disconnect, invalidates
 * cached locks and objects on IMP_EVENT_INVALIDATE (two namespace-cleanup
 * passes around a per-resource invalidate walk), re-initializes grant on
 * OCD, and forwards the corresponding OBD_NOTIFY_* events to the observer.
 */
3003 static int osc_import_event(struct obd_device *obd,
3004 struct obd_import *imp,
3005 enum obd_import_event event)
3007 struct client_obd *cli;
3011 LASSERT(imp->imp_obd == obd);
3014 case IMP_EVENT_DISCON: {
3016 spin_lock(&cli->cl_loi_list_lock);
3017 cli->cl_avail_grant = 0;
3018 cli->cl_lost_grant = 0;
3019 spin_unlock(&cli->cl_loi_list_lock);
3022 case IMP_EVENT_INACTIVE: {
3023 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE);
3026 case IMP_EVENT_INVALIDATE: {
3027 struct ldlm_namespace *ns = obd->obd_namespace;
3031 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3033 env = cl_env_get(&refcheck);
3035 osc_io_unplug(env, &obd->u.cli, NULL);
3037 cfs_hash_for_each_nolock(ns->ns_rs_hash,
3038 osc_ldlm_resource_invalidate,
3040 cl_env_put(env, &refcheck);
/* second pass catches locks un-cleaned by the invalidate walk */
3042 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3047 case IMP_EVENT_ACTIVE: {
3048 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE);
3051 case IMP_EVENT_OCD: {
3052 struct obd_connect_data *ocd = &imp->imp_connect_data;
3054 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3055 osc_init_grant(&obd->u.cli, ocd);
3058 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3059 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3061 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
3064 case IMP_EVENT_DEACTIVATE: {
3065 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE);
3068 case IMP_EVENT_ACTIVATE: {
3069 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE);
3073 CERROR("Unknown import event %d\n", event);
3080 * Determine whether the lock can be canceled before replaying the lock
3081 * during recovery, see bug16774 for detailed information.
3083 * \retval zero the lock can't be canceled
3084 * \retval other ok to cancel
3086 static int osc_cancel_weight(struct ldlm_lock *lock)
3089 * Cancel all unused and granted extent lock.
/* weight 0 from osc_ldlm_weigh_ast() means no pages are covered,
 * so the lock is safe to cancel before replay */
3091 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3092 lock->l_granted_mode == lock->l_req_mode &&
3093 osc_ldlm_weigh_ast(lock) == 0)
/*
 * ptlrpcd work callback (cl_writeback_work): flush pending bulk I/O for
 * the client obd passed in @data.
 */
3099 static int brw_queue_work(const struct lu_env *env, void *data)
3101 struct client_obd *cli = data;
3103 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3105 osc_io_unplug(env, cli, NULL);
/*
 * Common OSC/OSP setup: take a ptlrpcd reference, do generic client obd
 * setup, allocate the writeback and LRU ptlrpcd work items, and set up
 * quota and grant-shrink state.  On failure, tears down whatever was
 * already allocated (work items, client obd) before returning.
 */
3109 int osc_setup_common(struct obd_device *obd, struct lustre_cfg *lcfg)
3111 struct client_obd *cli = &obd->u.cli;
3117 rc = ptlrpcd_addref();
3121 rc = client_obd_setup(obd, lcfg);
3123 GOTO(out_ptlrpcd, rc);
3126 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3127 if (IS_ERR(handler))
3128 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3129 cli->cl_writeback_work = handler;
3131 handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
3132 if (IS_ERR(handler))
3133 GOTO(out_ptlrpcd_work, rc = PTR_ERR(handler));
3134 cli->cl_lru_work = handler;
3136 rc = osc_quota_setup(obd);
3138 GOTO(out_ptlrpcd_work, rc);
3140 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3142 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
/* error path: undo partial setup in reverse order */
3146 if (cli->cl_writeback_work != NULL) {
3147 ptlrpcd_destroy_work(cli->cl_writeback_work);
3148 cli->cl_writeback_work = NULL;
3150 if (cli->cl_lru_work != NULL) {
3151 ptlrpcd_destroy_work(cli->cl_lru_work);
3152 cli->cl_lru_work = NULL;
3154 client_obd_cleanup(obd);
3159 EXPORT_SYMBOL(osc_setup_common);
/*
 * OSC device setup: common setup + tunables, then opportunistically grow
 * the shared request pool (bounded by osc_reqpool_maxreqcount; the
 * over-limit race noted below is tolerated), register the cancel-weight
 * callback, join the global shrink list, and set the idle timeout.
 */
3161 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3163 struct client_obd *cli = &obd->u.cli;
3171 rc = osc_setup_common(obd, lcfg);
3175 rc = osc_tunables_init(obd);
3180 * We try to control the total number of requests with a upper limit
3181 * osc_reqpool_maxreqcount. There might be some race which will cause
3182 * over-limit allocation, but it is fine.
3184 req_count = atomic_read(&osc_pool_req_count);
3185 if (req_count < osc_reqpool_maxreqcount) {
3186 adding = cli->cl_max_rpcs_in_flight + 2;
3187 if (req_count + adding > osc_reqpool_maxreqcount)
3188 adding = osc_reqpool_maxreqcount - req_count;
3190 added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
3191 atomic_add(added, &osc_pool_req_count);
3194 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3195 ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
3197 spin_lock(&osc_shrink_lock);
3198 list_add_tail(&cli->cl_shrink_list, &osc_shrink_list);
3199 spin_unlock(&osc_shrink_lock);
3200 cli->cl_import->imp_idle_timeout = osc_idle_timeout;
/*
 * Common pre-cleanup: wait out zombie exports (echo client case, see
 * call chain below), destroy the writeback/LRU ptlrpcd work items, then
 * clean up the client import.
 */
3205 int osc_precleanup_common(struct obd_device *obd)
3207 struct client_obd *cli = &obd->u.cli;
3211 * for echo client, export may be on zombie list, wait for
3212 * zombie thread to cull it, because cli.cl_import will be
3213 * cleared in client_disconnect_export():
3214 * class_export_destroy() -> obd_cleanup() ->
3215 * echo_device_free() -> echo_client_cleanup() ->
3216 * obd_disconnect() -> osc_disconnect() ->
3217 * client_disconnect_export()
3219 obd_zombie_barrier();
3220 if (cli->cl_writeback_work) {
3221 ptlrpcd_destroy_work(cli->cl_writeback_work);
3222 cli->cl_writeback_work = NULL;
3225 if (cli->cl_lru_work) {
3226 ptlrpcd_destroy_work(cli->cl_lru_work);
3227 cli->cl_lru_work = NULL;
3230 obd_cleanup_client_import(obd);
3233 EXPORT_SYMBOL(osc_precleanup_common);
/* OSC o_precleanup hook: common pre-cleanup plus lprocfs unregistration. */
3235 static int osc_precleanup(struct obd_device *obd)
3239 osc_precleanup_common(obd);
3241 ptlrpc_lprocfs_unregister_obd(obd);
/*
 * Final cleanup: leave the global shrink list, detach from the shared
 * client cache (drop our LRU membership and cache reference), free the
 * quota cache, and finish with generic client obd cleanup.
 */
3245 int osc_cleanup_common(struct obd_device *obd)
3247 struct client_obd *cli = &obd->u.cli;
3252 spin_lock(&osc_shrink_lock);
3253 list_del(&cli->cl_shrink_list);
3254 spin_unlock(&osc_shrink_lock);
3257 if (cli->cl_cache != NULL) {
3258 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3259 spin_lock(&cli->cl_cache->ccc_lru_lock);
3260 list_del_init(&cli->cl_lru_osc);
3261 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3262 cli->cl_lru_left = NULL;
3263 cl_cache_decref(cli->cl_cache);
3264 cli->cl_cache = NULL;
3267 /* free memory of osc quota cache */
3268 osc_quota_cleanup(obd);
3270 rc = client_obd_cleanup(obd);
3275 EXPORT_SYMBOL(osc_cleanup_common);
/*
 * Apply a PARAM_OSC config record via sysfs attributes.
 * class_modify_config() returns the number of params handled (>0) or a
 * negative errno; map "handled" to 0 for the caller.
 */
3277 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3279 ssize_t count = class_modify_config(lcfg, PARAM_OSC,
3280 &obd->obd_kset.kobj);
3281 return count > 0 ? 0 : count;
/* obd_ops adapter: forward the raw config buffer to osc_process_config_base(). */
3284 static int osc_process_config(struct obd_device *obd, size_t len, void *buf)
3286 return osc_process_config_base(obd, buf);
/* OBD method table for the OSC device type, registered in osc_init(). */
3289 static struct obd_ops osc_obd_ops = {
3290 .o_owner = THIS_MODULE,
3291 .o_setup = osc_setup,
3292 .o_precleanup = osc_precleanup,
3293 .o_cleanup = osc_cleanup_common,
3294 .o_add_conn = client_import_add_conn,
3295 .o_del_conn = client_import_del_conn,
3296 .o_connect = client_connect_import,
3297 .o_reconnect = osc_reconnect,
3298 .o_disconnect = osc_disconnect,
3299 .o_statfs = osc_statfs,
3300 .o_statfs_async = osc_statfs_async,
3301 .o_create = osc_create,
3302 .o_destroy = osc_destroy,
3303 .o_getattr = osc_getattr,
3304 .o_setattr = osc_setattr,
3305 .o_iocontrol = osc_iocontrol,
3306 .o_set_info_async = osc_set_info_async,
3307 .o_import_event = osc_import_event,
3308 .o_process_config = osc_process_config,
3309 .o_quotactl = osc_quotactl,
/* Global cache-shrinker state: osc_shrink_list collects all OSC client
 * obds (added in osc_setup(), removed in osc_cleanup_common()), guarded
 * by osc_shrink_lock. */
3312 static struct shrinker *osc_cache_shrinker;
3313 struct list_head osc_shrink_list = LIST_HEAD_INIT(osc_shrink_list);
3314 DEFINE_SPINLOCK(osc_shrink_lock);
/* Compatibility wrapper for kernels whose shrinker API has a single
 * shrink callback instead of separate count/scan methods: run the scan,
 * then return the remaining count. */
3316 #ifndef HAVE_SHRINKER_COUNT
3317 static int osc_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
3319 struct shrink_control scv = {
3320 .nr_to_scan = shrink_param(sc, nr_to_scan),
3321 .gfp_mask = shrink_param(sc, gfp_mask)
3323 #if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
3324 struct shrinker *shrinker = NULL;
3327 (void)osc_cache_shrink_scan(shrinker, &scv);
3329 return osc_cache_shrink_count(shrinker, &scv);
/*
 * Module init: set up lu caches, register the OSC obd type (procfs is
 * suppressed when OSP already provides the entries), register the cache
 * shrinker, and size/create the shared request pool from
 * osc_reqpool_mem_max and the smallest power-of-two request size that
 * holds OST_IO_MAXREQSIZE.  Unwinds type registration and caches on
 * failure.
 */
3333 static int __init osc_init(void)
3335 bool enable_proc = true;
3336 struct obd_type *type;
3337 unsigned int reqpool_size;
3338 unsigned int reqsize;
3340 DEF_SHRINKER_VAR(osc_shvar, osc_cache_shrink,
3341 osc_cache_shrink_count, osc_cache_shrink_scan);
3344 /* print an address of _any_ initialized kernel symbol from this
3345 * module, to allow debugging with gdb that doesn't support data
3346 * symbols from modules.*/
3347 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3349 rc = lu_kmem_init(osc_caches);
3353 type = class_search_type(LUSTRE_OSP_NAME);
3354 if (type != NULL && type->typ_procsym != NULL)
3355 enable_proc = false;
3357 rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
3358 LUSTRE_OSC_NAME, &osc_device_type);
3362 osc_cache_shrinker = set_shrinker(DEFAULT_SEEKS, &osc_shvar);
3364 /* This is obviously too much memory, only prevent overflow here */
3365 if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
3366 GOTO(out_type, rc = -EINVAL);
3368 reqpool_size = osc_reqpool_mem_max << 20;
/* round the request size up to a power of two >= OST_IO_MAXREQSIZE */
3371 while (reqsize < OST_IO_MAXREQSIZE)
3372 reqsize = reqsize << 1;
3375 * We don't enlarge the request count in OSC pool according to
3376 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3377 * tried after normal allocation failed. So a small OSC pool won't
3378 * cause much performance degression in most of cases.
3380 osc_reqpool_maxreqcount = reqpool_size / reqsize;
3382 atomic_set(&osc_pool_req_count, 0);
3383 osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
3384 ptlrpc_add_rqs_to_pool);
3386 if (osc_rq_pool != NULL)
3390 class_unregister_type(LUSTRE_OSC_NAME);
3392 lu_kmem_fini(osc_caches);
/* Module exit: undo osc_init() in reverse order. */
3397 static void __exit osc_exit(void)
3399 remove_shrinker(osc_cache_shrinker);
3400 class_unregister_type(LUSTRE_OSC_NAME);
3401 lu_kmem_fini(osc_caches);
3402 ptlrpc_free_rq_pool(osc_rq_pool);
/* Kernel module metadata and entry points. */
3405 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3406 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3407 MODULE_VERSION(LUSTRE_VERSION_STRING);
3408 MODULE_LICENSE("GPL");
3410 module_init(osc_init);
3411 module_exit(osc_exit);